This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 405934d  NIFI-9571 Corrected Session commit handling in PutTCP
405934d is described below

commit 405934dcd216db24a4d1009dc8e8da5f76bf165d
Author: exceptionfactory <exceptionfact...@apache.org>
AuthorDate: Thu Jan 13 13:15:50 2022 -0600

    NIFI-9571 Corrected Session commit handling in PutTCP
    
    - Added generic type to AbstractPutEventProcessor for compiler checking of 
event types
    - Refactored createTransitUri to shared method in AbstractPutEventProcessor
    
    Signed-off-by: Joe Gresock <jgres...@gmail.com>
    
    This closes #5658.
---
 .../util/put/AbstractPutEventProcessor.java        | 29 ++++++--------
 .../apache/nifi/processors/splunk/PutSplunk.java   | 39 +++++++++----------
 .../apache/nifi/processors/standard/PutTCP.java    | 44 ++++++++--------------
 .../apache/nifi/processors/standard/PutUDP.java    | 43 ++++++++-------------
 4 files changed, 63 insertions(+), 92 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 659e3df..567f8fd 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -22,8 +22,6 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.event.transport.EventSender;
-import org.apache.nifi.event.transport.configuration.TransportProtocol;
-import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
 import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -52,7 +50,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * A base class for processors that send data to an external system using TCP 
or UDP.
  */
-public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryProcessor {
+public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor HOSTNAME = new 
PropertyDescriptor.Builder()
             .name("Hostname")
@@ -164,7 +162,7 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
     private List<PropertyDescriptor> descriptors;
 
     protected volatile String transitUri;
-    protected EventSender eventSender;
+    protected EventSender<T> eventSender;
 
     protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new 
LinkedBlockingQueue<>();
     protected final Set<FlowFileMessageBatch> activeBatches = 
Collections.synchronizedSet(new HashSet<>());
@@ -229,23 +227,20 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
         }
     }
 
-    /**
-     * Sub-classes construct a transit uri for provenance events. Called from 
@OnScheduled
-     * method of this class.
-     *
-     * @param context the current context
-     *
-     * @return the transit uri
-     */
-    protected abstract String createTransitUri(final ProcessContext context);
+    protected String createTransitUri(ProcessContext context) {
+        final String port = 
context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String host = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final String protocol = getProtocol(context);
+        return String.format("%s://%s:%s", protocol, host, port);
+    }
 
-    protected EventSender<?> getEventSender(final ProcessContext context) {
+    protected EventSender<T> getEventSender(final ProcessContext context) {
         final String hostname = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
         final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
         final String protocol = getProtocol(context);
         final boolean singleEventPerConnection = 
context.getProperty(CONNECTION_PER_FLOWFILE).getValue() != null ? 
context.getProperty(CONNECTION_PER_FLOWFILE).asBoolean() : false;
 
-        final NettyEventSenderFactory factory = 
getNettyEventSenderFactory(hostname, port, protocol);
+        final NettyEventSenderFactory<T> factory = 
getNettyEventSenderFactory(hostname, port, protocol);
         factory.setThreadNamePrefix(String.format("%s[%s]", 
getClass().getSimpleName(), getIdentifier()));
         factory.setWorkerThreads(context.getMaxConcurrentTasks());
         factory.setMaxConnections(context.getMaxConcurrentTasks());
@@ -473,7 +468,5 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
         return context.getProperty(PROTOCOL).getValue();
     }
 
-    protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final 
String hostname, final int port, final String protocol) {
-        return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.valueOf(protocol));
-    }
+    protected abstract NettyEventSenderFactory<T> 
getNettyEventSenderFactory(String hostname, int port, String protocol);
 }
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 0a354d6..7668401 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,6 +37,9 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.event.transport.EventException;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -44,7 +49,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -54,7 +58,7 @@ import 
org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
         "Delimiter is provided, then this processor will read messages from 
the incoming FlowFile based on the " +
         "delimiter, and send each message to Splunk. If a Message Delimiter is 
not provided then the content of " +
         "the FlowFile will be sent directly to Splunk as if it were a single 
message.")
-public class PutSplunk extends AbstractPutEventProcessor {
+public class PutSplunk extends AbstractPutEventProcessor<byte[]> {
 
     public static final char NEW_LINE_CHAR = '\n';
 
@@ -98,14 +102,6 @@ public class PutSplunk extends AbstractPutEventProcessor {
     }
 
     @Override
-    protected String createTransitUri(ProcessContext context) {
-        final String port = 
context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-        final String host = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final String protocol = 
context.getProperty(PROTOCOL).getValue().toLowerCase();
-        return new 
StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
-    }
-
-    @Override
     public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
         // first complete any batches from previous executions
         FlowFileMessageBatch batch;
@@ -140,22 +136,19 @@ public class PutSplunk extends AbstractPutEventProcessor {
         }
     }
 
+    @Override
+    protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final 
String hostname, final int port, final String protocol) {
+        return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.valueOf(protocol));
+    }
+
     /**
      * Send the entire FlowFile as a single message.
      */
     private void processSingleMessage(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile) {
-        // copy the contents of the FlowFile to the ByteArrayOutputStream
-        final ByteArrayOutputStream baos = new 
ByteArrayOutputStream((int)flowFile.getSize() + 1);
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.copy(in, baos);
-            }
-        });
+        byte[] buf = readFlowFile(session, flowFile);
 
         // if TCP and we don't end in a new line then add one
         final String protocol = context.getProperty(PROTOCOL).getValue();
-        byte[] buf = baos.toByteArray();
         if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != 
