thenatog commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760477508



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) 
throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the 
given address and port
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new 
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, 
clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, 
socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new 
RecordReaderEventServerFactory(getLogger(), nicAddress, port, 
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + 
"]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = 
pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already 
closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call 
close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = 
context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = 
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying 
SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying 
SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = 
socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = 
socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // 
TODO: This nextRecord call needs a timeout of some kind

Review comment:
       I believe we're setting an equivalent timeout on the Netty server with: 
   
   eventServerFactory.setConnectionTimeout(readTimeout);
   
   I will remove this comment. I originally added it because I was running 
into a locking issue with the piped stream when reading the next record. I've 
partly resolved that by closing the stream after some amount of idle time 
using a Netty handler.
   
   I'm unsure whether doing the creating and reading in the same try block 
will cause problems, so I will revisit that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to