gresockj commented on a change in pull request #5044: URL: https://github.com/apache/nifi/pull/5044#discussion_r632070182
########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.event.transport.message; + +/** + * Byte Array Message with Sender + */ +public class ByteArrayMessage { + private byte[] message; Review comment: Any benefit in making these final, since there's no getters? ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.event.transport.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.pool.ChannelHealthChecker; +import io.netty.channel.pool.ChannelPool; +import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.pool.FixedChannelPool; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.nifi.event.transport.EventSender; +import org.apache.nifi.event.transport.EventSenderFactory; +import org.apache.nifi.event.transport.configuration.TransportProtocol; +import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler; +import org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer; +import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Netty Event Sender Factory + */ +public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements EventSenderFactory<T> { + private static final int MAX_PENDING_ACQUIRES = 1024; + + private Duration timeout = Duration.ofSeconds(30); + + private int maxConnections = Runtime.getRuntime().availableProcessors() * 2; + + private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList(); + + private final String address; Review comment: Can you move the final ones to the top, just to make it easier to visually distinguish the mutable vs. immutable fields? ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ########## @@ -152,7 +145,7 @@ "The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with " + "the <Message Delimiter> up to this configured maximum number of messages") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) Review comment: If you enable EL, you'll need to call evaluateAttributeExpressions when you use the property below. Also, I'm not sure FLOWFILE_ATTRIBUTES is appropriate here, since ListenSyslog is annotated with INPUT_FORBIDDEN. ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/EventLoopGroupFactory.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.event.transport.netty; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.Objects; +import java.util.concurrent.ThreadFactory; + +/** + * Event Loop Group Factory for standardized instance creation + */ +class EventLoopGroupFactory { + private static final String DEFAULT_THREAD_NAME_PREFIX = "NettyEventLoopGroup"; + + private static final boolean DAEMON_THREAD_ENABLED = true; + + private String threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX; + + private int workerThreads; + + /** + * Set Thread Name Prefix used in Netty NioEventLoopGroup defaults to NettyChannel + * + * @param threadNamePrefix Thread Name Prefix + */ + public void setThreadNamePrefix(final String threadNamePrefix) { + this.threadNamePrefix = Objects.requireNonNull(threadNamePrefix, "Thread Name Prefix required"); + } + + /** + * Set Worker Threads used in Netty NioEventLoopGroup with 0 interpreted as the default based on available processors + * + * @param workerThreads NioEventLoopGroup Worker Threads + */ + public void setWorkerThreads(final int workerThreads) { + this.workerThreads = workerThreads; + } + + EventLoopGroup getEventLoopGroup() { Review comment: Why package scope instead of protected? ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ########## @@ -222,213 +224,97 @@ protected void init(final ProcessorInitializationContext context) { } @OnScheduled - public void onScheduled(final ProcessContext context) throws IOException { - // initialize the queue of senders, one per task, senders will get created on the fly in onTrigger - this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); - } - - protected ChannelSender createSender(final ProcessContext context) throws IOException { - final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); - final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + public void onScheduled(final ProcessContext context) throws InterruptedException { + eventSender = getEventSender(context); final String protocol = context.getProperty(PROTOCOL).getValue(); - final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue(); - final int timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - return createSender(sslContextService, protocol, host, port, maxSendBuffer, timeout); - } - - // visible for testing to override and provide a mock sender if desired - protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, - final int port, final int maxSendBufferSize, final int timeout) - throws IOException { - - ChannelSender sender; - if (protocol.equals(UDP_VALUE.getValue())) { - sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger()); - } else { - // if an SSLContextService is provided then we make a secure sender - if (sslContextService != null) { - final SSLContext sslContext = sslContextService.createContext(); - sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger()); - } else { - sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger()); - } - } - sender.setTimeout(timeout); - sender.open(); - return sender; + final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); + transitUri = String.format("%s://%s:%s", protocol, hostname, port); } @OnStopped - public void onStopped() { - if (senderPool != null) { - ChannelSender sender = senderPool.poll(); - while (sender != null) { - sender.close(); - sender = senderPool.poll(); - } + public void onStopped() throws Exception { + if (eventSender != null) { + eventSender.close(); } } - private PruneResult pruneIdleSenders(final long idleThreshold){ - int numClosed = 0; - int numConsidered = 0; - - long currentTime = System.currentTimeMillis(); - final List<ChannelSender> putBack = new ArrayList<>(); - - // if a connection hasn't been used with in the threshold then it gets closed - ChannelSender sender; - while ((sender = senderPool.poll()) != null) { - numConsidered++; - if (currentTime > (sender.getLastUsed() + idleThreshold)) { - getLogger().debug("Closing idle connection..."); - sender.close(); - numClosed++; - } else { - putBack.add(sender); - } - } - - // re-queue senders that weren't idle, but if the queue is full then close the sender - for (ChannelSender putBackSender : putBack) { - boolean returned = senderPool.offer(putBackSender); - if (!returned) { - putBackSender.close(); - } - } - - return new PruneResult(numClosed, numConsidered); - } - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final String protocol = context.getProperty(PROTOCOL).getValue(); + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); - final List<FlowFile> flowFiles = session.get(batchSize); Review comment: I noticed that PutSyslog continually runs even with no incoming flowfiles, probably because the logic to yield and return if flowFiles is null or empty has been removed. What do you think about adding that check back? ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ########## @@ -150,6 +149,7 @@ "messages will be sent over a secure connection.") .required(false) .identifiesControllerService(SSLContextService.class) + .dependsOn(PROTOCOL, TCP_VALUE) Review comment: I noticed that in ListenSyslog, the SSL Context Service property is always available, but in PutSyslog it's conditional on TCP being set. Can you add the condition to both? Also, if you have an SSL Context Service selected and then change it back to UDP, the processor is marked as invalid even though the SSL Context Service property is no longer available to be configured. Is it possible to unset this property if protocol is set to UDP, or is that just a limitation of the current "dependsOn" functionality? ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.event.transport.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.pool.ChannelHealthChecker; +import io.netty.channel.pool.ChannelPool; +import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.pool.FixedChannelPool; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.nifi.event.transport.EventSender; +import org.apache.nifi.event.transport.EventSenderFactory; +import org.apache.nifi.event.transport.configuration.TransportProtocol; +import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler; +import org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer; +import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Netty Event Sender Factory + */ +public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements EventSenderFactory<T> { + private static final int MAX_PENDING_ACQUIRES = 1024; + + private Duration timeout = Duration.ofSeconds(30); + + private int maxConnections = Runtime.getRuntime().availableProcessors() * 2; + + private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList(); + + private final String address; + + private final int port; + + private final TransportProtocol protocol; + + private SSLContext sslContext; + + public NettyEventSenderFactory(final String address, final int port, final TransportProtocol protocol) { + this.address = address; + this.port = port; + this.protocol = protocol; + } + + /** + * Set Channel Handler Supplier + * + * @param handlerSupplier Channel Handler Supplier + */ + public void setHandlerSupplier(final Supplier<List<ChannelHandler>> handlerSupplier) { + this.handlerSupplier = Objects.requireNonNull(handlerSupplier); + } + + /** + * Set SSL Context to enable TLS Channel Handler + * + * @param sslContext SSL Context + */ + public void setSslContext(final SSLContext sslContext) { + this.sslContext = sslContext; + } + + /** + * Set Timeout for Connections and Communication + * + * @param timeout Timeout Duration + */ + public void setTimeout(final Duration timeout) { + this.timeout = Objects.requireNonNull(timeout, "Timeout required"); + } + + /** + * Set Maximum Connections for Channel Pool + * + * @param maxConnections Maximum Number of connections defaults to available processors multiplied by 2 + */ + public void setMaxConnections(final int maxConnections) { + this.maxConnections = maxConnections; + } + + /** + * Get Event Sender with connected Channel + * + * @return Connected Event Sender + */ + public EventSender<T> getEventSender() { + final Bootstrap bootstrap = new Bootstrap(); + bootstrap.remoteAddress(new InetSocketAddress(address, port)); + final EventLoopGroup group = getEventLoopGroup(); + bootstrap.group(group); + + if (TransportProtocol.UDP.equals(protocol)) { + bootstrap.channel(NioDatagramChannel.class); + } else { + bootstrap.channel(NioSocketChannel.class); + } + + setChannelOptions(bootstrap); + return getConfiguredEventSender(bootstrap); + } + + private void setChannelOptions(final Bootstrap bootstrap) { + final int timeoutMilliseconds = (int) timeout.toMillis(); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMilliseconds); + } + + private EventSender<T> getConfiguredEventSender(final Bootstrap bootstrap) { + final SocketAddress remoteAddress = bootstrap.config().remoteAddress(); + final ChannelPool channelPool = getChannelPool(bootstrap); + return new NettyEventSender<>(bootstrap.config().group(), channelPool, remoteAddress); + } + + private ChannelPool getChannelPool(final Bootstrap bootstrap) { + final ChannelInitializer<Channel> channelInitializer = getChannelInitializer(); + final ChannelPoolHandler handler = new InitializingChannelPoolHandler(channelInitializer); + return new FixedChannelPool(bootstrap, + handler, + ChannelHealthChecker.ACTIVE, + FixedChannelPool.AcquireTimeoutAction.FAIL, + timeout.toMillis(), + maxConnections, + MAX_PENDING_ACQUIRES); + } + + private ChannelInitializer<Channel> getChannelInitializer() { + StandardChannelInitializer<Channel> channelInitializer; + if (sslContext == null) { Review comment: Ternary here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org