NEW_LINE_CHAR) {
             final byte[] updatedBuf = new byte[buf.length + 1];
             System.arraycopy(buf, 0, updatedBuf, 0, buf.length);
@@ -280,4 +273,12 @@ public class PutSplunk extends AbstractPutEventProcessor {
             return Arrays.copyOfRange(baos.toByteArray(), 0, length);
         }
     }
+
+    private byte[] readFlowFile(final ProcessSession session, final FlowFile 
flowFile) {
+        try (InputStream inputStream = session.read(flowFile)) {
+            return IOUtils.toByteArray(inputStream);
+        } catch (final IOException e) {
+            throw new ProcessException("Read FlowFile Failed", e);
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 7bcdcc1..639d022 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -32,11 +32,9 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.Arrays;
@@ -53,16 +51,7 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso({ListenTCP.class, PutUDP.class})
 @Tags({ "remote", "egress", "put", "tcp" })
 @SupportsBatching
-public class PutTCP extends AbstractPutEventProcessor {
-
-    @Override
-    protected String createTransitUri(final ProcessContext context) {
-        final String protocol = TCP_VALUE.getValue();
-        final String host = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final String port = 
context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-
-        return new 
StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
-    }
+public class PutTCP extends AbstractPutEventProcessor<InputStream> {
 
     @Override
     protected List<PropertyDescriptor> getAdditionalProperties() {
@@ -81,28 +70,27 @@ public class PutTCP extends AbstractPutEventProcessor {
             return;
         }
 
+        final StopWatch stopWatch = new StopWatch(true);
         try {
-            StopWatch stopWatch = new StopWatch(true);
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    InputStream event = in;
-
-                    String delimiter = getOutgoingMessageDelimiter(context, 
flowFile);
-                    if (delimiter != null) {
-                        final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
-                        event = new DelimitedInputStream(in, 
delimiter.getBytes(charSet));
-                    }
+            session.read(flowFile, inputStream -> {
+                InputStream inputStreamEvent = inputStream;
 
-                    eventSender.sendEvent(event);
+                final String delimiter = getOutgoingMessageDelimiter(context, 
flowFile);
+                if (delimiter != null) {
+                    final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
+                    inputStreamEvent = new DelimitedInputStream(inputStream, 
delimiter.getBytes(charSet));
                 }
+
+                eventSender.sendEvent(inputStreamEvent);
             });
 
             session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (Exception e) {
-            getLogger().error("Exception while handling a process session, 
transferring {} to failure.", flowFile, e);
+            session.commitAsync();
+        } catch (final Exception e) {
+            getLogger().error("Send Failed {}", flowFile, e);
             session.transfer(session.penalize(flowFile), REL_FAILURE);
+            session.commitAsync();
             context.yield();
         }
     }
@@ -113,7 +101,7 @@ public class PutTCP extends AbstractPutEventProcessor {
     }
 
     @Override
-    protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final 
String hostname, final int port, final String protocol) {
-        return new StreamingNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.valueOf(protocol));
+    protected NettyEventSenderFactory<InputStream> 
getNettyEventSenderFactory(final String hostname, final int port, final String 
protocol) {
+        return new StreamingNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.TCP);
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
index 9990701..b36129c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
@@ -16,22 +16,24 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.concurrent.TimeUnit;
@@ -43,23 +45,7 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso({ListenUDP.class, PutTCP.class})
 @Tags({ "remote", "egress", "put", "udp" })
 @SupportsBatching
-public class PutUDP extends AbstractPutEventProcessor {
-
-    /**
-     * Creates a Universal Resource Identifier (URI) for this processor. 
Constructs a URI of the form UDP://host:port where the host and port values are 
taken from the configured property values.
-     *
-     * @param context - the current process context.
-     *
-     * @return The URI value as a String.
-     */
-    @Override
-    protected String createTransitUri(final ProcessContext context) {
-        final String protocol = UDP_VALUE.getValue();
-        final String host = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final String port = 
context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-
-        return protocol + "://" + host + ":" + port;
-    }
+public class PutUDP extends AbstractPutEventProcessor<byte[]> {
 
     @Override
     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
@@ -69,17 +55,18 @@ public class PutUDP extends AbstractPutEventProcessor {
             return;
         }
 
+        final StopWatch stopWatch = new StopWatch(true);
         try {
-            StopWatch stopWatch = new StopWatch(true);
             final byte[] content = readContent(session, flowFile);
             eventSender.sendEvent(content);
 
             session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
             session.commitAsync();
-        } catch (Exception e) {
-            getLogger().error("Exception while handling a process session, 
transferring {} to failure.", new Object[]{flowFile}, e);
+        } catch (final Exception e) {
+            getLogger().error("Send Failed {}", flowFile, e);
             session.transfer(session.penalize(flowFile), REL_FAILURE);
+            session.commitAsync();
             context.yield();
         }
     }
@@ -89,12 +76,14 @@ public class PutUDP extends AbstractPutEventProcessor {
         return UDP_VALUE.getValue();
     }
 
+    @Override
+    protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final 
String hostname, final int port, final String protocol) {
+        return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.UDP);
+    }
+
     private byte[] readContent(final ProcessSession session, final FlowFile 
flowFile) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) 
flowFile.getSize());
-        try (final InputStream in = session.read(flowFile)) {
-            StreamUtils.copy(in, baos);
+        try (final InputStream inputStream = session.read(flowFile)) {
+            return IOUtils.toByteArray(inputStream);
         }
-
-        return baos.toByteArray();
     }
 }

Reply via email to