Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 9a949230d -> f90afad1b
ARTEMIS-2093 NPE thrown by NettyConnector::createConnection Given that NettyConnector::createConnection isn't happening on the channel's event loop, it could race with a channel close event, that would clean the whole channel pipeline, leading to a NPE while trying to use a configured channel handler of the pipeline. (cherry picked from commit 3112b4f3db6a77b3d996d72bac65d539d1135ce8) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f90afad1 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f90afad1 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f90afad1 Branch: refs/heads/2.6.x Commit: f90afad1b75238ecc11c01fb5b1e7f4537ff26d8 Parents: 9a94923 Author: Francesco Nigro <[email protected]> Authored: Fri Sep 21 15:06:53 2018 +0200 Committer: Francesco Nigro <[email protected]> Committed: Fri Sep 21 16:09:09 2018 +0200 ---------------------------------------------------------------------- .../remoting/impl/netty/NettyConnector.java | 40 +++++++++++- .../remoting/impl/netty/NettyConnectorTest.java | 67 ++++++++++++++++++++ 2 files changed, 104 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f90afad1/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 8dd35e4..31668b3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -47,6 +47,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Stream; import io.netty.bootstrap.Bootstrap; @@ -597,6 +598,7 @@ public class NettyConnector extends AbstractConnector { protocolManager.addChannelHandlers(pipeline); pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor)); + logger.debugf("Added ActiveMQClientChannelHandler to Channel with id = %s ", channel.id()); } }); @@ -712,6 +714,20 @@ public class NettyConnector extends AbstractConnector { @Override public Connection createConnection() { + return createConnection(null); + } + + /** + * Create and return a connection from this connector. + * <p> + * This method must NOT throw an exception if it fails to create the connection + * (e.g. network is not available), in this case it MUST return null.<br> + * This version can be used for testing purposes. + * + * @param onConnect a callback that would be called right after {@link Bootstrap#connect()} + * @return The connection, or {@code null} if unable to create a connection (e.g. network is unavailable) + */ + public final Connection createConnection(Consumer<ChannelFuture> onConnect) { if (channelClazz == null) { return null; } @@ -733,7 +749,9 @@ public class NettyConnector extends AbstractConnector { } else { future = bootstrap.connect(remoteDestination); } - + if (onConnect != null) { + onConnect.accept(future); + } future.awaitUninterruptibly(); if (future.isSuccess()) { @@ -745,7 +763,15 @@ public class NettyConnector extends AbstractConnector { if (handshakeFuture.isSuccess()) { ChannelPipeline channelPipeline = ch.pipeline(); ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class); - channelHandler.active = true; + if (channelHandler != null) { + channelHandler.active = true; + } else { + ch.close().awaitUninterruptibly(); + ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection( + new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " + + remoteDestination + " from Channel with id = " + ch.id())); + return null; + } } else { ch.close().awaitUninterruptibly(); ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause()); @@ -805,7 +831,15 @@ public class NettyConnector extends AbstractConnector { } else { ChannelPipeline channelPipeline = ch.pipeline(); ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class); - channelHandler.active = true; + if (channelHandler != null) { + channelHandler.active = true; + } else { + ch.close().awaitUninterruptibly(); + ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection( + new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " + + remoteDestination + " from Channel with id = " + ch.id())); + return null; + } } // No acceptor on a client connection http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f90afad1/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java index d302be7..e3e279f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java @@ -18,13 +18,20 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; @@ -32,10 +39,40 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class NettyConnectorTest extends ActiveMQTestBase { + private ActiveMQServer server; + private ExecutorService executorService; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + executorService = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + + Map<String, Object> params = new HashMap<>(); + params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true); + params.put(TransportConstants.SSL_PROVIDER, TransportConstants.OPENSSL_PROVIDER); + params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "openssl-server-side-keystore.jks"); + params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample"); + params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "openssl-server-side-truststore.jks"); + params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample"); + params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true); + ConfigurationImpl config = createBasicConfig().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "nettySSL")); + server = createServer(false, config); + server.start(); + waitForServerToStart(server); + } + + @Override + public void tearDown() throws Exception { + executorService.shutdown(); + super.tearDown(); + } + private ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { @@ -197,4 +234,34 @@ public class NettyConnectorTest extends ActiveMQTestBase { connector.close(); Assert.assertFalse(connector.isStarted()); } + + @Test + public void testChannelHandlerRemovedWhileCreatingConnection() throws Exception { + BufferHandler handler = (connectionID, buffer) -> { + }; + Map<String, Object> params = new HashMap<>(); + final ExecutorService closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + final ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()); + try { + NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool); + connector.start(); + final Connection connection = connector.createConnection(future -> { + future.awaitUninterruptibly(); + Assert.assertTrue(future.isSuccess()); + final ChannelPipeline pipeline = future.channel().pipeline(); + final ActiveMQChannelHandler activeMQChannelHandler = pipeline.get(ActiveMQChannelHandler.class); + Assert.assertNotNull(activeMQChannelHandler); + pipeline.remove(activeMQChannelHandler); + Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class)); + }); + Assert.assertNull(connection); + connector.close(); + } finally { + closeExecutor.shutdownNow(); + threadPool.shutdownNow(); + scheduledThreadPool.shutdownNow(); + } + } + }
