This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ab851b9  NIFI-8672 Added syncUninterruptibly() to Netty shutdown and close methods
ab851b9 is described below

commit ab851b9cde4fdab291dffdfcc602d19f90ecda21
Author: exceptionfactory <[email protected]>
AuthorDate: Tue Jun 8 16:41:54 2021 -0500

    NIFI-8672 Added syncUninterruptibly() to Netty shutdown and close methods
    
    Signed-off-by: Nathan Gough <[email protected]>
    
    This closes #5139.
---
 .../event/transport/netty/NettyEventSender.java    |  2 +-
 .../event/transport/netty/NettyEventServer.java    |  6 ++-
 .../transport/netty/NettyEventSenderTest.java      | 57 ++++++++++++++++++++++
 .../transport/netty/NettyEventServerTest.java      | 57 ++++++++++++++++++++++
 .../nifi/processors/standard/TestListenSyslog.java |  5 ++
 5 files changed, 124 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
index 15bc70b..7d9346c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
@@ -80,7 +80,7 @@ class NettyEventSender<T> implements EventSender<T> {
         try {
             channelPool.close();
         } finally {
-            group.shutdownGracefully();
+            group.shutdownGracefully().syncUninterruptibly();
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
index c4aa1a5..88792e9 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
@@ -45,9 +45,11 @@ class NettyEventServer implements EventServer {
     @Override
     public void shutdown() {
         try {
-            channel.close().syncUninterruptibly();
+            if (channel.isOpen()) {
+                channel.close().syncUninterruptibly();
+            }
         } finally {
-            group.shutdownGracefully();
+            group.shutdownGracefully().syncUninterruptibly();
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
new file mode 100644
index 0000000..9341136
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.util.concurrent.Future;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyEventSenderTest {
+    private static final String LOCALHOST = "127.0.0.1";
+
+    @Mock
+    private ChannelPool channelPool;
+
+    @Mock
+    private EventLoopGroup group;
+
+    @Mock
+    private Future<?> shutdownFuture;
+
+    @Test
+    public void testClose() {
+        final SocketAddress socketAddress = InetSocketAddress.createUnresolved(LOCALHOST, NetworkUtils.getAvailableTcpPort());
+        final NettyEventSender<?> sender = new NettyEventSender<>(group, channelPool, socketAddress);
+        doReturn(shutdownFuture).when(group).shutdownGracefully();
+        sender.close();
+
+        verify(channelPool).close();
+        verify(group).shutdownGracefully();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventServerTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventServerTest.java
new file mode 100644
index 0000000..2cedded
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventServerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyEventServerTest {
+    @Mock
+    private Channel channel;
+
+    @Mock
+    private EventLoopGroup group;
+
+    @Mock
+    private ChannelFuture closeFuture;
+
+    @Mock
+    private Future<?> shutdownFuture;
+
+    @Test
+    public void testShutdown() {
+        final NettyEventServer server = new NettyEventServer(group, channel);
+        when(channel.isOpen()).thenReturn(true);
+        when(channel.close()).thenReturn(closeFuture);
+        doReturn(shutdownFuture).when(group).shutdownGracefully();
+        server.shutdown();
+
+        verify(channel).close();
+        verify(group).shutdownGracefully();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index bf61cb3..815a639 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -41,6 +41,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -165,6 +166,10 @@ public class TestListenSyslog {
         sendMessages(protocol, port, LineEnding.UNIX, VALID_MESSAGE);
 
         runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
+
+        final List<MockFlowFile> invalidFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID);
+        assertTrue("Invalid FlowFiles found", invalidFlowFiles.isEmpty());
+
         final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS);
         assertEquals("Success FlowFiles not matched", 1, successFlowFiles.size());
 

Reply via email to