[ 
https://issues.apache.org/jira/browse/ARTEMIS-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16289973#comment-16289973
 ] 

Todd Baert commented on ARTEMIS-1556:
-------------------------------------

I see. I didn't think it was necessary to create the subscription on all 
brokers. My misunderstanding. I've modified my code to match your sample, and 
it seems to behave as you describe. Thanks for your patience. :)

{code:java}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Transactions;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using Amqp.Types;

namespace amqp_client_demo
{
    class Program
    {
        static void Main(string[] args)
        {
            string url1 = "amqp://localhost:5672";
            string url2 = "amqp://localhost:5673";
            String ADDRESS = "orders";

            Connection connection1 = new Connection(new Address(url1));
            Session session1 = new Session(connection1);
            ReceiverLink receiver1 = new ReceiverLink(session1, "test|1", new 
Source(){ExpiryPolicy = new Symbol("never"), DistributionMode = new 
Symbol("move"), Durable = 2, Capabilities = new Symbol[]{"topic", "shared", 
"global"}, Address = ADDRESS}, null);
          
            Connection connection2 = new Connection(new Address(url2));
            Session session2 = new Session(connection2);
            ReceiverLink receiver2 = new ReceiverLink(session2, "test|2", new 
Source(){ExpiryPolicy = new Symbol("never"), DistributionMode = new 
Symbol("move"), Durable = 2, Capabilities = new Symbol[]{"topic", "shared", 
"global"}, Address = ADDRESS}, null);            
            
            MessageCallback callback = (r, m) => {
               r.Accept(m);
               Console.WriteLine("Subscriber " + r.Name + " received: " + 
m.Body);
            };

            receiver2.Start(300, callback);
            receiver1.Start(300, callback);

            SenderLink sender = new SenderLink(session1, "sender", ADDRESS);
            for (var i=0; i < 5; i++)
            {
                sender.Send(new Message(DateTime.Now));
                Thread.Sleep(1000);
            }
        }
    }
}
{code}

//Subscriber test|1 received: 12/13/17 9:43:35 PM
//Subscriber test|2 received: 12/13/17 9:43:36 PM
//Subscriber test|1 received: 12/13/17 9:43:37 PM
//Subscriber test|2 received: 12/13/17 9:43:38 PM
//Subscriber test|1 received: 12/13/17 9:43:39 PM


> Cannot Recover AMQP Subscription Existing On Remote Cluster Member 
> -------------------------------------------------------------------
>
>                 Key: ARTEMIS-1556
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1556
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 2.4.0
>         Environment: Latest broker snapshot running on fedora 26. AMQP .Net 
> Lite client. 
>            Reporter: Todd Baert
>
> Assuming broker1 and broker2 exist in a cluster, if an AMQP Source 
> constituting a shared durable subscription (having capabilities "shared", 
> "global" and "topic") is created on broker1, and attempted to be recovered 
> from a client connected to broker2 (by creating an attach frame with a null 
> source, with desired capabilities  "shared", "global" and "topic"), an 
> Amqp.AmqpException is thrown.
> In a cluster, shouldn't it be possible to recover subscriptions that exist on 
> another broker? Shouldn't the search for this subscription occur on all 
> cluster members?
> I don't believe this is happening. It looks like we just search locally (not 
> sure about this) - but certainly the exception is thrown and the sub is not 
> recovered:
> See code below from 
> org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext 
> (comments are not my own, they are from source):
> {code:java}
>       if (source == null) {
>          // Attempt to recover a previous subscription happens when a link 
> reattach happens on a
>          // subscription queue
>          String clientId = getClientId();
>          String pubId = sender.getName();
>          global = hasRemoteDesiredCapability(sender, GLOBAL);
>          queue = createQueueName(connection.isUseCoreSubscriptionNaming(), 
> clientId, pubId, true, global, false);
>          QueueQueryResult result = sessionSPI.queueQuery(queue, 
> RoutingType.MULTICAST, false);
>          multicast = true;
>          routingTypeToUse = RoutingType.MULTICAST;
>          // Once confirmed that the address exists we need to return a Source 
> that reflects
>          // the lifetime policy and capabilities of the new subscription.
>          if (result.isExists()) {
> {code}
> I have a C# sample that demonstrates the issue:
> {code:java}
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using System.Transactions;
> using Amqp;
> using Amqp.Framing;
> using Amqp.Sasl;
> using Amqp.Types;
> namespace amqp_client_demo
> {
>     class Program
>     {
>         static void Main(string[] args)
>         {
>             string url1 = "amqp://localhost:5672";
>             string url2 = "amqp://localhost:5673";
>             String ADDRESS = "orders";
>             Connection connection1 = new Connection(new Address(url1));
>             Session session1 = new Session(connection1);
>             ReceiverLink receiver1 = new ReceiverLink(session1, "test", new 
> Source(){Durable = 2, Capabilities = new Symbol[]{"topic", "shared", 
> "global"}, Address = ADDRESS}, null);
>           
>             SenderLink sender = new SenderLink(session1, "sender", ADDRESS);
>             sender.Send(new Message("test message"));
>             Connection connection2 = new Connection(new Address(url2));
>             Session session2 = new Session(connection2);
>             ReceiverLink receiver2 = new ReceiverLink(session2, "test", new 
> Attach(){Source = null, DesiredCapabilities = new Symbol[]{"topic", "shared", 
> "global"}}, null);
>             
>             
> Console.WriteLine(receiver2.Receive(TimeSpan.FromSeconds(1)).Body); 
> //Unhandled Exception: Amqp.AmqpException: Unknown subscription link: test
>         }
>     }
> }
> {code}
> If url1 and url2 are modified to point to the same cluster members, this 
> works fine. Otherwise the exception (see comment) is thrown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to