exceptionfactory commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760222503
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
##########
@@ -137,6 +140,15 @@ public void setShutdownTimeout(final Duration timeout) {
this.shutdownTimeout = timeout;
}
+ /**
+ * Set the
+ *
+ * @param timeout
Review comment:
The documentation appears to be missing some details.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context)
throws IOException {
clientAuth = ClientAuth.valueOf(clientAuthValue);
}
- // create a ServerSocketChannel in non-blocking mode and bind to the
given address and port
- final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
- this.dispatcher = new
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext,
clientAuth, readTimeout,
- maxSocketBufferSize, maxConnections, recordReaderFactory,
socketReaders, getLogger());
+ NettyEventServerFactory eventServerFactory = new
RecordReaderEventServerFactory(getLogger(), nicAddress, port,
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+ eventServerFactory.setSslContext(sslContext);
+ eventServerFactory.setClientAuth(clientAuth);
+ eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+ eventServerFactory.setWorkerThreads(maxConnections);
+ eventServerFactory.setConnectionTimeout(readTimeout);
- // start a thread to run the dispatcher
- final Thread readerThread = new Thread(dispatcher);
- readerThread.setName(getClass().getName() + " [" + getIdentifier() +
"]");
- readerThread.setDaemon(true);
- readerThread.start();
+ eventServer = eventServerFactory.getEventServer();
}
@OnStopped
public void onStopped() {
- if (dispatcher != null) {
- dispatcher.close();
- dispatcher = null;
+ if (eventServer != null) {
+ eventServer.shutdown();
+ eventServer = null;
}
- SocketChannelRecordReader socketRecordReader;
- while ((socketRecordReader = socketReaders.poll()) != null) {
+ NetworkRecordReader recordReader;
+ while ((recordReader = recordReaders.poll()) != null) {
try {
- socketRecordReader.close();
+ recordReader.getRecordReader().close();
} catch (Exception e) {
- getLogger().error("Couldn't close " + socketRecordReader, e);
+ getLogger().error("Couldn't close " + recordReader, e);
Review comment:
This log statement can be changed to use placeholders.
```suggestion
getLogger().error("Close Record Reader [{}] Failed",
recordReader, e);
```
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context)
throws IOException {
clientAuth = ClientAuth.valueOf(clientAuthValue);
}
- // create a ServerSocketChannel in non-blocking mode and bind to the
given address and port
- final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
- this.dispatcher = new
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext,
clientAuth, readTimeout,
- maxSocketBufferSize, maxConnections, recordReaderFactory,
socketReaders, getLogger());
+ NettyEventServerFactory eventServerFactory = new
RecordReaderEventServerFactory(getLogger(), nicAddress, port,
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+ eventServerFactory.setSslContext(sslContext);
+ eventServerFactory.setClientAuth(clientAuth);
+ eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+ eventServerFactory.setWorkerThreads(maxConnections);
+ eventServerFactory.setConnectionTimeout(readTimeout);
Review comment:
Read Timeout is different from Connection Timeout, so this appears to be a
mismatch. It may also make sense to rename `Connection Timeout` to `Connect
Timeout`, making it clear that `Connect Timeout` controls initial connection
attempts, while Read Timeout controls subsequent read operations.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
##########
@@ -189,24 +198,17 @@ public void testRunClientAuthNone() throws
InitializationException, IOException,
Assert.assertTrue(content.contains("This is a test " + 3));
}
- protected void run(final int expectedTransferred, final SSLContext
sslContext) throws IOException, InterruptedException {
+ protected void run(final int expectedTransferred, final byte[] data, final
SSLContext sslContext, final boolean shouldInitialize) throws Exception {
final int port = NetworkUtils.availablePort();
runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
// Run Processor and start listener without shutting down
- runner.run(1, false, true);
-
- final Thread thread = new Thread(() -> {
- try (final Socket socket = getSocket(port, sslContext)) {
- final OutputStream outputStream = socket.getOutputStream();
- outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
- outputStream.flush();
- } catch (final IOException e) {
- LOGGER.error("Failed Sending Records to Port [{}]", port, e);
- }
- });
- thread.start();
+ LOGGER.info("Before run:");
+ runner.run(1, false, shouldInitialize);
+ LOGGER.info("About to send messages:");
+ sendMessages(port, data, sslContext);
+ LOGGER.info("Sent messages to port: {}", port);
Review comment:
Recommend removing these log statements from the test method. Although
logging in tests can be helpful during initial evaluation, it creates a lot of
output during regular builds.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context)
throws IOException {
clientAuth = ClientAuth.valueOf(clientAuthValue);
}
- // create a ServerSocketChannel in non-blocking mode and bind to the
given address and port
- final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
- this.dispatcher = new
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext,
clientAuth, readTimeout,
- maxSocketBufferSize, maxConnections, recordReaderFactory,
socketReaders, getLogger());
+ NettyEventServerFactory eventServerFactory = new
RecordReaderEventServerFactory(getLogger(), nicAddress, port,
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+ eventServerFactory.setSslContext(sslContext);
+ eventServerFactory.setClientAuth(clientAuth);
+ eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+ eventServerFactory.setWorkerThreads(maxConnections);
+ eventServerFactory.setConnectionTimeout(readTimeout);
- // start a thread to run the dispatcher
- final Thread readerThread = new Thread(dispatcher);
- readerThread.setName(getClass().getName() + " [" + getIdentifier() +
"]");
- readerThread.setDaemon(true);
- readerThread.start();
+ eventServer = eventServerFactory.getEventServer();
}
@OnStopped
public void onStopped() {
- if (dispatcher != null) {
- dispatcher.close();
- dispatcher = null;
+ if (eventServer != null) {
+ eventServer.shutdown();
+ eventServer = null;
}
- SocketChannelRecordReader socketRecordReader;
- while ((socketRecordReader = socketReaders.poll()) != null) {
+ NetworkRecordReader recordReader;
+ while ((recordReader = recordReaders.poll()) != null) {
try {
- socketRecordReader.close();
+ recordReader.getRecordReader().close();
} catch (Exception e) {
- getLogger().error("Couldn't close " + socketRecordReader, e);
+ getLogger().error("Couldn't close " + recordReader, e);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final SocketChannelRecordReader socketRecordReader =
pollForSocketRecordReader();
- if (socketRecordReader == null) {
- return;
- }
-
- if (socketRecordReader.isClosed()) {
- getLogger().warn("Unable to read records from {}, socket already
closed", new Object[] {getRemoteAddress(socketRecordReader)});
- IOUtils.closeQuietly(socketRecordReader); // still need to call
close so the overall count is decremented
+ NetworkRecordReader recordReader = pollForRecordReader();
+ if (recordReader == null) {
return;
}
final int recordBatchSize =
context.getProperty(RECORD_BATCH_SIZE).asInteger();
final String readerErrorHandling =
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
final RecordSetWriterFactory recordSetWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- // synchronize to ensure there are no stale values in the underlying
SocketChannel
- synchronized (socketRecordReader) {
+ // synchronize to ensure there are no stale values in the underlying
SocketChannel - is this still necessary?
Review comment:
This still seems necessary to avoid different threads handling the same
RecordReader.
##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml
##########
@@ -39,5 +39,21 @@
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-record-utils</artifactId>
+ <version>1.15.0-SNAPSHOT</version>
Review comment:
This version needs to be updated to `1.16.0-SNAPSHOT` to match the
current main branch.
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based
record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable
+public class RecordReaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ private final RecordReaderFactory readerFactory;
+ private final BlockingQueue<NetworkRecordReader> recordReaders;
+ private final ComponentLog logger;
+ private PipedOutputStream fromChannel;
+ private PipedInputStream toReader;
+
+ public RecordReaderHandler(final RecordReaderFactory readerFactory, final
BlockingQueue<NetworkRecordReader> recordReaders, final ComponentLog logger) {
+ this.logger = logger;
+ this.readerFactory = readerFactory;
+ this.recordReaders = recordReaders;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
Exception {
+ final SocketAddress remoteSender = ctx.channel().remoteAddress();
+ logger.info("Netty message received to {}, sender is: {}",
ctx.channel().localAddress(), remoteSender);
+ fromChannel.write(ByteBufUtil.getBytes(msg));
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ fromChannel.close();
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ final SocketAddress remoteSender = ctx.channel().remoteAddress();
+ fromChannel = new PipedOutputStream();
+ toReader = new PipedInputStream(fromChannel);
+ recordReaders.offer(new NetworkRecordReader(remoteSender, new
BufferedInputStream(toReader), readerFactory, logger));
Review comment:
This is a difficult point for integrating the existing stream-based
Readers with message-based Netty handlers. Some further evaluation of potential
solutions may help improve the overall implementation.
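As a point of reference for that evaluation, the following sketch shows the piped-stream bridge in isolation, using only the JDK. The class name and the line-based reader are assumptions for illustration: a producer thread stands in for the Netty event loop, and a `BufferedReader` stands in for a Record Reader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of the PR
final class PipeBridgeSketch {
    /**
     * Writes message chunks to a pipe from a producer thread and reads
     * complete lines from the other end on the calling thread.
     */
    static List<String> readLines(final List<String> chunks) throws IOException, InterruptedException {
        final PipedOutputStream fromChannel = new PipedOutputStream();
        final PipedInputStream toReader = new PipedInputStream(fromChannel);

        final Thread producer = new Thread(() -> {
            // Closing the output stream signals end of stream to the reading side
            try (PipedOutputStream out = fromChannel) {
                for (final String chunk : chunks) {
                    // write() blocks while the pipe buffer is full
                    out.write(chunk.getBytes(StandardCharsets.UTF_8));
                }
            } catch (final IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        producer.start();

        final List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(toReader, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        producer.join();
        return lines;
    }

    public static void main(final String[] args) throws IOException, InterruptedException {
        // The second line arrives split across two messages
        System.out.println(readLines(Arrays.asList("first\nsec", "ond\n")));
    }
}
```

The sketch shows both the appeal and the cost of the approach: message boundaries disappear for the reader, but the writing thread blocks whenever the pipe buffer fills, which would be a concern if that thread were a Netty event loop thread.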
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based
record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable
+public class RecordReaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ private final RecordReaderFactory readerFactory;
+ private final BlockingQueue<NetworkRecordReader> recordReaders;
+ private final ComponentLog logger;
+ private PipedOutputStream fromChannel;
+ private PipedInputStream toReader;
+
+ public RecordReaderHandler(final RecordReaderFactory readerFactory, final
BlockingQueue<NetworkRecordReader> recordReaders, final ComponentLog logger) {
+ this.logger = logger;
+ this.readerFactory = readerFactory;
+ this.recordReaders = recordReaders;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
Exception {
+ final SocketAddress remoteSender = ctx.channel().remoteAddress();
+ logger.info("Netty message received to {}, sender is: {}",
ctx.channel().localAddress(), remoteSender);
Review comment:
This will generate a lot of logging, and probably should be removed, or
at least changed to debug.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -436,28 +401,23 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
session.transfer(flowFile, REL_SUCCESS);
}
- getLogger().debug("Re-queuing connection for further
processing...");
- socketReaders.offer(socketRecordReader);
-
+ getLogger().debug("Re-queuing reader for further
processing...");
+ recordReaders.offer(recordReader);
} catch (Exception e) {
getLogger().error("Error processing records: " +
e.getMessage(), e);
Review comment:
This could be adjusted to avoid repeating the exception message, which is
already included in the stack trace of the exception itself.
```suggestion
getLogger().error("Record Processing Failed", e);
```
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context)
throws IOException {
clientAuth = ClientAuth.valueOf(clientAuthValue);
}
- // create a ServerSocketChannel in non-blocking mode and bind to the
given address and port
- final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
- this.dispatcher = new
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext,
clientAuth, readTimeout,
- maxSocketBufferSize, maxConnections, recordReaderFactory,
socketReaders, getLogger());
+ NettyEventServerFactory eventServerFactory = new
RecordReaderEventServerFactory(getLogger(), nicAddress, port,
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+ eventServerFactory.setSslContext(sslContext);
+ eventServerFactory.setClientAuth(clientAuth);
+ eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+ eventServerFactory.setWorkerThreads(maxConnections);
+ eventServerFactory.setConnectionTimeout(readTimeout);
- // start a thread to run the dispatcher
- final Thread readerThread = new Thread(dispatcher);
- readerThread.setName(getClass().getName() + " [" + getIdentifier() +
"]");
- readerThread.setDaemon(true);
- readerThread.start();
+ eventServer = eventServerFactory.getEventServer();
}
@OnStopped
public void onStopped() {
- if (dispatcher != null) {
- dispatcher.close();
- dispatcher = null;
+ if (eventServer != null) {
+ eventServer.shutdown();
+ eventServer = null;
}
- SocketChannelRecordReader socketRecordReader;
- while ((socketRecordReader = socketReaders.poll()) != null) {
+ NetworkRecordReader recordReader;
+ while ((recordReader = recordReaders.poll()) != null) {
try {
- socketRecordReader.close();
+ recordReader.getRecordReader().close();
} catch (Exception e) {
- getLogger().error("Couldn't close " + socketRecordReader, e);
+ getLogger().error("Couldn't close " + recordReader, e);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final SocketChannelRecordReader socketRecordReader =
pollForSocketRecordReader();
- if (socketRecordReader == null) {
- return;
- }
-
- if (socketRecordReader.isClosed()) {
- getLogger().warn("Unable to read records from {}, socket already
closed", new Object[] {getRemoteAddress(socketRecordReader)});
- IOUtils.closeQuietly(socketRecordReader); // still need to call
close so the overall count is decremented
+ NetworkRecordReader recordReader = pollForRecordReader();
+ if (recordReader == null) {
return;
}
final int recordBatchSize =
context.getProperty(RECORD_BATCH_SIZE).asInteger();
final String readerErrorHandling =
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
final RecordSetWriterFactory recordSetWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- // synchronize to ensure there are no stale values in the underlying
SocketChannel
- synchronized (socketRecordReader) {
+ // synchronize to ensure there are no stale values in the underlying
SocketChannel - is this still necessary?
+ synchronized (recordReader) {
FlowFile flowFile = session.create();
try {
- // lazily creating the record reader here
- RecordReader recordReader =
socketRecordReader.getRecordReader();
- if (recordReader == null) {
- recordReader =
socketRecordReader.createRecordReader(getLogger());
- }
-
Record record;
try {
- record = recordReader.nextRecord();
+ record = recordReader.getRecordReader().nextRecord(); //
TODO: This nextRecord call needs a timeout of some kind
} catch (final Exception e) {
- boolean timeout = false;
-
- // some of the underlying record libraries wrap the real
exception in RuntimeException, so check each
- // throwable (starting with the current one) to see if its
a SocketTimeoutException
- Throwable cause = e;
- while (cause != null) {
- if (cause instanceof SocketTimeoutException) {
- timeout = true;
- break;
- }
- cause = cause.getCause();
- }
-
- if (timeout) {
- getLogger().debug("Timeout reading records, will try
again later", e);
- socketReaders.offer(socketRecordReader);
- session.remove(flowFile);
- return;
- } else {
- throw e;
- }
+ throw e;
}
if (record == null) {
- getLogger().debug("No records available from {}, closing
connection", new Object[]{getRemoteAddress(socketRecordReader)});
- IOUtils.closeQuietly(socketRecordReader);
+ IOUtils.closeQuietly(recordReader.getRecordReader());
Review comment:
I agree that the debug log is useful.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -420,12 +385,12 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
getLogger().debug("Removing flow file, no records were
written");
session.remove(flowFile);
} else {
- final String sender = getRemoteAddress(socketRecordReader);
+ final String sender = recordReader.getSender().toString();
final Map<String, String> attributes = new
HashMap<>(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
attributes.put("tcp.sender", sender);
- attributes.put("tcp.port", String.valueOf(port));
+ attributes.put("tcp.port", String.valueOf(port)); //
Should this be the remote port..?
Review comment:
I agree that a new attribute for the sender port could be useful.
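A minimal sketch of how the sender port could be derived is shown below. The attribute name `tcp.sender.port` and the helper class are hypothetical, chosen only for illustration. The existing `tcp.port` attribute keeps its meaning as the listening port.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR
final class SenderAttributes {
    static Map<String, String> senderAttributes(final SocketAddress sender, final int listeningPort) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("tcp.sender", sender.toString());
        attributes.put("tcp.port", String.valueOf(listeningPort));
        // Only IP socket addresses carry a port; other address types are left without the attribute
        if (sender instanceof InetSocketAddress) {
            final int senderPort = ((InetSocketAddress) sender).getPort();
            attributes.put("tcp.sender.port", String.valueOf(senderPort)); // hypothetical attribute name
        }
        return attributes;
    }

    public static void main(final String[] args) {
        final SocketAddress sender = InetSocketAddress.createUnresolved("client.example", 50123);
        System.out.println(senderAttributes(sender, 9000));
    }
}
```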
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
##########
@@ -156,9 +168,13 @@ private void setChannelOptions(final AbstractBootstrap<?,
?> bootstrap) {
bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new
FixedRecvByteBufAllocator(socketReceiveBuffer));
}
+
if (socketKeepAlive != null) {
bootstrap.option(ChannelOption.SO_KEEPALIVE, socketKeepAlive);
}
+
+ // ChannelOption only takes integer for time in milliseconds? strange
Review comment:
This comment seems somewhat superfluous. Although interesting, the integer
type is probably intended to limit the theoretical maximum timeout.
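For context on what an integer limit means in practice, here is a small sketch. It assumes the option in question is an Integer-valued millisecond setting, such as Netty's `ChannelOption.CONNECT_TIMEOUT_MILLIS`. The helper class is hypothetical. It converts a `Duration` to `int` milliseconds and fails loudly rather than silently overflowing.

```java
import java.time.Duration;

// Hypothetical helper, not part of the PR
final class TimeoutMillis {
    /**
     * Converts a Duration to int milliseconds.
     * Throws ArithmeticException when the value does not fit in an int.
     */
    static int toIntMillis(final Duration timeout) {
        return Math.toIntExact(timeout.toMillis());
    }

    public static void main(final String[] args) {
        System.out.println(toIntMillis(Duration.ofSeconds(30)));
        // Largest whole number of days that fits in int milliseconds
        System.out.println(Duration.ofMillis(Integer.MAX_VALUE).toDays());
    }
}
```

An `int` of milliseconds tops out at a little under 25 days, which is far beyond any sensible socket timeout.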
##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml
##########
@@ -39,5 +39,21 @@
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-record-utils</artifactId>
+ <version>1.15.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ <scope>compile</scope>
+ </dependency>
Review comment:
On initial evaluation, adding these dependencies to
`nifi-event-transport` does not seem like the right approach. Other uses of
this module do not need record-oriented modules, and in particular, the
dependency on the `nifi-record-serialization-service-api` is problematic. It
seems better to avoid these dependencies entirely, and instead place the
classes in another module. This could be in `nifi-standard-processors`, where
`ListenTCPRecord` currently exists, or for reuse, perhaps a new module named
something like `nifi-record-event-transport`.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context)
throws IOException {
clientAuth = ClientAuth.valueOf(clientAuthValue);
}
- // create a ServerSocketChannel in non-blocking mode and bind to the
given address and port
- final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
- this.dispatcher = new
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext,
clientAuth, readTimeout,
- maxSocketBufferSize, maxConnections, recordReaderFactory,
socketReaders, getLogger());
+ NettyEventServerFactory eventServerFactory = new
RecordReaderEventServerFactory(getLogger(), nicAddress, port,
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+ eventServerFactory.setSslContext(sslContext);
+ eventServerFactory.setClientAuth(clientAuth);
+ eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+ eventServerFactory.setWorkerThreads(maxConnections);
+ eventServerFactory.setConnectionTimeout(readTimeout);
- // start a thread to run the dispatcher
- final Thread readerThread = new Thread(dispatcher);
- readerThread.setName(getClass().getName() + " [" + getIdentifier() +
"]");
- readerThread.setDaemon(true);
- readerThread.start();
+ eventServer = eventServerFactory.getEventServer();
}
@OnStopped
public void onStopped() {
- if (dispatcher != null) {
- dispatcher.close();
- dispatcher = null;
+ if (eventServer != null) {
+ eventServer.shutdown();
+ eventServer = null;
}
- SocketChannelRecordReader socketRecordReader;
- while ((socketRecordReader = socketReaders.poll()) != null) {
+ NetworkRecordReader recordReader;
+ while ((recordReader = recordReaders.poll()) != null) {
try {
- socketRecordReader.close();
+ recordReader.getRecordReader().close();
} catch (Exception e) {
- getLogger().error("Couldn't close " + socketRecordReader, e);
+ getLogger().error("Couldn't close " + recordReader, e);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final SocketChannelRecordReader socketRecordReader =
pollForSocketRecordReader();
- if (socketRecordReader == null) {
- return;
- }
-
- if (socketRecordReader.isClosed()) {
- getLogger().warn("Unable to read records from {}, socket already
closed", new Object[] {getRemoteAddress(socketRecordReader)});
- IOUtils.closeQuietly(socketRecordReader); // still need to call
close so the overall count is decremented
+ NetworkRecordReader recordReader = pollForRecordReader();
+ if (recordReader == null) {
return;
}
final int recordBatchSize =
context.getProperty(RECORD_BATCH_SIZE).asInteger();
final String readerErrorHandling =
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
final RecordSetWriterFactory recordSetWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- // synchronize to ensure there are no stale values in the underlying
SocketChannel
- synchronized (socketRecordReader) {
+ // synchronize to ensure there are no stale values in the underlying
SocketChannel - is this still necessary?
+ synchronized (recordReader) {
FlowFile flowFile = session.create();
try {
- // lazily creating the record reader here
- RecordReader recordReader =
socketRecordReader.getRecordReader();
- if (recordReader == null) {
- recordReader =
socketRecordReader.createRecordReader(getLogger());
- }
-
Record record;
try {
- record = recordReader.nextRecord();
+ record = recordReader.getRecordReader().nextRecord(); //
TODO: This nextRecord call needs a timeout of some kind
} catch (final Exception e) {
- boolean timeout = false;
-
- // some of the underlying record libraries wrap the real
exception in RuntimeException, so check each
- // throwable (starting with the current one) to see if its
a SocketTimeoutException
- Throwable cause = e;
- while (cause != null) {
- if (cause instanceof SocketTimeoutException) {
- timeout = true;
- break;
- }
- cause = cause.getCause();
- }
-
- if (timeout) {
- getLogger().debug("Timeout reading records, will try
again later", e);
- socketReaders.offer(socketRecordReader);
- session.remove(flowFile);
- return;
- } else {
- throw e;
- }
+ throw e;
Review comment:
Concurring with @bbende, it looks like this may need to be revisited.
If SocketTimeoutExceptions are no longer thrown, then this makes sense, but if
they are thrown, something similar to this needs to be maintained.
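If the check does need to be kept, the removed loop could be reduced to a small helper along these lines. This is a sketch with hypothetical names. Whether the Netty-based implementation surfaces `SocketTimeoutException`, Netty's own `ReadTimeoutException`, or neither is not established here and would need to be confirmed.

```java
import java.net.SocketTimeoutException;

// Hypothetical helper, not part of the PR
final class CauseChain {
    /**
     * Returns true when the throwable, or any throwable in its cause chain,
     * is an instance of the given type.
     */
    static boolean isCausedBy(final Throwable thrown, final Class<? extends Throwable> type) {
        Throwable cause = thrown;
        while (cause != null) {
            if (type.isInstance(cause)) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    public static void main(final String[] args) {
        // Some record libraries wrap the real exception in a RuntimeException
        final Exception wrapped = new RuntimeException(new SocketTimeoutException("Read timed out"));
        System.out.println(isCausedBy(wrapped, SocketTimeoutException.class));
    }
}
```

With such a helper, the catch block could re-queue the reader and remove the FlowFile on timeout, and rethrow otherwise, as the previous implementation did.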
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
##########
@@ -94,7 +95,7 @@ public void run() {
if (logger.isDebugEnabled()) {
final String remoteAddress = remoteSocketAddress == null ?
"null" : remoteSocketAddress.toString();
- logger.debug("Accepted connection from {}", new
Object[]{remoteAddress});
+ logger.debug("Accepted connection from {}", remoteAddress);
Review comment:
Although this change is helpful, recommend reverting it from this PR
since it is unrelated.