This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 39f8a00 NIFI-8039 Adding properties to ListenTCP in order to allow
refine behaviour under higher load; Refining thread pool for better scalability
39f8a00 is described below
commit 39f8a008d4bde2024eb804ff7017542a6e86b572
Author: Bence Simon <[email protected]>
AuthorDate: Wed Nov 25 21:53:01 2020 +0100
NIFI-8039 Adding properties to ListenTCP in order to allow refine behaviour
under higher load; Refining thread pool for better scalability
NIFI-8039 Review findings; refining thread pool to be able to scale down
properly when not under load
NIFI-8039 Answers to PR comments
This closes #4689.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../apache/nifi/processors/beats/ListenBeats.java | 28 ++++----
.../handler/TestBeatsSocketChannelHandler.java | 9 +--
.../util/listen/AbstractListenEventProcessor.java | 20 +-----
.../util/listen/dispatcher/ByteBufferFactory.java | 37 ++++++++++
.../util/listen/dispatcher/ByteBufferPool.java | 55 +++++++++++++++
.../util/listen/dispatcher/ByteBufferSource.java | 39 ++++++++++
.../dispatcher/DatagramChannelDispatcher.java | 22 ++----
.../listen/dispatcher/SocketChannelDispatcher.java | 82 +++++++++++++---------
.../processors/lumberjack/ListenLumberjack.java | 28 ++++----
.../handler/ITLumberjackSocketChannelHandler.java | 36 +++++-----
.../nifi/processors/standard/ListenRELP.java | 28 ++++----
.../nifi/processors/standard/ListenSyslog.java | 62 ++++++++--------
.../apache/nifi/processors/standard/ListenTCP.java | 80 +++++++++++++++++----
.../apache/nifi/processors/standard/ListenUDP.java | 7 +-
.../nifi/processors/standard/ListenUDPRecord.java | 7 +-
.../relp/handler/TestRELPSocketChannelHandler.java | 9 +--
16 files changed, 366 insertions(+), 183 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
index eab3e76..bedca27 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
@@ -16,17 +16,6 @@
*/
package org.apache.nifi.processors.beats;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "beats", "tcp", "logs"})
@CapabilityDescription("Listens for messages sent by libbeat compatible
clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash',
writing its JSON formatted payload " +
@@ -147,7 +149,7 @@ public class ListenBeats extends
AbstractListenEventBatchingProcessor<BeatsEvent
final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and
the buffer size
- final BlockingQueue<ByteBuffer> bufferPool =
createBufferPool(maxConnections, bufferSize);
+ final ByteBufferSource byteBufferSource = new
ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to
pass down to the dispatcher
SSLContext sslContext = null;
@@ -161,7 +163,7 @@ public class ListenBeats extends
AbstractListenEventBatchingProcessor<BeatsEvent
}
// if we decide to support SSL then get the context and pass it in here
- return new SocketChannelDispatcher<>(eventFactory, handlerFactory,
bufferPool, events,
+ return new SocketChannelDispatcher<>(eventFactory, handlerFactory,
byteBufferSource, events,
getLogger(), maxConnections, sslContext, clientAuth, charSet);
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
index 44f73a2..7c6c1cb 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
@@ -33,6 +33,8 @@ import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
@@ -51,7 +53,7 @@ import org.mockito.Mockito;
public class TestBeatsSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher>
channelHandlerFactory;
- private BlockingQueue<ByteBuffer> byteBuffers;
+ private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
@@ -64,8 +66,7 @@ public class TestBeatsSocketChannelHandler {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
- byteBuffers = new LinkedBlockingQueue<>();
- byteBuffers.add(ByteBuffer.allocate(4096));
+ byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
@@ -74,7 +75,7 @@ public class TestBeatsSocketChannelHandler {
sslContext = null;
charset = StandardCharsets.UTF_8;
- dispatcher = new SocketChannelDispatcher<>(eventFactory,
channelHandlerFactory, byteBuffers, events, logger,
+ dispatcher = new SocketChannelDispatcher<>(eventFactory,
channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 20df868..edba61a 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.processor.util.listen;
-import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -36,7 +34,6 @@ import org.apache.nifi.processor.util.listen.event.Event;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
-import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
@@ -47,6 +44,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
/**
* An abstract processor to extend from when listening for events over a
channel. This processor
* will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher,
in a background
@@ -229,21 +228,6 @@ public abstract class AbstractListenEventProcessor<E
extends Event> extends Abst
}
/**
- * Creates a pool of ByteBuffers with the given size.
- *
- * @param poolSize the number of buffers to initialize the pool with
- * @param bufferSize the size of each buffer
- * @return a blocking queue with size equal to poolSize and each buffer
equal to bufferSize
- */
- protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize,
final int bufferSize) {
- final LinkedBlockingQueue<ByteBuffer> bufferPool = new
LinkedBlockingQueue<>(poolSize);
- for (int i = 0; i < poolSize; i++) {
- bufferPool.offer(ByteBuffer.allocate(bufferSize));
- }
- return bufferPool;
- }
-
- /**
* If pollErrorQueue is true, the error queue will be checked first and
event will be
* returned from the error queue if available.
*
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java
new file mode 100644
index 0000000..86a035a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link ByteBufferSource} that allocates a new buffer for every request instead of pooling.
+ */
+public class ByteBufferFactory implements ByteBufferSource {
+
+    private final int capacity;
+
+    /**
+     * @param bufferSize the capacity, in bytes, of every buffer handed out by {@link #acquire()}
+     */
+    public ByteBufferFactory(final int bufferSize) {
+        capacity = bufferSize;
+    }
+
+    /**
+     * @return a newly allocated buffer with the capacity given at construction time
+     */
+    @Override
+    public ByteBuffer acquire() {
+        final ByteBuffer fresh = ByteBuffer.allocate(capacity);
+        return fresh;
+    }
+
+    /**
+     * Does nothing: this source keeps no reference to the buffers it creates, so there is
+     * nothing to hand back.
+     *
+     * @param byteBuffer the buffer being released; ignored
+     */
+    @Override
+    public void release(final ByteBuffer byteBuffer) {
+        // nothing to do
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java
new file mode 100644
index 0000000..42f476f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link ByteBufferSource} backed by a fixed number of buffers that are all allocated up front.
+ * {@link #acquire()} takes a buffer out of the pool and {@link #release(ByteBuffer)} puts it back.
+ */
+public class ByteBufferPool implements ByteBufferSource {
+
+    private final BlockingQueue<ByteBuffer> pool;
+
+    /**
+     * Creates the pool and fills it with {@code poolSize} buffers of {@code bufferSize} bytes each.
+     *
+     * @param poolSize the number of buffers to allocate; must be greater than zero
+     * @param bufferSize the capacity of each buffer, in bytes
+     * @throws IllegalArgumentException if {@code poolSize} is not positive
+     */
+    public ByteBufferPool(final int poolSize, final int bufferSize) {
+        if (poolSize <= 0) {
+            throw new IllegalArgumentException("A pool of available ByteBuffers is required");
+        }
+
+        this.pool = new LinkedBlockingQueue<>(poolSize);
+
+        for (int i = 0; i < poolSize; i++) {
+            pool.offer(ByteBuffer.allocate(bufferSize));
+        }
+    }
+
+    /**
+     * Takes a buffer out of the pool without waiting.
+     *
+     * @return a cleared buffer, or {@code null} when the pool currently holds no buffer
+     */
+    @Override
+    public ByteBuffer acquire() {
+        final ByteBuffer buffer = pool.poll();
+
+        // poll() yields null on an empty queue. Pass that on to the caller, as the
+        // ByteBufferSource contract describes, instead of failing on clear() below.
+        if (buffer == null) {
+            return null;
+        }
+
+        buffer.clear();
+        buffer.mark();
+        return buffer;
+    }
+
+    /**
+     * Puts a buffer back into the pool, waiting for free space if the pool is already full.
+     * If the thread is interrupted while waiting, the buffer is not returned and the
+     * interrupt flag is set again for the caller to observe.
+     *
+     * @param byteBuffer the buffer to return to the pool
+     */
+    @Override
+    public void release(final ByteBuffer byteBuffer) {
+        try {
+            pool.put(byteBuffer);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java
new file mode 100644
index 0000000..002827d
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Supplies the dispatchers with byte buffers and takes them back once they are no longer needed.
+ */
+public interface ByteBufferSource {
+
+ /**
+ * Provides a buffer for the caller to use. Depending on the implementation, the buffer is
+ * either taken from a pool or created on demand.
+ *
+ * @return a buffer, or {@code null} if the source is not able to provide one
+ */
+ ByteBuffer acquire();
+
+ /**
+ * Releases a buffer obtained from {@link #acquire()}. The source may hand the buffer out
+ * again, so the caller must not use it after this call.
+ *
+ * @param byteBuffer the buffer the caller acquired previously
+ */
+ void release(ByteBuffer byteBuffer);
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
index 69a1998..cd721e6 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
@@ -43,7 +43,7 @@ import java.util.concurrent.BlockingQueue;
public class DatagramChannelDispatcher<E extends Event<DatagramChannel>>
implements ChannelDispatcher {
private final EventFactory<E> eventFactory;
- private final BlockingQueue<ByteBuffer> bufferPool;
+ private final ByteBufferSource bufferSource;
private final EventQueue<E> events;
private final ComponentLog logger;
private final String sendingHost;
@@ -54,28 +54,24 @@ public class DatagramChannelDispatcher<E extends
Event<DatagramChannel>> impleme
private volatile boolean stopped = false;
public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
- final BlockingQueue<ByteBuffer>
bufferPool,
+ final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger) {
- this(eventFactory, bufferPool, events, logger, null, null);
+ this(eventFactory, bufferSource, events, logger, null, null);
}
public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
- final BlockingQueue<ByteBuffer>
bufferPool,
+ final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final String sendingHost,
final Integer sendingPort) {
this.eventFactory = eventFactory;
- this.bufferPool = bufferPool;
+ this.bufferSource = bufferSource;
this.logger = logger;
this.sendingHost = sendingHost;
this.sendingPort = sendingPort;
this.events = new EventQueue<>(events, logger);
-
- if (bufferPool == null || bufferPool.size() == 0) {
- throw new IllegalArgumentException("A pool of available
ByteBuffers is required");
- }
}
@Override
@@ -110,7 +106,7 @@ public class DatagramChannelDispatcher<E extends
Event<DatagramChannel>> impleme
@Override
public void run() {
- final ByteBuffer buffer = bufferPool.poll();
+ final ByteBuffer buffer = bufferSource.acquire();
while (!stopped) {
try {
int selected = selector.select();
@@ -155,11 +151,7 @@ public class DatagramChannelDispatcher<E extends
Event<DatagramChannel>> impleme
}
if (buffer != null) {
- try {
- bufferPool.put(buffer);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ bufferSource.release(buffer);
}
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
index d0be256..41e1a70 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
@@ -16,6 +16,17 @@
*/
package org.apache.nifi.processor.util.listen.dispatcher;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.security.util.ClientAuth;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -29,20 +40,10 @@ import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.security.util.ClientAuth;
/**
* Accepts Socket connections on the given port and creates a handler for each
connection to
@@ -52,15 +53,16 @@ public class SocketChannelDispatcher<E extends
Event<SocketChannel>> implements
private final EventFactory<E> eventFactory;
private final ChannelHandlerFactory<E, AsyncChannelDispatcher>
handlerFactory;
- private final BlockingQueue<ByteBuffer> bufferPool;
+ private final ByteBufferSource bufferSource;
private final BlockingQueue<E> events;
private final ComponentLog logger;
private final int maxConnections;
+ private final int maxThreadPoolSize;
private final SSLContext sslContext;
private final ClientAuth clientAuth;
private final Charset charset;
- private ExecutorService executor;
+ private ThreadPoolExecutor executor;
private volatile boolean stopped = false;
private Selector selector;
private final BlockingQueue<SelectionKey> keyQueue;
@@ -68,45 +70,63 @@ public class SocketChannelDispatcher<E extends
Event<SocketChannel>> implements
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E,
AsyncChannelDispatcher> handlerFactory,
- final BlockingQueue<ByteBuffer> bufferPool,
+ final ByteBufferSource bufferSource,
+ final BlockingQueue<E> events,
+ final ComponentLog logger,
+ final int maxConnections,
+ final SSLContext sslContext,
+ final Charset charset) {
+ this(eventFactory, handlerFactory, bufferSource, events, logger,
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+ }
+
+ public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+ final ChannelHandlerFactory<E,
AsyncChannelDispatcher> handlerFactory,
+ final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final SSLContext sslContext,
+ final ClientAuth clientAuth,
final Charset charset) {
- this(eventFactory, handlerFactory, bufferPool, events, logger,
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+ this(eventFactory, handlerFactory, bufferSource, events, logger,
maxConnections, maxConnections, sslContext, clientAuth, charset);
}
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E,
AsyncChannelDispatcher> handlerFactory,
- final BlockingQueue<ByteBuffer> bufferPool,
+ final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
+ final int maxThreadPoolSize,
final SSLContext sslContext,
final ClientAuth clientAuth,
final Charset charset) {
this.eventFactory = eventFactory;
this.handlerFactory = handlerFactory;
- this.bufferPool = bufferPool;
+ this.bufferSource = bufferSource;
this.events = events;
this.logger = logger;
this.maxConnections = maxConnections;
+ this.maxThreadPoolSize = maxThreadPoolSize;
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
this.sslContext = sslContext;
this.clientAuth = clientAuth;
this.charset = charset;
-
- if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size()
!= maxConnections) {
- throw new IllegalArgumentException(
- "A pool of available ByteBuffers equal to the maximum
number of connections is required");
- }
}
@Override
public void open(final InetAddress nicAddress, final int port, final int
maxBufferSize) throws IOException {
+ final InetSocketAddress inetSocketAddress = new
InetSocketAddress(nicAddress, port);
+
stopped = false;
- executor = Executors.newFixedThreadPool(maxConnections);
+ executor = new ThreadPoolExecutor(
+ maxThreadPoolSize,
+ maxThreadPoolSize,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new
BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() +
"-worker-%d").build());
+ executor.allowCoreThreadTimeOut(true);
final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
@@ -120,7 +140,7 @@ public class SocketChannelDispatcher<E extends
Event<SocketChannel>> implements
}
}
- serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress,
port));
+ serverSocketChannel.socket().bind(inetSocketAddress);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
@@ -161,9 +181,7 @@ public class SocketChannelDispatcher<E extends
Event<SocketChannel>> implements
SelectionKey readKey =
socketChannel.register(selector, SelectionKey.OP_READ);
// Prepare the byte buffer for the reads, clear it
out
- ByteBuffer buffer = bufferPool.poll();
- buffer.clear();
- buffer.mark();
+ ByteBuffer buffer = bufferSource.acquire();
// If we have an SSLContext then create an
SSLEngine for the channel
SSLSocketChannel sslSocketChannel = null;
@@ -265,13 +283,9 @@ public class SocketChannelDispatcher<E extends
Event<SocketChannel>> implements
@Override
public void completeConnection(SelectionKey key) {
- // connection is done. Return the buffer to the pool
- SocketChannelAttachment attachment = (SocketChannelAttachment)
key.attachment();
- try {
- bufferPool.put(attachment.getByteBuffer());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ // connection is done. Releasing buffer
+ final SocketChannelAttachment attachment = (SocketChannelAttachment)
key.attachment();
+ bufferSource.release(attachment.getByteBuffer());
currentConnections.decrementAndGet();
}
diff --git
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
index 7ff65ee..9ce0a76 100644
---
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
+++
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
@@ -17,17 +17,6 @@
package org.apache.nifi.processors.lumberjack;
import com.google.gson.Gson;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
@Deprecated
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "lumberjack", "tcp", "logs"})
@@ -135,7 +137,7 @@ public class ListenLumberjack extends
AbstractListenEventBatchingProcessor<Lumbe
final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and
the buffer size
- final BlockingQueue<ByteBuffer> bufferPool =
createBufferPool(maxConnections, bufferSize);
+ final ByteBufferSource byteBufferSource = new
ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to
pass down to the dispatcher
SSLContext sslContext = null;
@@ -145,7 +147,7 @@ public class ListenLumberjack extends
AbstractListenEventBatchingProcessor<Lumbe
}
// if we decide to support SSL then get the context and pass it in here
- return new SocketChannelDispatcher<>(eventFactory, handlerFactory,
bufferPool, events,
+ return new SocketChannelDispatcher<>(eventFactory, handlerFactory,
byteBufferSource, events,
getLogger(), maxConnections, sslContext, charSet);
}
diff --git
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
index 0ef1e41..45e74b6 100644
---
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
+++
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
@@ -16,21 +16,10 @@
*/
package org.apache.nifi.processors.lumberjack.handler;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import javax.net.ssl.SSLContext;
-import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
@@ -43,11 +32,25 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import javax.net.ssl.SSLContext;
+import javax.xml.bind.DatatypeConverter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
@SuppressWarnings("deprecation")
public class ITLumberjackSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher>
channelHandlerFactory;
- private BlockingQueue<ByteBuffer> byteBuffers;
+ private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
@@ -60,8 +63,7 @@ public class ITLumberjackSocketChannelHandler {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
- byteBuffers = new LinkedBlockingQueue<>();
- byteBuffers.add(ByteBuffer.allocate(4096));
+ byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
@@ -70,7 +72,7 @@ public class ITLumberjackSocketChannelHandler {
sslContext = null;
charset = StandardCharsets.UTF_8;
- dispatcher = new SocketChannelDispatcher<>(eventFactory,
channelHandlerFactory, byteBuffers, events, logger,
+ dispatcher = new SocketChannelDispatcher<>(eventFactory,
channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
index 09a68eb..53477a6 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -16,17 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "relp", "tcp", "logs"})
@CapabilityDescription("Listens for RELP messages being sent to a given port over TCP. Each message will be " +
@@ -135,7 +137,7 @@ public class ListenRELP extends
AbstractListenEventBatchingProcessor<RELPEvent>
final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and
the buffer size
- final BlockingQueue<ByteBuffer> bufferPool =
createBufferPool(maxConnections, bufferSize);
+ final ByteBufferSource byteBufferSource = new
ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to
pass down to the dispatcher
SSLContext sslContext = null;
@@ -150,7 +152,7 @@ public class ListenRELP extends
AbstractListenEventBatchingProcessor<RELPEvent>
}
// if we decide to support SSL then get the context and pass it in here
- return new SocketChannelDispatcher<>(eventFactory, handlerFactory,
bufferPool, events,
+ return new SocketChannelDispatcher<>(eventFactory, handlerFactory,
byteBufferSource, events,
getLogger(), maxConnections, sslContext, clientAuth, charSet);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 77a9a28..4cc7b33 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -16,28 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -62,6 +40,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
@@ -77,6 +57,28 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@@ -202,7 +204,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private volatile ChannelDispatcher channelDispatcher;
private volatile SyslogParser parser;
- private volatile BlockingQueue<ByteBuffer> bufferPool;
+ private volatile ByteBufferSource byteBufferSource;
private volatile BlockingQueue<RawSyslogEvent> syslogEvents;
private final BlockingQueue<RawSyslogEvent> errorEvents = new
LinkedBlockingQueue<>();
private volatile byte[] messageDemarcatorBytes; //it is only the array
reference that is volatile - not the contents.
@@ -303,11 +305,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
maxConnections =
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
- bufferPool = new LinkedBlockingQueue<>(maxConnections);
- for (int i = 0; i < maxConnections; i++) {
- bufferPool.offer(ByteBuffer.allocate(bufferSize));
- }
-
+ byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
parser = new SyslogParser(Charset.forName(charSet));
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
@@ -319,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// create either a UDP or TCP reader and call open() to bind to the
given port
final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- channelDispatcher = createChannelReader(context, protocol, bufferPool,
syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
+ channelDispatcher = createChannelReader(context, protocol,
byteBufferSource, syslogEvents, maxConnections, sslContextService,
Charset.forName(charSet));
channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelDispatcher);
@@ -334,14 +332,14 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// visible for testing to be overridden and provide a mock
ChannelDispatcher if desired
- protected ChannelDispatcher createChannelReader(final ProcessContext
context, final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
+ protected ChannelDispatcher createChannelReader(final ProcessContext
context, final String protocol, final ByteBufferSource byteBufferSource,
final
BlockingQueue<RawSyslogEvent> events, final int maxConnections,
final SSLContextService
sslContextService, final Charset charset) throws IOException {
final EventFactory<RawSyslogEvent> eventFactory = new
RawSyslogEventFactory();
if (UDP_VALUE.getValue().equals(protocol)) {
- return new DatagramChannelDispatcher(eventFactory, bufferPool,
events, getLogger());
+ return new DatagramChannelDispatcher(eventFactory,
byteBufferSource, events, getLogger());
} else {
// if an SSLContextService was provided then create an SSLContext
to pass down to the dispatcher
SSLContext sslContext = null;
@@ -354,7 +352,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>,
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
- return new SocketChannelDispatcher(eventFactory, handlerFactory,
bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth,
charset);
+ return new SocketChannelDispatcher(eventFactory, handlerFactory,
byteBufferSource, events, getLogger(), maxConnections, sslContext, clientAuth,
charset);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index 8359221..7aac899 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -16,18 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -40,8 +28,12 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
import
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferFactory;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -53,6 +45,18 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "tcp", "tls", "ssl"})
@@ -83,10 +87,37 @@ public class ListenTCP extends
AbstractListenEventBatchingProcessor<StandardEven
.defaultValue(ClientAuth.REQUIRED.name())
.build();
+ public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new
PropertyDescriptor.Builder()
+ .name("max-receiving-threads")
+ .displayName("Max Number of Receiving Message Handler Threads")
+ .description(
+ "The maximum number of threads might be available for handling receiving messages ready all the time. " +
+ "Cannot be bigger than the \"Max Number of TCP Connections\". " +
+ "If not set, the value of \"Max Number of TCP Connections\" will be used.")
+ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+ .required(false)
+ .build();
+
+ protected static final PropertyDescriptor POOL_RECV_BUFFERS = new
PropertyDescriptor.Builder()
+ .name("pool-receive-buffers")
+ .displayName("Pool Receive Buffers")
+ .description(
+ "When turned on, the processor uses pre-populated pool of buffers when receiving messages. " +
+ "This is prepared during initialisation of the processor. " +
+ "With high value of Max Number of TCP Connections and Receive Buffer Size this strategy might allocate significant amount of memory! " +
+ "When turned off, the byte buffers will be created on demand and be destroyed after use.")
+ .required(true)
+ .defaultValue("True")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
MAX_CONNECTIONS,
+ MAX_RECV_THREAD_POOL_SIZE,
+ POOL_RECV_BUFFERS,
SSL_CONTEXT_SERVICE,
CLIENT_AUTH
);
@@ -105,6 +136,20 @@ public class ListenTCP extends
AbstractListenEventBatchingProcessor<StandardEven
.valid(false).subject("Client Auth").build());
}
+ final int maxConnections =
validationContext.getProperty(MAX_CONNECTIONS).asInteger();
+
+ if (validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()) {
+ final int maxPoolSize =
validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger();
+
+ if (maxPoolSize > maxConnections) {
+ results.add(new ValidationResult.Builder()
+ .explanation("\"" +
MAX_RECV_THREAD_POOL_SIZE.getDisplayName() + "\" cannot be bigger than \"" +
MAX_CONNECTIONS.getDisplayName() + "\"")
+ .valid(false)
+ .subject(MAX_RECV_THREAD_POOL_SIZE.getDisplayName())
+ .build());
+ }
+ }
+
return results;
}
@@ -113,11 +158,17 @@ public class ListenTCP extends
AbstractListenEventBatchingProcessor<StandardEven
throws IOException {
final int maxConnections =
context.getProperty(MAX_CONNECTIONS).asInteger();
+ final int maxThreadPoolSize =
context.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()
+ ? context.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger()
+ : maxConnections;
+
final int bufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and
the buffer size
- final BlockingQueue<ByteBuffer> bufferPool =
createBufferPool(maxConnections, bufferSize);
+ final ByteBufferSource byteBufferSource =
context.getProperty(POOL_RECV_BUFFERS).asBoolean()
+ ? new ByteBufferPool(maxConnections, bufferSize)
+ : new ByteBufferFactory(bufferSize);
// if an SSLContextService was provided then create an SSLContext to
pass down to the dispatcher
SSLContext sslContext = null;
@@ -132,7 +183,8 @@ public class ListenTCP extends
AbstractListenEventBatchingProcessor<StandardEven
final EventFactory<StandardEvent> eventFactory = new
StandardEventFactory();
final ChannelHandlerFactory<StandardEvent<SocketChannel>,
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
- return new SocketChannelDispatcher(eventFactory, handlerFactory,
bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth,
charSet);
+ return new SocketChannelDispatcher(eventFactory, handlerFactory,
byteBufferSource, events, getLogger(), maxConnections,
+ maxThreadPoolSize, sslContext, clientAuth, charSet);
}
@Override
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index c8ffecb..76795f0 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@ -33,6 +33,8 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -42,7 +44,6 @@ import
org.apache.nifi.processor.util.listen.event.StandardEventFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -124,9 +125,9 @@ public class ListenUDP extends
AbstractListenEventBatchingProcessor<StandardEven
final String sendingHost =
context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort =
context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final Integer bufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
- final BlockingQueue<ByteBuffer> bufferPool =
createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
+ final ByteBufferSource byteBufferSource = new
ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final EventFactory<StandardEvent> eventFactory = new
StandardEventFactory();
- return new DatagramChannelDispatcher<>(eventFactory, bufferPool,
events, getLogger(), sendingHost, sendingHostPort);
+ return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource,
events, getLogger(), sendingHost, sendingHostPort);
}
@Override
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
index 6a723ea..2a20d5d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
@@ -39,6 +39,8 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -58,7 +60,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -214,9 +215,9 @@ public class ListenUDPRecord extends
AbstractListenEventProcessor<StandardEvent>
final String sendingHost =
context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort =
context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final Integer bufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
- final BlockingQueue<ByteBuffer> bufferPool =
createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
+ final ByteBufferSource byteBufferSource = new
ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final EventFactory<StandardEvent> eventFactory = new
StandardEventFactory();
- return new DatagramChannelDispatcher<>(eventFactory, bufferPool,
events, getLogger(), sendingHost, sendingHostPort);
+ return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource,
events, getLogger(), sendingHost, sendingHostPort);
}
@Override
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
index 5bfaca7..d7b7d5c 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
@@ -48,7 +50,7 @@ public class TestRELPSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher>
channelHandlerFactory;
- private BlockingQueue<ByteBuffer> byteBuffers;
+ private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
@@ -61,8 +63,7 @@ public class TestRELPSocketChannelHandler {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new RELPSocketChannelHandlerFactory<>();
- byteBuffers = new LinkedBlockingQueue<>();
- byteBuffers.add(ByteBuffer.allocate(4096));
+ byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
@@ -71,7 +72,7 @@ public class TestRELPSocketChannelHandler {
sslContext = null;
charset = StandardCharsets.UTF_8;
- dispatcher = new SocketChannelDispatcher<>(eventFactory,
channelHandlerFactory, byteBuffers, events, logger,
+ dispatcher = new SocketChannelDispatcher<>(eventFactory,
channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}