This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 713e2fd NIFI-9645 - Updated PutSplunk to allow idle connection
timeouts
713e2fd is described below
commit 713e2fd03cc1f7084dc9f5ab14636a6986390257
Author: Nathan Gough <[email protected]>
AuthorDate: Fri Mar 4 21:46:42 2022 -0500
NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
This closes #5841
Signed-off-by: David Handermann <[email protected]>
---
.../util/put/AbstractPutEventProcessor.java | 7 ++--
.../netty/CloseContextIdleStateHandler.java | 38 ++++++++++++++++++++++
.../transport/netty/NettyEventSenderFactory.java | 10 ++++++
.../netty/channel/StandardChannelInitializer.java | 13 ++++++++
4 files changed, 66 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 567f8fd..d9b8577 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -78,10 +78,11 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new
PropertyDescriptor
.Builder().name("Idle Connection Expiration")
- .description("The amount of time a connection should be held open
without being used before closing the connection.")
+ .description("The amount of time a connection should be held open
without being used before closing the connection. A value of 0 seconds will
disable this feature.")
.required(true)
- .defaultValue("5 seconds")
+ .defaultValue("15 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
// Putting these properties here so sub-classes don't have to redefine
them, but they are
@@ -249,7 +250,9 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
factory.setShutdownQuietPeriod(Duration.ZERO); // Quiet period not
necessary since sending threads will have completed before shutting down event
sender
final int timeout =
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int idleTimeout =
context.getProperty(IDLE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
factory.setTimeout(Duration.ofMillis(timeout));
+ factory.setIdleTimeout(Duration.ofSeconds(idleTimeout));
final PropertyValue sslContextServiceProperty =
context.getProperty(SSL_CONTEXT_SERVICE);
if (sslContextServiceProperty.isSet()) {
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
new file mode 100644
index 0000000..c452a32
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+
+/**
+ * Idle State Handler closes channel context when state indicates idle communications.
+ * All other user events are forwarded so that handlers added later in the pipeline still receive them.
+ */
+public class CloseContextIdleStateHandler extends ChannelDuplexHandler {
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext context, final Object event) {
+        if (event instanceof IdleStateEvent && ((IdleStateEvent) event).state() == IdleState.ALL_IDLE) {
+            context.close(); // channel reported ALL_IDLE: release the connection
+        } else {
+            context.fireUserEventTriggered(event); // not handled here: pass along instead of swallowing
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
index b2f34d4..9030297 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
@@ -61,6 +61,8 @@ public class NettyEventSenderFactory<T> extends
EventLoopGroupFactory implements
private Duration timeout = Duration.ofSeconds(30);
+ private Duration idleTimeout = Duration.ofSeconds(0);
+
private int maxConnections = Runtime.getRuntime().availableProcessors() *
2;
private Supplier<List<ChannelHandler>> handlerSupplier = () ->
Collections.emptyList();
@@ -116,6 +118,13 @@ public class NettyEventSenderFactory<T> extends
EventLoopGroupFactory implements
}
/**
+ * Set the idle timeout period for outgoing client connections
+ */
+ public void setIdleTimeout(final Duration idleTimeout) {
+ this.idleTimeout = Objects.requireNonNull(idleTimeout, "Timeout
required");
+ }
+
+ /**
* Set shutdown quiet period
*
* @param quietPeriod shutdown quiet period
@@ -205,6 +214,7 @@ public class NettyEventSenderFactory<T> extends
EventLoopGroupFactory implements
? new StandardChannelInitializer<>(handlerSupplier)
: new ClientSslStandardChannelInitializer<>(handlerSupplier,
sslContext);
channelInitializer.setWriteTimeout(timeout);
+ channelInitializer.setIdleTimeout(idleTimeout);
return channelInitializer;
}
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
index bbfb148..c3fcf81 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
@@ -20,7 +20,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
+import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
import java.time.Duration;
import java.util.List;
@@ -37,6 +39,8 @@ public class StandardChannelInitializer<T extends Channel>
extends ChannelInitia
private Duration writeTimeout = Duration.ofSeconds(30);
+ private Duration idleTimeout = Duration.ofSeconds(0);
+
/**
* Standard Channel Initializer with handlers
*
@@ -55,10 +59,19 @@ public class StandardChannelInitializer<T extends Channel>
extends ChannelInitia
this.writeTimeout = Objects.requireNonNull(writeTimeout);
}
+ /**
+ * Set the idle timeout period for outgoing client connections; a null value is rejected with a NullPointerException
+ */
+ public void setIdleTimeout(final Duration idleTimeout) {
+ this.idleTimeout = Objects.requireNonNull(idleTimeout);
+ }
+
@Override
protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addFirst(new IdleStateHandler(idleTimeout.toMillis(), idleTimeout.toMillis(), idleTimeout.toMillis(), TimeUnit.MILLISECONDS)); // milliseconds, like the write timeout, so sub-second idle periods are not truncated to 0
pipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(),
TimeUnit.MILLISECONDS));
+ pipeline.addLast(new CloseContextIdleStateHandler()); // closes the channel when the idle state handler reports ALL_IDLE
handlerSupplier.get().forEach(pipeline::addLast);
}
}