[ https://issues.apache.org/jira/browse/ARTEMIS-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Todd Baert updated ARTEMIS-1549:
--------------------------------
    Description:
I have set up a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1 and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:

{code:none}
08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
    at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
    at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
    at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
{code}

To be clear, the NPE occurs on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).

Sending and receiving the AMQP messages on the same cluster member works as expected.

Here is some client code that demonstrates the issue:

{code:java}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Transactions;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using Amqp.Types;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            string url1 = "amqp://localhost:5672";
            string url2 = "amqp://localhost:5673";
            String ADDRESS = "orders";

            Connection connection1 = new Connection(new Address(url1));
            Session session1 = new Session(connection1);
            ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);

            Connection connection2 = new Connection(new Address(url2));
            Session session2 = new Session(connection2);
            SenderLink sender = new SenderLink(session2, "sender", ADDRESS);

            receiver.Start(300, (r, m) => {
                r.Accept(m);
                Console.WriteLine("Got message: " + m.Body);
            });

            Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
            outMessage.Header = new Header();
            outMessage.Header.Durable = true;
            sender.Send(outMessage);

            Thread.CurrentThread.Join();
        }

        private static Source CreateSharedDurableSubscriberSource(String address)
        {
            Source source = new Source();
            source.Address = address;
            source.ExpiryPolicy = new Symbol("never");
            source.Durable = 2;
            source.Capabilities = new Symbol[]{"topic", "shared", "global"};
            source.DistributionMode = new Symbol("copy");
            return source;
        }
    }
}
{code}

Here is the cluster config from broker1; broker2 is configured accordingly:

{code:xml}
<acceptors>
    <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
    <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
</acceptors>

<connectors>
    <connector name="broker1-connector">tcp://localhost:61616</connector>
    <connector name="broker2-connector">tcp://localhost:61617</connector>
</connectors>

<!-- TODD: credentials for the cluster -->
<cluster-user>todd</cluster-user>
<cluster-password>password</cluster-password>

<!-- ADDED -->
<cluster-connections>
    <cluster-connection name="preprod">
        <!-- TODO - is this needed? -->
        <connector-ref>broker1-connector</connector-ref>
        <retry-interval>500</retry-interval>
        <use-duplicate-detection>true</use-duplicate-detection>
        <message-load-balancing>ON_DEMAND</message-load-balancing>
        <max-hops>1</max-hops>
        <static-connectors allow-direct-connections-only="true">
            <connector-ref>broker2-connector</connector-ref>
        </static-connectors>
    </cluster-connection>
</cluster-connections>
{code}

The message load balancing setting has no effect on this issue; redistribution-delay is set to 0 for the address in question.

> AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()
> -----------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-1549
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1549
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: AMQP, Broker
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0
>         Environment: AMQP .NET lite client, .NET Core runtime 2.0, connecting with brokers on local machine (fedora 26)
>            Reporter: Todd Baert

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)