Repository: activemq-artemis
Updated Branches:
  refs/heads/master bd0f11498 -> cf525f014


ARTEMIS-2093 NPE thrown by NettyConnector::createConnection

Because NettyConnector::createConnection does not run on the channel's
event loop, it can race with a channel close event, which clears the
whole channel pipeline and leads to an NPE when createConnection then
tries to use a channel handler that was configured in the pipeline.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3112b4f3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3112b4f3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3112b4f3

Branch: refs/heads/master
Commit: 3112b4f3db6a77b3d996d72bac65d539d1135ce8
Parents: bd0f114
Author: Francesco Nigro <[email protected]>
Authored: Fri Sep 21 15:06:53 2018 +0200
Committer: Francesco Nigro <[email protected]>
Committed: Fri Sep 21 15:06:53 2018 +0200

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnector.java     | 40 ++++++++++++++++++--
 .../remoting/impl/netty/NettyConnectorTest.java | 33 ++++++++++++++++
 2 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3112b4f3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 4a87770..32deb38 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import io.netty.bootstrap.Bootstrap;
@@ -604,6 +605,7 @@ public class NettyConnector extends AbstractConnector {
             protocolManager.addChannelHandlers(pipeline);
 
             pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, 
handler, new Listener(), closeExecutor));
+            logger.debugf("Added ActiveMQClientChannelHandler to Channel with 
id = %s ", channel.id());
          }
       });
 
@@ -737,6 +739,20 @@ public class NettyConnector extends AbstractConnector {
 
    @Override
    public Connection createConnection() {
+      return createConnection(null);
+   }
+
+   /**
+    * Create and return a connection from this connector.
+    * <p>
+    * This method must NOT throw an exception if it fails to create the 
connection
+    * (e.g. network is not available), in this case it MUST return null.<br>
+    * This version can be used for testing purposes.
+    *
+    * @param onConnect a callback that would be called right after {@link 
Bootstrap#connect()}
+    * @return The connection, or {@code null} if unable to create a connection 
(e.g. network is unavailable)
+    */
+   public final Connection createConnection(Consumer<ChannelFuture> onConnect) 
{
       if (channelClazz == null) {
          return null;
       }
@@ -758,7 +774,9 @@ public class NettyConnector extends AbstractConnector {
       } else {
          future = bootstrap.connect(remoteDestination);
       }
-
+      if (onConnect != null) {
+         onConnect.accept(future);
+      }
       future.awaitUninterruptibly();
 
       if (future.isSuccess()) {
@@ -770,7 +788,15 @@ public class NettyConnector extends AbstractConnector {
                if (handshakeFuture.isSuccess()) {
                   ChannelPipeline channelPipeline = ch.pipeline();
                   ActiveMQChannelHandler channelHandler = 
channelPipeline.get(ActiveMQChannelHandler.class);
-                  channelHandler.active = true;
+                  if (channelHandler != null) {
+                     channelHandler.active = true;
+                  } else {
+                     ch.close().awaitUninterruptibly();
+                     ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
+                        new IllegalStateException("No ActiveMQChannelHandler 
has been found while connecting to " +
+                                                     remoteDestination + " 
from Channel with id = " + ch.id()));
+                     return null;
+                  }
                } else {
                   ch.close().awaitUninterruptibly();
                   
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
@@ -830,7 +856,15 @@ public class NettyConnector extends AbstractConnector {
          } else {
             ChannelPipeline channelPipeline = ch.pipeline();
             ActiveMQChannelHandler channelHandler = 
channelPipeline.get(ActiveMQChannelHandler.class);
-            channelHandler.active = true;
+            if (channelHandler != null) {
+               channelHandler.active = true;
+            } else {
+               ch.close().awaitUninterruptibly();
+               ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
+                  new IllegalStateException("No ActiveMQChannelHandler has 
been found while connecting to " +
+                                               remoteDestination + " from 
Channel with id = " + ch.id()));
+               return null;
+            }
          }
 
          // No acceptor on a client connection

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3112b4f3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
index 9829329..ffd2cd4 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
@@ -20,11 +20,14 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
+import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import 
org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -361,4 +364,34 @@ public class NettyConnectorTest extends ActiveMQTestBase {
       connector.close();
       Assert.assertFalse(connector.isStarted());
    }
+
+   @Test
+   public void testChannelHandlerRemovedWhileCreatingConnection() throws 
Exception {
+      BufferHandler handler = (connectionID, buffer) -> {
+      };
+      Map<String, Object> params = new HashMap<>();
+      final ExecutorService closeExecutor = 
Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+      final ExecutorService threadPool = 
Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+      final ScheduledExecutorService scheduledThreadPool = 
Executors.newScheduledThreadPool(5, 
ActiveMQThreadFactory.defaultThreadFactory());
+      try {
+         NettyConnector connector = new NettyConnector(params, handler, 
listener, closeExecutor, threadPool, scheduledThreadPool);
+         connector.start();
+         final Connection connection = connector.createConnection(future -> {
+            future.awaitUninterruptibly();
+            Assert.assertTrue(future.isSuccess());
+            final ChannelPipeline pipeline = future.channel().pipeline();
+            final ActiveMQChannelHandler activeMQChannelHandler = 
pipeline.get(ActiveMQChannelHandler.class);
+            Assert.assertNotNull(activeMQChannelHandler);
+            pipeline.remove(activeMQChannelHandler);
+            Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class));
+         });
+         Assert.assertNull(connection);
+         connector.close();
+      } finally {
+         closeExecutor.shutdownNow();
+         threadPool.shutdownNow();
+         scheduledThreadPool.shutdownNow();
+      }
+   }
+
 }

Reply via email to