This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c50f14  NIFI-9563 Enabled ListenTCP Pool Receive Buffers property
7c50f14 is described below

commit 7c50f1429edef6efedb7bfeaff388f22c0eeaa69
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Jan 12 12:20:41 2022 -0600

    NIFI-9563 Enabled ListenTCP Pool Receive Buffers property
    
    - Removed deprecation from ListenTCP Pool Receive Buffers property
    - Added BufferAllocator configuration property for NettyEventServerFactory
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5653.
---
 .../transport/configuration/BufferAllocator.java   | 27 ++++++++++++++++++++++
 .../transport/netty/NettyEventServerFactory.java   | 16 +++++++++++++
 .../apache/nifi/processors/standard/ListenTCP.java | 16 +++++++------
 .../nifi/processors/standard/TestListenTCP.java    |  1 +
 4 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/BufferAllocator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/BufferAllocator.java
new file mode 100644
index 0000000..4d059d9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/BufferAllocator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.configuration;
+
+/**
+ * Byte Buffer Allocator configuration options
+ */
+public enum BufferAllocator {
+    /** Reusable pool of buffers */
+    POOLED,
+    /** New buffer for each allocation without any pooling */
+    UNPOOLED
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
index 753844f..bddbccf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
@@ -19,6 +19,7 @@ package org.apache.nifi.event.transport.netty;
 import io.netty.bootstrap.AbstractBootstrap;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
@@ -29,6 +30,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.nifi.event.transport.EventException;
 import org.apache.nifi.event.transport.EventServer;
 import org.apache.nifi.event.transport.EventServerFactory;
+import org.apache.nifi.event.transport.configuration.BufferAllocator;
 import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
 import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
 import org.apache.nifi.event.transport.configuration.TransportProtocol;
@@ -68,6 +70,8 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
 
     private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration();
 
+    private BufferAllocator bufferAllocator = BufferAllocator.POOLED;
+
     public NettyEventServerFactory(final InetAddress address, final int port, final TransportProtocol protocol) {
         this.address = address;
         this.port = port;
@@ -138,6 +142,15 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
     }
 
     /**
+     * Set Buffer Allocator option overriding the default POOLED configuration
+     *
+     * @param bufferAllocator Buffer Allocator
+     */
+    public void setBufferAllocator(final BufferAllocator bufferAllocator) {
+        this.bufferAllocator = Objects.requireNonNull(bufferAllocator, "Buffer Allocator required");
+    }
+
+    /**
      * Get Event Server with Channel bound to configured address and port number
      *
      * @return Event Sender
@@ -159,6 +172,9 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
         if (socketKeepAlive != null) {
             bootstrap.option(ChannelOption.SO_KEEPALIVE, socketKeepAlive);
         }
+        if (BufferAllocator.UNPOOLED == bufferAllocator) {
+            bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+        }
     }
 
     private AbstractBootstrap<?, ?> getBootstrap() {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index b0ab7f8..fc7695b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.event.transport.EventException;
 import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.BufferAllocator;
 import org.apache.nifi.event.transport.configuration.TransportProtocol;
 import org.apache.nifi.event.transport.message.ByteArrayMessage;
 import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
@@ -106,13 +107,11 @@ public class ListenTCP extends AbstractProcessor {
             .required(false)
             .build();
 
-    // Deprecated
     protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
             .name("pool-receive-buffers")
             .displayName("Pool Receive Buffers")
-            .description(
-                    "This property is deprecated and no longer used.")
-            .required(false)
+            .description("Enable or disable pooling of buffers that the processor uses for handling bytes received on socket connections. The framework allocates buffers as needed during processing.")
+            .required(true)
             .defaultValue("True")
             .allowableValues("True", "False")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
@@ -146,7 +145,6 @@ public class ListenTCP extends AbstractProcessor {
         descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
         // Deprecated
         descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
-        // Deprecated
         descriptors.add(POOL_RECV_BUFFERS);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(CLIENT_AUTH);
@@ -163,8 +161,8 @@ public class ListenTCP extends AbstractProcessor {
         int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
-        InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
-        Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
+        final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
         port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
         events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
         errorEvents = new LinkedBlockingQueue<>();
@@ -181,6 +179,10 @@ public class ListenTCP extends AbstractProcessor {
             eventFactory.setClientAuth(clientAuth);
         }
 
+        final boolean poolReceiveBuffers = context.getProperty(POOL_RECV_BUFFERS).asBoolean();
+        final BufferAllocator bufferAllocator = poolReceiveBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED;
+        eventFactory.setBufferAllocator(bufferAllocator);
+
         eventFactory.setSocketReceiveBuffer(socketBufferSize);
         eventFactory.setWorkerThreads(workerThreads);
         eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
index ff7cc7c..327dc23 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -100,6 +100,7 @@ public class TestListenTCP {
     @Test
     public void testRunBatching() throws Exception {
         runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");
+        runner.setProperty(ListenTCP.POOL_RECV_BUFFERS, "False");
 
         final List<String> messages = new ArrayList<>();
         messages.add("This is message 1\n");

Reply via email to