exceptionfactory commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760222503



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
##########
@@ -137,6 +140,15 @@ public void setShutdownTimeout(final Duration timeout) {
         this.shutdownTimeout = timeout;
     }
 
+    /**
+     * Set the
+     *
+     * @param timeout

Review comment:
       The documentation appears to be missing some details.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) 
throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the 
given address and port
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new 
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, 
clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, 
socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new 
RecordReaderEventServerFactory(getLogger(), nicAddress, port, 
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + 
"]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);

Review comment:
       This log statement can be changed to use placeholders.
   ```suggestion
                   getLogger().error("Close Record Reader [{}] Failed", recordReader, e);
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) 
throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the 
given address and port
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new 
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, 
clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, 
socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new 
RecordReaderEventServerFactory(getLogger(), nicAddress, port, 
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);

Review comment:
       Read Timeout is different from Connection Timeout, so this seems to be a 
mismatch.  It may also make sense to rename `Connection Timeout` to `Connect 
Timeout`, making it clear that `Connect Timeout` controls initial connection 
attempts, while `Read Timeout` controls subsequent read operations.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
##########
@@ -189,24 +198,17 @@ public void testRunClientAuthNone() throws 
InitializationException, IOException,
         Assert.assertTrue(content.contains("This is a test " + 3));
     }
 
-    protected void run(final int expectedTransferred, final SSLContext 
sslContext) throws IOException, InterruptedException {
+    protected void run(final int expectedTransferred, final byte[] data, final 
SSLContext sslContext, final boolean shouldInitialize) throws Exception {
         final int port = NetworkUtils.availablePort();
         runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
 
         // Run Processor and start listener without shutting down
-        runner.run(1, false, true);
-
-        final Thread thread = new Thread(() -> {
-            try (final Socket socket = getSocket(port, sslContext)) {
-                final OutputStream outputStream = socket.getOutputStream();
-                outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
-                outputStream.flush();
-            } catch (final IOException e) {
-                LOGGER.error("Failed Sending Records to Port [{}]", port, e);
-            }
-        });
-        thread.start();
+        LOGGER.info("Before run:");
+        runner.run(1, false, shouldInitialize);
+        LOGGER.info("About to send messages:");
+        sendMessages(port, data, sslContext);
 
+        LOGGER.info("Sent messages to port: {}", port);

Review comment:
       Recommend removing these log statements from the test method.  Although 
logging in tests can be helpful during initial evaluation, it creates a lot of 
output during regular builds.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) 
throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the 
given address and port
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new 
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, 
clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, 
socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new 
RecordReaderEventServerFactory(getLogger(), nicAddress, port, 
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + 
"]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = 
pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already 
closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call 
close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = 
context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = 
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying 
SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying 
SocketChannel - is this still necessary?

Review comment:
       This still seems necessary to avoid different threads handling the same 
RecordReader.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml
##########
@@ -39,5 +39,21 @@
             <version>1.16.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>

Review comment:
       This version needs to be updated to `1.16.0-SNAPSHOT` to match the 
current main branch.

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based 
record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable
+public class RecordReaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    private final RecordReaderFactory readerFactory;
+    private final BlockingQueue<NetworkRecordReader> recordReaders;
+    private final ComponentLog logger;
+    private PipedOutputStream fromChannel;
+    private PipedInputStream toReader;
+
+    public RecordReaderHandler(final RecordReaderFactory readerFactory, final 
BlockingQueue<NetworkRecordReader> recordReaders, final ComponentLog logger) {
+        this.logger = logger;
+        this.readerFactory = readerFactory;
+        this.recordReaders = recordReaders;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
+        final SocketAddress remoteSender = ctx.channel().remoteAddress();
+        logger.info("Netty message received to {}, sender is: {}", 
ctx.channel().localAddress(), remoteSender);
+        fromChannel.write(ByteBufUtil.getBytes(msg));
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+        fromChannel.close();
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        final SocketAddress remoteSender = ctx.channel().remoteAddress();
+        fromChannel = new PipedOutputStream();
+        toReader = new PipedInputStream(fromChannel);
+        recordReaders.offer(new NetworkRecordReader(remoteSender, new 
BufferedInputStream(toReader), readerFactory, logger));

Review comment:
       This is a difficult point for integrating the existing stream-based 
Readers with message-based Netty handlers. Some further evaluation of potential 
solutions may help improve the overall implementation.
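
As a rough sketch of the mismatch, the following uses only `java.io` piped streams and a plain producer thread in place of a Netty event loop. The class name `PipedBridgeSketch` and the `relay` method are hypothetical. It shows that message boundaries on the writing side do not line up with record boundaries on the reading side: the second record is split across two messages, yet the stream-based reader still sees two complete lines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not part of the NiFi code base
class PipedBridgeSketch {

    // Writes each message to the pipe from a producer thread and reads complete lines on the calling thread
    static List<String> relay(final List<String> messages) {
        try (PipedOutputStream fromChannel = new PipedOutputStream();
             PipedInputStream toReader = new PipedInputStream(fromChannel)) {

            // Stands in for the handler receiving messages; write() blocks when the pipe buffer is full
            final Thread producer = new Thread(() -> {
                try {
                    for (final String message : messages) {
                        fromChannel.write(message.getBytes(StandardCharsets.UTF_8));
                    }
                    // Closing the output side signals end of stream to the reader
                    fromChannel.close();
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            producer.start();

            // Stands in for a stream-based record reader consuming the input side
            final List<String> lines = new ArrayList<>();
            final BufferedReader reader = new BufferedReader(new InputStreamReader(toReader, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
            producer.join();
            return lines;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(relay(Arrays.asList("a,1\nb,", "2\n")));
    }
}
```

One consequence worth weighing: `PipedOutputStream.write` blocks once the pipe buffer fills, so a slow reader would stall whichever thread performs the write. In a Netty handler that thread would presumably be an event loop thread.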

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based 
record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable
+public class RecordReaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    private final RecordReaderFactory readerFactory;
+    private final BlockingQueue<NetworkRecordReader> recordReaders;
+    private final ComponentLog logger;
+    private PipedOutputStream fromChannel;
+    private PipedInputStream toReader;
+
+    public RecordReaderHandler(final RecordReaderFactory readerFactory, final 
BlockingQueue<NetworkRecordReader> recordReaders, final ComponentLog logger) {
+        this.logger = logger;
+        this.readerFactory = readerFactory;
+        this.recordReaders = recordReaders;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
+        final SocketAddress remoteSender = ctx.channel().remoteAddress();
+        logger.info("Netty message received to {}, sender is: {}", 
ctx.channel().localAddress(), remoteSender);

Review comment:
       This will generate a lot of logging, and probably should be removed, or 
at least changed to debug.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -436,28 +401,23 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
                     session.transfer(flowFile, REL_SUCCESS);
                 }
 
-                getLogger().debug("Re-queuing connection for further 
processing...");
-                socketReaders.offer(socketRecordReader);
-
+                getLogger().debug("Re-queuing reader for further 
processing...");
+                recordReaders.offer(recordReader);
             } catch (Exception e) {
                 getLogger().error("Error processing records: " + 
e.getMessage(), e);

Review comment:
       This could be adjusted to avoid repeating the exception message, since 
passing the exception itself already logs the message along with the stack trace.
   ```suggestion
                   getLogger().error("Record Processing Failed", e);
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) 
throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the 
given address and port
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new 
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, 
clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, 
socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new 
RecordReaderEventServerFactory(getLogger(), nicAddress, port, 
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + 
"]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = 
pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already 
closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call 
close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = 
context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = 
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying 
SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying 
SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = 
socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = 
socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // 
TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real 
exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its 
a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try 
again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;
                 }
 
                 if (record == null) {
-                    getLogger().debug("No records available from {}, closing 
connection", new Object[]{getRemoteAddress(socketRecordReader)});
-                    IOUtils.closeQuietly(socketRecordReader);
+                    IOUtils.closeQuietly(recordReader.getRecordReader());

Review comment:
       I agree that the debug log is useful.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -420,12 +385,12 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
                     getLogger().debug("Removing flow file, no records were 
written");
                     session.remove(flowFile);
                 } else {
-                    final String sender = getRemoteAddress(socketRecordReader);
+                    final String sender = recordReader.getSender().toString();
 
                     final Map<String, String> attributes = new 
HashMap<>(writeResult.getAttributes());
                     attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
                     attributes.put("tcp.sender", sender);
-                    attributes.put("tcp.port", String.valueOf(port));
+                    attributes.put("tcp.port", String.valueOf(port)); // 
Should this be the remote port..?

Review comment:
       I agree that a new attribute for the sender port could be useful.
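
One possible shape for this, as a sketch only: the attribute name `tcp.sender.port` and the helper `senderAttributes` are hypothetical and not taken from the PR. It keeps `tcp.port` as the listening port and derives the sender port from the remote `SocketAddress` when that address is an `InetSocketAddress`.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not part of the NiFi code base
class SenderAttributesSketch {

    static Map<String, String> senderAttributes(final SocketAddress sender, final int listeningPort) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("tcp.sender", sender.toString());
        // Existing attribute: the local port on which the processor listens
        attributes.put("tcp.port", String.valueOf(listeningPort));

        // Hypothetical new attribute: the remote port, available only for IP socket addresses
        if (sender instanceof InetSocketAddress) {
            final int senderPort = ((InetSocketAddress) sender).getPort();
            attributes.put("tcp.sender.port", String.valueOf(senderPort));
        }
        return attributes;
    }

    public static void main(final String[] args) {
        final SocketAddress sender = InetSocketAddress.createUnresolved("client.example", 50123);
        System.out.println(senderAttributes(sender, 9000));
    }
}
```

Keeping `tcp.port` unchanged would avoid altering the meaning of an attribute that existing flows may already rely on.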

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
##########
@@ -156,9 +168,13 @@ private void setChannelOptions(final AbstractBootstrap<?, 
?> bootstrap) {
             bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new 
FixedRecvByteBufAllocator(socketReceiveBuffer));
         }
+
         if (socketKeepAlive != null) {
             bootstrap.option(ChannelOption.SO_KEEPALIVE, socketKeepAlive);
         }
+
+        // ChannelOption only takes integer for time in milliseconds? strange

Review comment:
       This comment seems somewhat superfluous.  Although the observation is 
interesting, the integer type is probably intended to limit the theoretical 
maximum timeout.
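
For context, timeout options such as Netty's `ChannelOption.CONNECT_TIMEOUT_MILLIS` are typed as `Integer`, which caps the value at `Integer.MAX_VALUE` milliseconds, roughly 24.8 days. A minimal sketch of a checked conversion from `Duration` follows. The class name `TimeoutMillisSketch` and the helper `toIntMillis` are hypothetical.

```java
import java.time.Duration;

// Hypothetical illustration; not part of the NiFi code base
class TimeoutMillisSketch {

    // Converts a Duration to int milliseconds, throwing ArithmeticException instead of silently overflowing
    static int toIntMillis(final Duration timeout) {
        return Math.toIntExact(timeout.toMillis());
    }

    public static void main(final String[] args) {
        System.out.println(toIntMillis(Duration.ofSeconds(30)));

        // Largest representable timeout, expressed in whole days
        System.out.println(Duration.ofMillis(Integer.MAX_VALUE).toDays());
    }
}
```

`Math.toIntExact` fails loudly on values that do not fit, which seems preferable to a narrowing cast that would wrap to a negative number.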

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml
##########
@@ -39,5 +39,21 @@
             <version>1.16.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review comment:
       On initial evaluation, adding these dependencies to 
`nifi-event-transport` does not seem like the right approach.  Other uses of 
this module do not need record-oriented modules, and in particular, the 
dependency on the `nifi-record-serialization-service-api` is problematic.  It 
seems better to avoid these dependencies entirely, and instead place the 
classes in another module.  This could be in `nifi-standard-processors`, where 
`ListenTCPRecord` currently exists, or for reuse, perhaps a new module named 
something like `nifi-record-event-transport`.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) 
throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the 
given address and port
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new 
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, 
clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, 
socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new 
RecordReaderEventServerFactory(getLogger(), nicAddress, port, 
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + 
"]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = 
pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already 
closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call 
close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = 
context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = 
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying 
SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying 
SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = 
socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = 
socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // 
TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real 
exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its 
a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try 
again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;

Review comment:
       Concurring with @bbende, it looks like this may need to be revisited.  
If SocketTimeoutExceptions are no longer thrown, then this makes sense, but if 
they are thrown, something similar to this needs to be maintained.
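
If the check is kept, the removed loop could be condensed into a small helper along these lines. This is a sketch, and the names `TimeoutCauseSketch` and `isCausedBy` are hypothetical. Passing the exception type as a parameter would also cover the case where the Netty-based implementation surfaces a different timeout type, which is an assumption to verify.

```java
import java.net.SocketTimeoutException;

// Hypothetical illustration; not part of the NiFi code base
class TimeoutCauseSketch {

    // Walks the cause chain, starting with the thrown exception itself, looking for the given type
    static boolean isCausedBy(final Throwable thrown, final Class<? extends Throwable> type) {
        Throwable cause = thrown;
        while (cause != null) {
            if (type.isInstance(cause)) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    public static void main(final String[] args) {
        // Some record libraries wrap the real exception in a RuntimeException
        final Exception wrapped = new RuntimeException(new SocketTimeoutException("Read timed out"));
        System.out.println(isCausedBy(wrapped, SocketTimeoutException.class));
    }
}
```

With a helper like this, the catch block could re-queue the reader and remove the FlowFile on timeout, and rethrow otherwise, as the removed code did.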

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
##########
@@ -94,7 +95,7 @@ public void run() {
 
                 if (logger.isDebugEnabled()) {
                     final String remoteAddress = remoteSocketAddress == null ? 
"null" : remoteSocketAddress.toString();
-                    logger.debug("Accepted connection from {}", new 
Object[]{remoteAddress});
+                    logger.debug("Accepted connection from {}", remoteAddress);

Review comment:
       Although this change is helpful, recommend reverting it from this PR 
since it is unrelated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
