bbende commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r759561308
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context)
throws IOException {
clientAuth = ClientAuth.valueOf(clientAuthValue);
}
- // create a ServerSocketChannel in non-blocking mode and bind to the
given address and port
- final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
- this.dispatcher = new
SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext,
clientAuth, readTimeout,
- maxSocketBufferSize, maxConnections, recordReaderFactory,
socketReaders, getLogger());
+ NettyEventServerFactory eventServerFactory = new
RecordReaderEventServerFactory(getLogger(), nicAddress, port,
TransportProtocol.TCP, recordReaderFactory, recordReaders);
+ eventServerFactory.setSslContext(sslContext);
+ eventServerFactory.setClientAuth(clientAuth);
+ eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+ eventServerFactory.setWorkerThreads(maxConnections);
+ eventServerFactory.setConnectionTimeout(readTimeout);
- // start a thread to run the dispatcher
- final Thread readerThread = new Thread(dispatcher);
- readerThread.setName(getClass().getName() + " [" + getIdentifier() +
"]");
- readerThread.setDaemon(true);
- readerThread.start();
+ eventServer = eventServerFactory.getEventServer();
}
@OnStopped
public void onStopped() {
- if (dispatcher != null) {
- dispatcher.close();
- dispatcher = null;
+ if (eventServer != null) {
+ eventServer.shutdown();
+ eventServer = null;
}
- SocketChannelRecordReader socketRecordReader;
- while ((socketRecordReader = socketReaders.poll()) != null) {
+ NetworkRecordReader recordReader;
+ while ((recordReader = recordReaders.poll()) != null) {
try {
- socketRecordReader.close();
+ recordReader.getRecordReader().close();
} catch (Exception e) {
- getLogger().error("Couldn't close " + socketRecordReader, e);
+ getLogger().error("Couldn't close " + recordReader, e);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final SocketChannelRecordReader socketRecordReader =
pollForSocketRecordReader();
- if (socketRecordReader == null) {
- return;
- }
-
- if (socketRecordReader.isClosed()) {
- getLogger().warn("Unable to read records from {}, socket already
closed", new Object[] {getRemoteAddress(socketRecordReader)});
- IOUtils.closeQuietly(socketRecordReader); // still need to call
close so the overall count is decremented
+ NetworkRecordReader recordReader = pollForRecordReader();
+ if (recordReader == null) {
return;
}
final int recordBatchSize =
context.getProperty(RECORD_BATCH_SIZE).asInteger();
final String readerErrorHandling =
context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
final RecordSetWriterFactory recordSetWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- // synchronize to ensure there are no stale values in the underlying
SocketChannel
- synchronized (socketRecordReader) {
+ // synchronize to ensure there are no stale values in the underlying
SocketChannel - is this still necessary?
+ synchronized (recordReader) {
FlowFile flowFile = session.create();
try {
- // lazily creating the record reader here
- RecordReader recordReader =
socketRecordReader.getRecordReader();
- if (recordReader == null) {
- recordReader =
socketRecordReader.createRecordReader(getLogger());
- }
-
Record record;
try {
- record = recordReader.nextRecord();
+ record = recordReader.getRecordReader().nextRecord(); //
TODO: This nextRecord call needs a timeout of some kind
} catch (final Exception e) {
- boolean timeout = false;
-
- // some of the underlying record libraries wrap the real
exception in RuntimeException, so check each
- // throwable (starting with the current one) to see if its
a SocketTimeoutException
- Throwable cause = e;
- while (cause != null) {
- if (cause instanceof SocketTimeoutException) {
- timeout = true;
- break;
- }
- cause = cause.getCause();
- }
-
- if (timeout) {
- getLogger().debug("Timeout reading records, will try
again later", e);
- socketReaders.offer(socketRecordReader);
- session.remove(flowFile);
- return;
- } else {
- throw e;
- }
+ throw e;
}
if (record == null) {
- getLogger().debug("No records available from {}, closing
connection", new Object[]{getRemoteAddress(socketRecordReader)});
- IOUtils.closeQuietly(socketRecordReader);
+ IOUtils.closeQuietly(recordReader.getRecordReader());
Review comment:
I'd be in favor of retaining the logging statement to debug why the
connection was being closed.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -420,12 +385,12 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
getLogger().debug("Removing flow file, no records were
written");
session.remove(flowFile);
} else {
- final String sender = getRemoteAddress(socketRecordReader);
+ final String sender = recordReader.getSender().toString();
final Map<String, String> attributes = new
HashMap<>(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
attributes.put("tcp.sender", sender);
- attributes.put("tcp.port", String.valueOf(port));
+ attributes.put("tcp.port", String.valueOf(port)); //
Should this be the remote port..?
Review comment:
The docs for tcp.port say
@WritesAttribute(attribute="tcp.port", description="The port that the
processor accepted the connection on."),
But possibly we should add another attribute like tcp.sender.port
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]