This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ab851b9 NIFI-8672 Added syncUninterruptibly() to Netty shutdown and close methods
ab851b9 is described below
commit ab851b9cde4fdab291dffdfcc602d19f90ecda21
Author: exceptionfactory <[email protected]>
AuthorDate: Tue Jun 8 16:41:54 2021 -0500
NIFI-8672 Added syncUninterruptibly() to Netty shutdown and close methods
Signed-off-by: Nathan Gough <[email protected]>
This closes #5139.
---
.../event/transport/netty/NettyEventSender.java | 2 +-
.../event/transport/netty/NettyEventServer.java | 6 ++-
.../transport/netty/NettyEventSenderTest.java | 57 ++++++++++++++++++++++
.../transport/netty/NettyEventServerTest.java | 57 ++++++++++++++++++++++
.../nifi/processors/standard/TestListenSyslog.java | 5 ++
5 files changed, 124 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
index 15bc70b..7d9346c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
@@ -80,7 +80,7 @@ class NettyEventSender<T> implements EventSender<T> {
try {
channelPool.close();
} finally {
- group.shutdownGracefully();
+ group.shutdownGracefully().syncUninterruptibly();
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
index c4aa1a5..88792e9 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
@@ -45,9 +45,11 @@ class NettyEventServer implements EventServer {
@Override
public void shutdown() {
try {
- channel.close().syncUninterruptibly();
+ if (channel.isOpen()) {
+ channel.close().syncUninterruptibly();
+ }
} finally {
- group.shutdownGracefully();
+ group.shutdownGracefully().syncUninterruptibly();
}
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
new file mode 100644
index 0000000..9341136
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.util.concurrent.Future;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyEventSenderTest {
+ private static final String LOCALHOST = "127.0.0.1";
+
+ @Mock
+ private ChannelPool channelPool;
+
+ @Mock
+ private EventLoopGroup group;
+
+ @Mock
+ private Future<?> shutdownFuture;
+
+ @Test
+ public void testClose() {
+ final SocketAddress socketAddress = InetSocketAddress.createUnresolved(LOCALHOST, NetworkUtils.getAvailableTcpPort());
+ final NettyEventSender<?> sender = new NettyEventSender<>(group, channelPool, socketAddress);
+ doReturn(shutdownFuture).when(group).shutdownGracefully();
+ sender.close();
+
+ verify(channelPool).close();
+ verify(group).shutdownGracefully();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventServerTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventServerTest.java
new file mode 100644
index 0000000..2cedded
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventServerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyEventServerTest {
+ @Mock
+ private Channel channel;
+
+ @Mock
+ private EventLoopGroup group;
+
+ @Mock
+ private ChannelFuture closeFuture;
+
+ @Mock
+ private Future<?> shutdownFuture;
+
+ @Test
+ public void testShutdown() {
+ final NettyEventServer server = new NettyEventServer(group, channel);
+ when(channel.isOpen()).thenReturn(true);
+ when(channel.close()).thenReturn(closeFuture);
+ doReturn(shutdownFuture).when(group).shutdownGracefully();
+ server.shutdown();
+
+ verify(channel).close();
+ verify(group).shutdownGracefully();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index bf61cb3..815a639 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -41,6 +41,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -165,6 +166,10 @@ public class TestListenSyslog {
sendMessages(protocol, port, LineEnding.UNIX, VALID_MESSAGE);
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
+
+ final List<MockFlowFile> invalidFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID);
+ assertTrue("Invalid FlowFiles found", invalidFlowFiles.isEmpty());
+
final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS);
assertEquals("Success FlowFiles not matched", 1, successFlowFiles.size());