exceptionfactory commented on a change in pull request #5493:
URL: https://github.com/apache/nifi/pull/5493#discussion_r738754015



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = 
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = 
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = 
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = 
NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = 
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = 
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new 
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = new 
ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, 
TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = 
context.getProperty(CLIENT_AUTH).getValue();
+            ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+            SSLContext sslContext = sslContextService.createContext();
+            if (sslContext != null) {

Review comment:
       This null check should not be necessary; the service should throw an 
exception if it cannot create an SSLContext.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);

Review comment:
       This property does not appear to be used; adding a comment above it 
would be helpful.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);

Review comment:
       These two properties no longer appear to be used, but they probably need 
to remain in place for backward compatibility.  Adding a comment would be 
helpful.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -153,56 +268,51 @@
         return results;
     }
 
-    @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, 
final BlockingQueue<StandardEvent> events)
-            throws IOException {
-
-        final int maxConnections = 
context.getProperty(MAX_CONNECTIONS).asInteger();
-        final int maxThreadPoolSize = 
context.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()
-                ? context.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger()
-                : maxConnections;
-
-        final int bufferSize = 
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
-
-        // initialize the buffer pool based on max number of connections and 
the buffer size
-        final ByteBufferSource byteBufferSource = 
context.getProperty(POOL_RECV_BUFFERS).asBoolean()
-                ? new ByteBufferPool(maxConnections, bufferSize)
-                : new ByteBufferFactory(bufferSize);
-
-        // if an SSLContextService was provided then create an SSLContext to 
pass down to the dispatcher
-        SSLContext sslContext = null;
-        ClientAuth clientAuth = null;
-
-        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        if (sslContextService != null) {
-            final String clientAuthValue = 
context.getProperty(CLIENT_AUTH).getValue();
-            sslContext = sslContextService.createContext();
-            clientAuth = ClientAuth.valueOf(clientAuthValue);
-        }
-
-        final EventFactory<StandardEvent> eventFactory = new 
StandardEventFactory();
-        final ChannelHandlerFactory<StandardEvent<SocketChannel>, 
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
-        return new SocketChannelDispatcher(eventFactory, handlerFactory, 
byteBufferSource, events, getLogger(), maxConnections,
-                maxThreadPoolSize, sslContext, clientAuth, charSet);
-    }
-
-    @Override
     protected Map<String, String> getAttributes(final FlowFileEventBatch 
batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+        final List<ByteArrayMessage> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final Map<String,String> attributes = new HashMap<>(3);
         attributes.put("tcp.sender", sender);
         attributes.put("tcp.port", String.valueOf(port));
         return attributes;
     }
 
-    @Override
     protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+        final List<ByteArrayMessage> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 
1 ? sender.substring(1) : sender;
         final String transitUri = new 
StringBuilder().append("tcp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
-}
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    private String getMessageDemarcator(final ProcessContext context) {
+        return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
+                .getValue()
+                .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", 
"\t");
+    }
+
+    private EventBatcher getEventBatcher() {
+        if(eventBatcher != null) {

Review comment:
       Recommend correcting the spacing:
   ```suggestion
           if (eventBatcher != null) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = 
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = 
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = 
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = 
NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = 
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = 
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new 
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = new 
ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, 
TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = 
context.getProperty(CLIENT_AUTH).getValue();
+            ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+            SSLContext sslContext = sslContextService.createContext();
+            if (sslContext != null) {
+                eventFactory.setSslContext(sslContext);
+                eventFactory.setClientAuth(clientAuth);
+            }
+        }
+
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        eventFactory.setWorkerThreads(maxConnections);
+
+        try {
+            eventServer = eventFactory.getEventServer();
+        } catch (EventException e) {
+            getLogger().error("Failed to bind to [{}:{}].", 
hostname.getHostAddress(), port);
+        }
+    }
+
     @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final int batchSize = 
context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileEventBatch> batches = 
getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+        processEvents(session, batches);
+    }
+
+    private void processEvents(final ProcessSession session, final Map<String, 
FlowFileEventBatch> batches) {
+        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) 
{
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<ByteArrayMessage> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; 
removing FlowFile", entry.getKey());
+                continue;
+            }
+
+            final Map<String,String> attributes = 
getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", flowFile);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, 
false);
+
+            // the sender and command will be the same for all events based on 
the batch key
+            final String transitUri = getTransitUri(entry.getValue());
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+        }
+        session.commitAsync();

Review comment:
       Is this call necessary? It seems like the standard AbstractProcessor 
handling of onTrigger() should be sufficient.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;

Review comment:
       Is this `hostname` field used?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = 
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = 
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = 
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = 
NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = 
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = 
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new 
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = new 
ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, 
TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = 
context.getProperty(CLIENT_AUTH).getValue();
+            ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+            SSLContext sslContext = sslContextService.createContext();
+            if (sslContext != null) {
+                eventFactory.setSslContext(sslContext);
+                eventFactory.setClientAuth(clientAuth);
+            }
+        }
+
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        eventFactory.setWorkerThreads(maxConnections);
+
+        try {
+            eventServer = eventFactory.getEventServer();
+        } catch (EventException e) {
+            getLogger().error("Failed to bind to [{}:{}].", 
hostname.getHostAddress(), port);
+        }
+    }
+
     @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final int batchSize = 
context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileEventBatch> batches = 
getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+        processEvents(session, batches);
+    }
+
+    private void processEvents(final ProcessSession session, final Map<String, 
FlowFileEventBatch> batches) {
+        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) 
{
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<ByteArrayMessage> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; 
removing FlowFile", entry.getKey());
+                continue;
+            }
+
+            final Map<String,String> attributes = 
getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", flowFile);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, 
false);
+
+            // the sender and command will be the same for all events based on 
the batch key

Review comment:
       This comment is unclear given the following lines; is it necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to