navid1981 opened a new issue #12601:
URL: https://github.com/apache/pulsar/issues/12601


   **Describe the bug**
   In Geo Replica scenario, I have two clusters. My producer send message to 
cluster A, and I can see message in both clusters as I expected. My consumer 
consume message from Cluster B with "Shared" Consume type. but if another 
consumer connect to Cluster A with the same subscription name and "Shared" 
type, it can also consume message, however I expected it cannot consume the 
same message because Cluster B consumer received it. Is it normal behavior? 
   
   **To Reproduce**
   `package com.pulsar.georeplica;
   
   import org.apache.pulsar.client.api.*;
   import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
   
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class GeoReplica {
       static PulsarClient client;
       static PulsarClient client2;
       static Map<String, String> map;
       static {
           try {
               map=new HashMap<>();
               map.put("userId","user");
               map.put("password","pass");
               AuthenticationBasic auth=new AuthenticationBasic();
               auth.configure(map);
               client = PulsarClient.builder()
                       .serviceUrl("ClusterB")
                       .authentication(auth)
                       .build();
               client2 = PulsarClient.builder()
                       .serviceUrl("ClusterA")
                       .authentication(auth)
                       .build();
           } catch (PulsarClientException e) {
               e.printStackTrace();
           }
       }
   
       public static void main(String[] msg) throws PulsarClientException {
           GeoReplica mp=new GeoReplica();
           mp.produce("ali");
           mp.singleMessageConsume();
   
           client.close();
       }
   
       public void produce(String msg) throws PulsarClientException {
           List<String> restrictReplicationTo = Arrays.asList(
                   "pulsar-cluster-1",
                   "pulsar-cluster-2"
           );
           Producer<byte[]> producer = client2.newProducer()
                   .topic("persistent://poc-tenant/poc-namespace/navid")
                   .producerName("FirstProducer")
                   .messageRoutingMode(MessageRoutingMode.SinglePartition)
                   .create();
   
   // You can then send messages to the broker and topic you specified:
           producer.newMessage()
                   .value(msg.getBytes())
                   .send();
           producer.close();
       }
   
       public void singleMessageConsume() throws PulsarClientException {
           Consumer consumer = client.newConsumer()
                   .topic("persistent://poc-tenant/poc-namespace/navid")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionName("my-subscription1")
                   .subscribe();
           // Wait for a message like listener
           while(true) {
           Message msg = consumer.receive();
           String result = "";
           try {
               // Do something with the message
               System.out.println("Message received: " + new 
String(msg.getData()));
               // Acknowledge the message so that it can be deleted by the 
message broker
               consumer.acknowledge(msg);
           } catch (Exception e) {
               // Message failed to process, redeliver later
               consumer.negativeAcknowledge(msg);
           }
           result = new String(msg.getData());
           }
   
       }
   }
   `
   
   **Expected behavior**
   If one consumer from Cluster B receive message, the other one from Cluster A 
shouldn't receive the same message.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to