gresockj commented on a change in pull request #5044:
URL: https://github.com/apache/nifi/pull/5044#discussion_r632070182



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.message;
+
+/**
+ * Byte Array Message with Sender
+ */
+public class ByteArrayMessage {
+    private byte[] message;

Review comment:
       Any benefit in making these final, since there's no getters?

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.EventSenderFactory;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import 
org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
+import 
org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer;
+import 
org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Netty Event Sender Factory
+ */
+public class NettyEventSenderFactory<T> extends EventLoopGroupFactory 
implements EventSenderFactory<T> {
+    private static final int MAX_PENDING_ACQUIRES = 1024;
+
+    private Duration timeout = Duration.ofSeconds(30);
+
+    private int maxConnections = Runtime.getRuntime().availableProcessors() * 
2;
+
+    private Supplier<List<ChannelHandler>> handlerSupplier = () -> 
Collections.emptyList();
+
+    private final String address;

Review comment:
       Can you move the final ones to the top, just to make it easier to 
visually distinguish the mutable vs. immutable fields?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
##########
@@ -152,7 +145,7 @@
                 "The maximum number of Syslog events to add to a single 
FlowFile. If multiple events are available, they will be concatenated along 
with "
                 + "the <Message Delimiter> up to this configured maximum 
number of messages")
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-        .expressionLanguageSupported(false)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)

Review comment:
       If you enable EL, you'll need to call evaluateAttributeExpressions when 
you use the property below.  Also, I'm not sure FLOWFILE_ATTRIBUTES is 
appropriate here, since ListenSyslog is annotated with INPUT_FORBIDDEN.

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/EventLoopGroupFactory.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Event Loop Group Factory for standardized instance creation
+ */
+class EventLoopGroupFactory {
+    private static final String DEFAULT_THREAD_NAME_PREFIX = 
"NettyEventLoopGroup";
+
+    private static final boolean DAEMON_THREAD_ENABLED = true;
+
+    private String threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX;
+
+    private int workerThreads;
+
+    /**
+     * Set Thread Name Prefix used in Netty NioEventLoopGroup defaults to 
NettyChannel
+     *
+     * @param threadNamePrefix Thread Name Prefix
+     */
+    public void setThreadNamePrefix(final String threadNamePrefix) {
+        this.threadNamePrefix = Objects.requireNonNull(threadNamePrefix, 
"Thread Name Prefix required");
+    }
+
+    /**
+     * Set Worker Threads used in Netty NioEventLoopGroup with 0 interpreted 
as the default based on available processors
+     *
+     * @param workerThreads NioEventLoopGroup Worker Threads
+     */
+    public void setWorkerThreads(final int workerThreads) {
+        this.workerThreads = workerThreads;
+    }
+
+    EventLoopGroup getEventLoopGroup() {

Review comment:
       Why package scope instead of protected?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
##########
@@ -222,213 +224,97 @@ protected void init(final ProcessorInitializationContext 
context) {
     }
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
-        // initialize the queue of senders, one per task, senders will get 
created on the fly in onTrigger
-        this.senderPool = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
-    }
-
-    protected ChannelSender createSender(final ProcessContext context) throws 
IOException {
-        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
-        final String host = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+    public void onScheduled(final ProcessContext context) throws 
InterruptedException {
+        eventSender = getEventSender(context);
         final String protocol = context.getProperty(PROTOCOL).getValue();
-        final int maxSendBuffer = 
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue();
-        final int timeout = 
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        return createSender(sslContextService, protocol, host, port, 
maxSendBuffer, timeout);
-    }
-
-    // visible for testing to override and provide a mock sender if desired
-    protected ChannelSender createSender(final SSLContextService 
sslContextService, final String protocol, final String host,
-                                         final int port, final int 
maxSendBufferSize, final int timeout)
-            throws IOException {
-
-        ChannelSender sender;
-        if (protocol.equals(UDP_VALUE.getValue())) {
-            sender = new DatagramChannelSender(host, port, maxSendBufferSize, 
getLogger());
-        } else {
-            // if an SSLContextService is provided then we make a secure sender
-            if (sslContextService != null) {
-                final SSLContext sslContext = 
sslContextService.createContext();
-                sender = new SSLSocketChannelSender(host, port, 
maxSendBufferSize, sslContext, getLogger());
-            } else {
-                sender = new SocketChannelSender(host, port, 
maxSendBufferSize, getLogger());
-            }
-        }
-        sender.setTimeout(timeout);
-        sender.open();
-        return sender;
+        final String hostname = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+        transitUri = String.format("%s://%s:%s", protocol, hostname, port);
     }
 
     @OnStopped
-    public void onStopped() {
-        if (senderPool != null) {
-            ChannelSender sender = senderPool.poll();
-            while (sender != null) {
-                sender.close();
-                sender = senderPool.poll();
-            }
+    public void onStopped() throws Exception {
+        if (eventSender != null) {
+            eventSender.close();
         }
     }
 
-    private PruneResult pruneIdleSenders(final long idleThreshold){
-        int numClosed = 0;
-        int numConsidered = 0;
-
-        long currentTime = System.currentTimeMillis();
-        final List<ChannelSender> putBack = new ArrayList<>();
-
-        // if a connection hasn't been used with in the threshold then it gets 
closed
-        ChannelSender sender;
-        while ((sender = senderPool.poll()) != null) {
-            numConsidered++;
-            if (currentTime > (sender.getLastUsed() + idleThreshold)) {
-                getLogger().debug("Closing idle connection...");
-                sender.close();
-                numClosed++;
-            } else {
-                putBack.add(sender);
-            }
-        }
-
-        // re-queue senders that weren't idle, but if the queue is full then 
close the sender
-        for (ChannelSender putBackSender : putBack) {
-            boolean returned = senderPool.offer(putBackSender);
-            if (!returned) {
-                putBackSender.close();
-            }
-        }
-
-        return new PruneResult(numClosed, numConsidered);
-    }
-
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final String protocol = context.getProperty(PROTOCOL).getValue();
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
         final List<FlowFile> flowFiles = session.get(batchSize);

Review comment:
       I noticed that PutSyslog continually runs even with no incoming 
flowfiles, probably because the logic to yield and return if flowFiles is null 
or empty has been removed.  What do you think about adding that check back?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
##########
@@ -150,6 +149,7 @@
                     "messages will be sent over a secure connection.")
             .required(false)
             .identifiesControllerService(SSLContextService.class)
+            .dependsOn(PROTOCOL, TCP_VALUE)

Review comment:
       I noticed that in ListenSyslog, the SSL Context Service property is 
always available, but in PutSyslog it's conditional on TCP being set.  Can you 
add the condition to both?
   
   Also, if you have an SSL Context Service selected and then change it back to 
UDP, the processor is marked as invalid even though the SSL Context Service 
property is no longer available to be configured.  Is it possible to unset this 
property if protocol is set to UDP, or is that just a limitation of the current 
"dependsOn" functionality?

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.EventSenderFactory;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import 
org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
+import 
org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer;
+import 
org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Netty Event Sender Factory
+ */
+public class NettyEventSenderFactory<T> extends EventLoopGroupFactory 
implements EventSenderFactory<T> {
+    private static final int MAX_PENDING_ACQUIRES = 1024;
+
+    private Duration timeout = Duration.ofSeconds(30);
+
+    private int maxConnections = Runtime.getRuntime().availableProcessors() * 
2;
+
+    private Supplier<List<ChannelHandler>> handlerSupplier = () -> 
Collections.emptyList();
+
+    private final String address;
+
+    private final int port;
+
+    private final TransportProtocol protocol;
+
+    private SSLContext sslContext;
+
+    public NettyEventSenderFactory(final String address, final int port, final 
TransportProtocol protocol) {
+        this.address = address;
+        this.port = port;
+        this.protocol = protocol;
+    }
+
+    /**
+     * Set Channel Handler Supplier
+     *
+     * @param handlerSupplier Channel Handler Supplier
+     */
+    public void setHandlerSupplier(final Supplier<List<ChannelHandler>> 
handlerSupplier) {
+        this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
+    }
+
+    /**
+     * Set SSL Context to enable TLS Channel Handler
+     *
+     * @param sslContext SSL Context
+     */
+    public void setSslContext(final SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * Set Timeout for Connections and Communication
+     *
+     * @param timeout Timeout Duration
+     */
+    public void setTimeout(final Duration timeout) {
+        this.timeout = Objects.requireNonNull(timeout, "Timeout required");
+    }
+
+    /**
+     * Set Maximum Connections for Channel Pool
+     *
+     * @param maxConnections Maximum Number of connections defaults to 
available processors multiplied by 2
+     */
+    public void setMaxConnections(final int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    /**
+     * Get Event Sender with connected Channel
+     *
+     * @return Connected Event Sender
+     */
+    public EventSender<T> getEventSender() {
+        final Bootstrap bootstrap = new Bootstrap();
+        bootstrap.remoteAddress(new InetSocketAddress(address, port));
+        final EventLoopGroup group = getEventLoopGroup();
+        bootstrap.group(group);
+
+        if (TransportProtocol.UDP.equals(protocol)) {
+            bootstrap.channel(NioDatagramChannel.class);
+        } else {
+            bootstrap.channel(NioSocketChannel.class);
+        }
+
+        setChannelOptions(bootstrap);
+        return getConfiguredEventSender(bootstrap);
+    }
+
+    private void setChannelOptions(final Bootstrap bootstrap) {
+        final int timeoutMilliseconds = (int) timeout.toMillis();
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
timeoutMilliseconds);
+    }
+
+    private EventSender<T> getConfiguredEventSender(final Bootstrap bootstrap) 
{
+        final SocketAddress remoteAddress = bootstrap.config().remoteAddress();
+        final ChannelPool channelPool = getChannelPool(bootstrap);
+        return new NettyEventSender<>(bootstrap.config().group(), channelPool, 
remoteAddress);
+    }
+
+    private ChannelPool getChannelPool(final Bootstrap bootstrap) {
+        final ChannelInitializer<Channel> channelInitializer = 
getChannelInitializer();
+        final ChannelPoolHandler handler = new 
InitializingChannelPoolHandler(channelInitializer);
+        return new FixedChannelPool(bootstrap,
+                handler,
+                ChannelHealthChecker.ACTIVE,
+                FixedChannelPool.AcquireTimeoutAction.FAIL,
+                timeout.toMillis(),
+                maxConnections,
+                MAX_PENDING_ACQUIRES);
+    }
+
+    private ChannelInitializer<Channel> getChannelInitializer() {
+        StandardChannelInitializer<Channel> channelInitializer;
+        if (sslContext == null) {

Review comment:
       Ternary here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to