This is an automated email from the ASF dual-hosted git repository. jgresock pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 405934d NIFI-9571 Corrected Session commit handling in PutTCP 405934d is described below commit 405934dcd216db24a4d1009dc8e8da5f76bf165d Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Thu Jan 13 13:15:50 2022 -0600 NIFI-9571 Corrected Session commit handling in PutTCP - Added generic type to AbstractPutEventProcessor for compiler checking of event types - Refactored createTransitUri to shared method in AbstractPutEventProcessor Signed-off-by: Joe Gresock <jgres...@gmail.com> This closes #5658. --- .../util/put/AbstractPutEventProcessor.java | 29 ++++++-------- .../apache/nifi/processors/splunk/PutSplunk.java | 39 +++++++++---------- .../apache/nifi/processors/standard/PutTCP.java | 44 ++++++++-------------- .../apache/nifi/processors/standard/PutUDP.java | 43 ++++++++------------- 4 files changed, 63 insertions(+), 92 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index 659e3df..567f8fd 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -22,8 +22,6 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.event.transport.EventSender; -import org.apache.nifi.event.transport.configuration.TransportProtocol; -import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory; import org.apache.nifi.event.transport.netty.NettyEventSenderFactory; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -52,7 +50,7 @@ import java.util.concurrent.TimeUnit; /** * A base class for processors that send data to an external system using TCP or UDP. */ -public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor { +public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() .name("Hostname") @@ -164,7 +162,7 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr private List<PropertyDescriptor> descriptors; protected volatile String transitUri; - protected EventSender eventSender; + protected EventSender<T> eventSender; protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>(); protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<>()); @@ -229,23 +227,20 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr } } - /** - * Sub-classes construct a transit uri for provenance events. Called from @OnScheduled - * method of this class. - * - * @param context the current context - * - * @return the transit uri - */ - protected abstract String createTransitUri(final ProcessContext context); + protected String createTransitUri(ProcessContext context) { + final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); + final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final String protocol = getProtocol(context); + return String.format("%s://%s:%s", protocol, host, port); + } - protected EventSender<?> getEventSender(final ProcessContext context) { + protected EventSender<T> getEventSender(final ProcessContext context) { final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); final String protocol = getProtocol(context); final boolean singleEventPerConnection = context.getProperty(CONNECTION_PER_FLOWFILE).getValue() != null ? context.getProperty(CONNECTION_PER_FLOWFILE).asBoolean() : false; - final NettyEventSenderFactory factory = getNettyEventSenderFactory(hostname, port, protocol); + final NettyEventSenderFactory<T> factory = getNettyEventSenderFactory(hostname, port, protocol); factory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier())); factory.setWorkerThreads(context.getMaxConcurrentTasks()); factory.setMaxConnections(context.getMaxConcurrentTasks()); @@ -473,7 +468,5 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr return context.getProperty(PROTOCOL).getValue(); } - protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) { - return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol)); - } + protected abstract NettyEventSenderFactory<T> getNettyEventSenderFactory(String hostname, int port, String protocol); } diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 0a354d6..7668401 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -26,6 +26,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -35,6 +37,9 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.event.transport.EventException; +import org.apache.nifi.event.transport.configuration.TransportProtocol; +import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory; +import org.apache.nifi.event.transport.netty.NettyEventSenderFactory; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -44,7 +49,6 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -54,7 +58,7 @@ import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; "Delimiter is provided, then this processor will read messages from the incoming FlowFile based on the " + "delimiter, and send each message to Splunk. If a Message Delimiter is not provided then the content of " + "the FlowFile will be sent directly to Splunk as if it were a single message.") -public class PutSplunk extends AbstractPutEventProcessor { +public class PutSplunk extends AbstractPutEventProcessor<byte[]> { public static final char NEW_LINE_CHAR = '\n'; @@ -98,14 +102,6 @@ public class PutSplunk extends AbstractPutEventProcessor { } @Override - protected String createTransitUri(ProcessContext context) { - final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); - final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); - final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase(); - return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); - } - - @Override public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { // first complete any batches from previous executions FlowFileMessageBatch batch; @@ -140,22 +136,19 @@ public class PutSplunk extends AbstractPutEventProcessor { } } + @Override + protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) { + return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol)); + } + /** * Send the entire FlowFile as a single message. */ private void processSingleMessage(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { - // copy the contents of the FlowFile to the ByteArrayOutputStream - final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.copy(in, baos); - } - }); + byte[] buf = readFlowFile(session, flowFile); // if TCP and we don't end in a new line then add one final String protocol = context.getProperty(PROTOCOL).getValue(); - byte[] buf = baos.toByteArray(); if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != NEW_LINE_CHAR) { final byte[] updatedBuf = new byte[buf.length + 1]; System.arraycopy(buf, 0, updatedBuf, 0, buf.length); @@ -280,4 +273,12 @@ public class PutSplunk extends AbstractPutEventProcessor { return Arrays.copyOfRange(baos.toByteArray(), 0, length); } } + + private byte[] readFlowFile(final ProcessSession session, final FlowFile flowFile) { + try (InputStream inputStream = session.read(flowFile)) { + return IOUtils.toByteArray(inputStream); + } catch (final IOException e) { + throw new ProcessException("Read FlowFile Failed", e); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java index 7bcdcc1..639d022 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java @@ -32,11 +32,9 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; import org.apache.nifi.util.StopWatch; -import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.Arrays; @@ -53,16 +51,7 @@ import java.util.concurrent.TimeUnit; @SeeAlso({ListenTCP.class, PutUDP.class}) @Tags({ "remote", "egress", "put", "tcp" }) @SupportsBatching -public class PutTCP extends AbstractPutEventProcessor { - - @Override - protected String createTransitUri(final ProcessContext context) { - final String protocol = TCP_VALUE.getValue(); - final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); - final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); - - return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); - } +public class PutTCP extends AbstractPutEventProcessor<InputStream> { @Override protected List<PropertyDescriptor> getAdditionalProperties() { @@ -81,28 +70,27 @@ public class PutTCP extends AbstractPutEventProcessor { return; } + final StopWatch stopWatch = new StopWatch(true); try { - StopWatch stopWatch = new StopWatch(true); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - InputStream event = in; - - String delimiter = getOutgoingMessageDelimiter(context, flowFile); - if (delimiter != null) { - final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); - event = new DelimitedInputStream(in, delimiter.getBytes(charSet)); - } + session.read(flowFile, inputStream -> { + InputStream inputStreamEvent = inputStream; - eventSender.sendEvent(event); + final String delimiter = getOutgoingMessageDelimiter(context, flowFile); + if (delimiter != null) { + final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); + inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet)); } + + eventSender.sendEvent(inputStreamEvent); }); session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - } catch (Exception e) { - getLogger().error("Exception while handling a process session, transferring {} to failure.", flowFile, e); + session.commitAsync(); + } catch (final Exception e) { + getLogger().error("Send Failed {}", flowFile, e); session.transfer(session.penalize(flowFile), REL_FAILURE); + session.commitAsync(); context.yield(); } } @@ -113,7 +101,7 @@ public class PutTCP extends AbstractPutEventProcessor { } @Override - protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) { - return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol)); + protected NettyEventSenderFactory<InputStream> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) { + return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.TCP); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java index 9990701..b36129c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java @@ -16,22 +16,24 @@ */ package org.apache.nifi.processors.standard; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.event.transport.configuration.TransportProtocol; +import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory; +import org.apache.nifi.event.transport.netty.NettyEventSenderFactory; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.TimeUnit; @@ -43,23 +45,7 @@ import java.util.concurrent.TimeUnit; @SeeAlso({ListenUDP.class, PutTCP.class}) @Tags({ "remote", "egress", "put", "udp" }) @SupportsBatching -public class PutUDP extends AbstractPutEventProcessor { - - /** - * Creates a Universal Resource Identifier (URI) for this processor. Constructs a URI of the form UDP://host:port where the host and port values are taken from the configured property values. - * - * @param context - the current process context. - * - * @return The URI value as a String. - */ - @Override - protected String createTransitUri(final ProcessContext context) { - final String protocol = UDP_VALUE.getValue(); - final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); - final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); - - return protocol + "://" + host + ":" + port; - } +public class PutUDP extends AbstractPutEventProcessor<byte[]> { @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { @@ -69,17 +55,18 @@ public class PutUDP extends AbstractPutEventProcessor { return; } + final StopWatch stopWatch = new StopWatch(true); try { - StopWatch stopWatch = new StopWatch(true); final byte[] content = readContent(session, flowFile); eventSender.sendEvent(content); session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); session.commitAsync(); - } catch (Exception e) { - getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[]{flowFile}, e); + } catch (final Exception e) { + getLogger().error("Send Failed {}", flowFile, e); session.transfer(session.penalize(flowFile), REL_FAILURE); + session.commitAsync(); context.yield(); } } @@ -89,12 +76,14 @@ public class PutUDP extends AbstractPutEventProcessor { return UDP_VALUE.getValue(); } + @Override + protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) { + return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.UDP); + } + private byte[] readContent(final ProcessSession session, final FlowFile flowFile) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize()); - try (final InputStream in = session.read(flowFile)) { - StreamUtils.copy(in, baos); + try (final InputStream inputStream = session.read(flowFile)) { + return IOUtils.toByteArray(inputStream); } - - return baos.toByteArray(); } }