ARTEMIS-1920 AMQP throws NPE if it can't find a backup server (cherry picked from commit de0747a9a40f44c309287a54f324cc9a21f31b93)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8058a512 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8058a512 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8058a512 Branch: refs/heads/2.6.x Commit: 8058a512e5fe97f1a139dc00a3c16d56ee92882a Parents: 40b66d1 Author: Clebert Suconic <[email protected]> Authored: Fri Jun 8 16:27:42 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Sat Jun 9 01:56:49 2018 -0400 ---------------------------------------------------------------------- .../amqp/broker/AMQPConnectionCallback.java | 4 +- .../core/postoffice/impl/BindingsImpl.java | 2 +- .../ProtocolsMessageLoadBalancingTest.java | 85 ++++++++++++++++++++ 3 files changed, 89 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8058a512/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 05b4f4f..84fdd24 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -291,7 +291,9 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { ClusterConnection clusterConnection = clusterManager.getDefaultConnection(null); if (clusterConnection != null) { 
TopologyMemberImpl member = clusterConnection.getTopology().getMember(server.getNodeID().toString()); - return member.toBackupURI(); + if (member != null) { + return member.toBackupURI(); + } } return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8058a512/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 478c700..c669eba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -258,7 +258,7 @@ public final class BindingsImpl implements Bindings { if (entry.getValue() instanceof RemoteQueueBinding) { RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); if (remoteQueueBinding.getRemoteQueueID() == id) { - message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); + message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8058a512/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java index 
1c98157..8ed685c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java @@ -26,6 +26,7 @@ import javax.jms.Session; import java.util.Arrays; import java.util.Collection; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; @@ -37,7 +38,9 @@ import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImp import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; @@ -282,6 +285,88 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { connection.close(); } + @Test + public void testRestartConnection() throws Exception { + + startServers(MessageLoadBalancingType.STRICT); + + System.out.println("connections " + servers[1].getRemotingService().getConnections().size()); + + Wait.assertEquals(3, () -> servers[1].getRemotingService().getConnections().size()); + Wait.assertEquals(3, () -> servers[0].getRemotingService().getConnections().size()); + + RemotingConnection[] connectionsServer1 = servers[1].getRemotingService().getConnections().toArray(new 
RemotingConnection[3]); + RemotingConnection[] connectionsServer0 = servers[0].getRemotingService().getConnections().toArray(new RemotingConnection[3]); + + ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS]; + Connection[] connection = new Connection[NUMBER_OF_SERVERS]; + Session[] session = new Session[NUMBER_OF_SERVERS]; + MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS]; + + // this will pre create consumers to make sure messages are distributed evenly without redistribution + for (int node = 0; node < NUMBER_OF_SERVERS; node++) { + factory[node] = getJmsConnectionFactory(node); + connection[node] = factory[node].createConnection(); + session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString())); + } + + waitForBindings(0, "queues.0", 1, 1, true); + waitForBindings(1, "queues.0", 1, 1, true); + + waitForBindings(0, "queues.0", 1, 1, false); + waitForBindings(1, "queues.0", 1, 1, false); + + for (RemotingConnection remotingConnection : servers[1].getRemotingService().getConnections()) { + remotingConnection.fail(new ActiveMQException("forcing failure")); + } + for (RemotingConnection remotingConnection : servers[1].getRemotingService().getConnections()) { + remotingConnection.fail(new ActiveMQException("forcing failure")); + } + + // this is to allow reconnects + Thread.sleep(500); + + // this will pre create consumers to make sure messages are distributed evenly without redistribution + for (int node = 0; node < NUMBER_OF_SERVERS; node++) { + try { + connection[node].close(); + } catch (Throwable e) { + e.printStackTrace(); + } + factory[node] = getJmsConnectionFactory(node); + connection[node] = factory[node].createConnection(); + session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString())); + 
} + + waitForBindings(0, "queues.0", 1, 1, true); + waitForBindings(1, "queues.0", 1, 1, true); + + waitForBindings(0, "queues.0", 1, 1, false); + waitForBindings(1, "queues.0", 1, 1, false); + + System.out.println("connections " + servers[1].getRemotingService().getConnections().size()); + + // sending Messages.. they should be load balanced + { + ConnectionFactory cf = getJmsConnectionFactory(0); + Connection cn = cf.createConnection(); + Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString())); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + pd.send(sn.createTextMessage("hello " + i)); + } + + cn.close(); + } + + receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true); + receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true); + + } + private void receiveMessages(Connection connection, MessageConsumer messageConsumer, int messageCount,
