This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 713e2fd  NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
713e2fd is described below

commit 713e2fd03cc1f7084dc9f5ab14636a6986390257
Author: Nathan Gough <[email protected]>
AuthorDate: Fri Mar 4 21:46:42 2022 -0500

    NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
    
    This closes #5841
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../util/put/AbstractPutEventProcessor.java        |  7 ++--
 .../netty/CloseContextIdleStateHandler.java        | 38 ++++++++++++++++++++++
 .../transport/netty/NettyEventSenderFactory.java   | 10 ++++++
 .../netty/channel/StandardChannelInitializer.java  | 13 ++++++++
 4 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 567f8fd..d9b8577 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -78,10 +78,11 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
             .build();
     public static final PropertyDescriptor IDLE_EXPIRATION = new 
PropertyDescriptor
             .Builder().name("Idle Connection Expiration")
-            .description("The amount of time a connection should be held open 
without being used before closing the connection.")
+            .description("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
             .required(true)
-            .defaultValue("5 seconds")
+            .defaultValue("15 seconds")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     // Putting these properties here so sub-classes don't have to redefine 
them, but they are
@@ -249,7 +250,9 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
         factory.setShutdownQuietPeriod(Duration.ZERO); // Quiet period not 
necessary since sending threads will have completed before shutting down event 
sender
 
         final int timeout = 
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int idleTimeout = 
context.getProperty(IDLE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
         factory.setTimeout(Duration.ofMillis(timeout));
+        factory.setIdleTimeout(Duration.ofSeconds(idleTimeout));
 
         final PropertyValue sslContextServiceProperty = 
context.getProperty(SSL_CONTEXT_SERVICE);
         if (sslContextServiceProperty.isSet()) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
new file mode 100644
index 0000000..c452a32
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+
+/**
+ * Idle State Handler closes channel context when state indicates idle 
communications
+ */
+public class CloseContextIdleStateHandler extends ChannelDuplexHandler {
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext context, final 
Object event) {
+        if (event instanceof IdleStateEvent) {
+            final IdleStateEvent idleStateEvent = (IdleStateEvent) event;
+            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
+                context.close();
+            }
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
index b2f34d4..9030297 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
@@ -61,6 +61,8 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
 
     private Duration timeout = Duration.ofSeconds(30);
 
+    private Duration idleTimeout = Duration.ofSeconds(0);
+
     private int maxConnections = Runtime.getRuntime().availableProcessors() * 
2;
 
     private Supplier<List<ChannelHandler>> handlerSupplier = () -> 
Collections.emptyList();
@@ -116,6 +118,13 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
     }
 
     /**
+     * Set the idle timeout period for outgoing client connections
+     */
+    public void setIdleTimeout(final Duration idleTimeout) {
+        this.idleTimeout = Objects.requireNonNull(idleTimeout, "Timeout 
required");
+    }
+
+    /**
      * Set shutdown quiet period
      *
      * @param quietPeriod shutdown quiet period
@@ -205,6 +214,7 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
                 ? new StandardChannelInitializer<>(handlerSupplier)
                 : new ClientSslStandardChannelInitializer<>(handlerSupplier, 
sslContext);
         channelInitializer.setWriteTimeout(timeout);
+        channelInitializer.setIdleTimeout(idleTimeout);
         return channelInitializer;
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
index bbfb148..c3fcf81 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
@@ -20,7 +20,9 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
+import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
 
 import java.time.Duration;
 import java.util.List;
@@ -37,6 +39,8 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
 
     private Duration writeTimeout = Duration.ofSeconds(30);
 
+    private Duration idleTimeout = Duration.ofSeconds(0);
+
     /**
      * Standard Channel Initializer with handlers
      *
@@ -55,10 +59,19 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
         this.writeTimeout = Objects.requireNonNull(writeTimeout);
     }
 
+    /**
+     * Set the idle timeout period for outgoing client connections
+     */
+    public void setIdleTimeout(final Duration idleTimeout) {
+        this.idleTimeout = Objects.requireNonNull(idleTimeout);
+    }
+
     @Override
     protected void initChannel(Channel channel) {
         final ChannelPipeline pipeline = channel.pipeline();
+        pipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), 
idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
         pipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), 
TimeUnit.MILLISECONDS));
+        pipeline.addLast(new CloseContextIdleStateHandler());
         handlerSupplier.get().forEach(pipeline::addLast);
     }
 }

Reply via email to