[ 
https://issues.apache.org/jira/browse/ARTEMIS-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284773#comment-16284773
 ] 

clebert suconic commented on ARTEMIS-1549:
------------------------------------------

[~toddbaert] Can you try a snapshot from master?

> AMQP messages aren't redistributed across cluster bridge, NPE in 
> ServerSessionImpl.send()
> -----------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-1549
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1549
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: AMQP, Broker
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0
>         Environment: AMQP .NET lite client, .NET Core runtime 2.0, connecting 
> with brokers on local machine (fedora 26)
>            Reporter: Todd Baert
>
> I have set up a cluster of 2 brokers, using a simple static cluster 
> configuration (see below). Sending a CORE message to broker1 and consuming 
> that message from broker2 works as expected. Attempting the same over AMQP 
> (using the AMQP .NET Lite client) results in an NPE in broker2:
> {code:none}
> 08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: 
> Caught exception: java.lang.NullPointerException
>       at 
> org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613)
>  [artemis-amqp-protocol-2.3.0.jar:]
>       at 
> org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64)
>  [artemis-amqp-protocol-2.3.0.jar:]
>       at 
> org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348)
>  [artemis-server-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309)
>  [artemis-server-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302)
>  [artemis-server-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690)
>  [artemis-server-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290)
>  [artemis-server-2.3.0.jar:2.3.0]
>       at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) 
> [artemis-commons-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53)
>  [artemis-commons-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
>  [artemis-commons-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
>  [artemis-commons-2.3.0.jar:2.3.0]
>       at 
> org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53)
>  [artemis-commons-2.3.0.jar:2.3.0]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [rt.jar:1.8.0_151]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [rt.jar:1.8.0_151]
>       at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
> {code}
> To be clear, the NPE occurs on the broker to which the receiver is attached, 
> every time a message is sent by the producer (which is attached to the other 
> broker).
> Attempting to send/receive the AMQP messages on the same cluster member works 
> as expected.
> Here is some client code that demonstrates the issue:
> {code:java}
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using System.Transactions;
> using Amqp;
> using Amqp.Framing;
> using Amqp.Sasl;
> using Amqp.Types;
> namespace Test
> {
>     class Program
>     {
>         static void Main(string[] args)
>         {
>             string url1 = "amqp://localhost:5672";
>             string url2 = "amqp://localhost:5673";
>             String ADDRESS = "orders";
>             Connection connection1 = new Connection(new Address(url1));
>             Session session1 = new Session(connection1);
>             ReceiverLink receiver = new ReceiverLink(session1, "sub1", 
> CreateSharedDurableSubscriberSource(ADDRESS), null);
>             Connection connection2 = new Connection(new Address(url2));
>             Session session2 = new Session(connection2);
>             SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
>             receiver.Start(300, (r,  m) => {
>                 r.Accept(m);
>                 Console.WriteLine("Got message: " + m.Body);
>             });
>         
>             Message outMessage = new Message("order placed at " + 
> DateTime.Now.ToString());
>             outMessage.Header = new Header();
>             outMessage.Header.Durable = true;                    
>             sender.Send(outMessage);
>             Thread.CurrentThread.Join();
>         }
>         private static Source CreateSharedDurableSubscriberSource(String 
> address)
>         {
>             Source source = new Source();
>             source.Address = address;
>             source.ExpiryPolicy = new Symbol("never");
>             
>             source.Durable = 2;
>             source.Capabilities = new Symbol[]{"topic", "shared", "global"};
>             source.DistributionMode = new Symbol("copy");
>             return source;
>         }
>     }
> }
> {code}
> Here is the cluster config from broker1; broker2 is configured accordingly:
> {code:xml}
>       <acceptors>
>          <acceptor 
> name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
>          <acceptor 
> name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
>       </acceptors>
>       <connectors>
>         <connector name="broker1-connector">tcp://localhost:61616</connector>
>         <connector name="broker2-connector">tcp://localhost:61617</connector>
>       </connectors>
>       <cluster-user>todd</cluster-user>
>       <cluster-password>password</cluster-password>      
>       <cluster-connections>
>         <cluster-connection name="preprod">
>           <connector-ref>broker1-connector</connector-ref>
>           <retry-interval>500</retry-interval>
>           <use-duplicate-detection>true</use-duplicate-detection>
>           <message-load-balancing>ON_DEMAND</message-load-balancing>          
>           <max-hops>1</max-hops>
>           <static-connectors allow-direct-connections-only="true">
>               <connector-ref>broker2-connector</connector-ref>
>           </static-connectors>
>         </cluster-connection>
>       </cluster-connections>
> {code}
> The message-load-balancing setting has no effect on this issue; 
> redistribution-delay is set to 0 for the address in question.
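> For illustration only, a redistribution delay of 0 is usually configured 
> through an address-setting in broker.xml, roughly as sketched below. This 
> fragment is not copied from the actual broker configuration; the match value 
> "orders" is an assumption based on the address used in the client code above.
> {code:xml}
>       <!-- Hypothetical sketch: match="orders" is assumed, not taken from the real config -->
>       <address-settings>
>          <address-setting match="orders">
>             <!-- 0 = redistribute immediately once the last local consumer is gone -->
>             <redistribution-delay>0</redistribution-delay>
>          </address-setting>
>       </address-settings>
> {code}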



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
