468120308 opened a new issue #14652:
URL: https://github.com/apache/pulsar/issues/14652


   #### Expected behavior
   
   I configured replication and subscription replication across two clusters
   (A and B). The application connects to cluster A and consumes all messages on
   the specified topic. When the application is then restarted and connects to
   cluster B, the messages already consumed on that topic should not be
   delivered again.
   
   #### Actual behavior
   
   After the application is restarted and connects to cluster B, the messages
   on the same topic are consumed again. Subscription replication did not take
   effect.
   
   #### Steps to reproduce
   
        public static void main(String[] args) throws Exception {
            // Service URL is masked by the reporter; it points at cluster A or B.
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://**********:6650")
                    .build();
   
            // topicName is defined elsewhere in the reporter's code (not shown).
            Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                    .topic(topicName)
                    .create();
   
            // Shared subscription "test" with subscription state replication enabled.
            Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                    .topic(topicName)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName("test")
                    .replicateSubscriptionState(true)
                    .subscribe();
   
            producer.send("cxctest".getBytes(StandardCharsets.UTF_8));
            producer.send("cxctest2".getBytes(StandardCharsets.UTF_8));
            producer.send("cxctest3".getBytes(StandardCharsets.UTF_8));
   
            // Runs the receive/acknowledge loop on the calling thread.
            ConsumerDemo consumerDemo = new ConsumerDemo(client, consumer);
            consumerDemo.run();
        }
   
       static class ConsumerDemo implements Runnable {
   
           PulsarClient client ;
   
           Consumer consumer;
   
           public ConsumerDemo(PulsarClient client, Consumer consumer) {
               this.client = client;
               this.consumer = consumer;
           }
   
           @Override
           public void run() {
               try {
                   do {
                       Message msg = consumer.receive();
                       consumer.acknowledge(msg.getMessageId());
                       System.out.println(new String(msg.getData()));
                   } while (true);
               } catch (Exception e) {
                   System.out.println(Throwables.getStackTraceAsString(e));
               }
           }
       }
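
To quantify the repeated consumption, one option is to record the payloads acknowledged while connected to cluster A and compare them with what cluster B delivers after the restart. The following is only a minimal sketch: the class `RedeliveryCheck` and the method `redelivered` are hypothetical names, not part of Pulsar, and it assumes payloads are unique (as with the three test messages above). Collecting the two lists from the `ConsumerDemo` loop is left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not part of the Pulsar API): compares payloads
// acknowledged on cluster A with payloads delivered by cluster B.
final class RedeliveryCheck {

    // Returns the payloads received on B that were already acknowledged on A,
    // in the order B delivered them. Assumes payloads are unique.
    static List<String> redelivered(List<String> ackedOnA, List<String> receivedOnB) {
        Set<String> acked = new HashSet<>(ackedOnA);
        List<String> duplicates = new ArrayList<>();
        for (String payload : receivedOnB) {
            if (acked.contains(payload)) {
                duplicates.add(payload);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Payloads from the reproduction steps; in the reported behavior,
        // cluster B delivers all three again.
        List<String> ackedOnA = List.of("cxctest", "cxctest2", "cxctest3");
        List<String> receivedOnB = List.of("cxctest", "cxctest2", "cxctest3");
        System.out.println(redelivered(ackedOnA, receivedOnB));
        // prints [cxctest, cxctest2, cxctest3]
    }
}
```

An empty result would indicate that the subscription state reached cluster B before the restart; a non-empty result lists the messages that were consumed twice.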
   
   #### System configuration
   **Pulsar version**: 2.7.2
   

