yonyong opened a new issue, #21725:
URL: https://github.com/apache/pulsar/issues/21725

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Version
   
    2.6.0
   
   ### Minimal reproduce step
   
   # Same Code
   1. producer config
   ```Java
   producer = client.newProducer(Schema.BYTES)
                   .topic(PulsarProperty.topic)
                   .enableBatching(true)
                   .sendTimeout(0, TimeUnit.SECONDS)
                   .create();
   ```
   2. producer send Msg
   ```Java
   while (true) {
               producer.flush();
   
               int key = (int) (Math.random() * 10 % 3) + 1;
               String keyMsg = "KEY-" + key;
               String msg = "got message! key:" + keyMsg + "; seq:" + seq;
               producer.newMessage()
                       .key(keyMsg)
                       .orderingKey(keyMsg.getBytes())
                       .value(msg.getBytes())
                       .send();
               seq ++;
               Thread.sleep(1000);
           }
   ```
   3. consumer1 & consumer2 config
   ```Java
   consumer = client.newConsumer()
                   .topic(PulsarProperty.topic)
                   .subscriptionName("keyshared-subscription1")
                   .subscriptionType(SubscriptionType.Key_Shared)
                   .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                   .consumerName("keyshared-consumer-1")
                   .subscribe();
   ```
   ```Java
   consumer = client.newConsumer()
                   .topic(PulsarProperty.topic)
                   .subscriptionName("keyshared-subscription1")
                   .subscriptionType(SubscriptionType.Key_Shared)
                   .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                   .consumerName("keyshared-consumer-2")
                   .subscribe();
   ```
   4. consumer receive Msg
   ```Java
   while (true) {
               Message msg = consumer.receive();
               try {
                   // Do something with the message
                   System.out.println("---------------------------------------------------------------------------");
                   System.out.println("Message received: " + new String(msg.getData()));
                   System.out.println(msg.getKey());
                   System.out.println(String.format("Key:%s,order key:%s", msg.getKey(), msg.hasOrderingKey()));
                   System.out.println("---------------------------------------------------------------------------");
   
                   // Acknowledge the message so that it can be deleted by the message broker
                   consumer.acknowledge(msg);
               } catch (Exception e) {
                   // Message failed to process, redeliver later
                   consumer.negativeAcknowledge(msg);
               }
           }
   ```
   # Use Version 2.6.0
   ```xml
           <dependency>
               <groupId>org.apache.pulsar</groupId>
               <artifactId>pulsar-client</artifactId>
               <version>2.6.0</version>
           </dependency>
   ```
   
   # Use Version 2.7.1
   ```xml
           <dependency>
               <groupId>org.apache.pulsar</groupId>
               <artifactId>pulsar-client</artifactId>
               <version>2.7.1</version>
           </dependency>
   ```
   
   ### What did you expect to see?
   
   With two consumers in Key_Shared mode, messages with the same key should always be consumed by the consumer that owns that key.
   
   ### What did you see instead?
   
   # Result:
   With client version 2.6.0 and two consumers connected, all messages are consumed by a single consumer (A). The other consumer (B) never receives any message unless consumer A disconnects, which looks like Failover behavior rather than Key_Shared. After upgrading the client to 2.7.1, each key is consumed only by its corresponding consumer, as expected for Key_Shared.
   
   # Analysis:
   ```json
   [
       {
           "msgRateOut": 1.5664867649048742,
           "msgThroughputOut": 242.91165105279143,
           "bytesOutCounter": 11361,
           "msgOutCounter": 73,
           "msgRateRedeliver": 0,
           "chunkedMessageRate": 0,
           "consumerName": "keyshared-consumer-2",
           "availablePermits": 927,
           "unackedMessages": 0,
           "avgMessagesPerEntry": 6,
           "blockedConsumerOnUnackedMsgs": false,
           "readPositionWhenJoining": "404032:3949",
           "lastAckedTimestamp": 1702548043440,
           "lastConsumedTimestamp": 1702548043339,
           "keyHashRanges": [
               "[32769, 65536]"
           ],
           "metadata": {},
           "address": "/192.168.75.44:60187",
           "clientVersion": "2.6.0",
           "connectedSince": "2023-12-14T17:59:51.518+08:00"
       },
       {
           "msgRateOut": 0,
           "msgThroughputOut": 0,
           "bytesOutCounter": 0,
           "msgOutCounter": 0,
           "msgRateRedeliver": 0,
           "chunkedMessageRate": 0,
           "consumerName": "keyshared-consumer-1",
           "availablePermits": 1000,
           "unackedMessages": 0,
           "avgMessagesPerEntry": 1000,
           "blockedConsumerOnUnackedMsgs": false,
           "readPositionWhenJoining": "404032:3977",
           "lastAckedTimestamp": 0,
           "lastConsumedTimestamp": 0,
           "keyHashRanges": [
               "[0, 32768]"
           ],
           "metadata": {},
           "address": "/192.168.75.44:60195",
           "clientVersion": "2.6.0",
           "connectedSince": "2023-12-14T17:59:57.268+08:00"
       }
   ]
   ```
   The broker stats show the same hash ranges for both client versions, so we conclude that the problem is caused by a difference in how the producer handles messages between 2.6.0 and 2.7.1.
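   
   To check which consumer each test key should land on under the `keyHashRanges` shown above, a minimal sketch follows. It assumes (this is not stated in the report) that the broker maps a key to a slot with a 32-bit Murmur3 hash (seed 0), masked to a non-negative value and taken modulo 65536; the class and method names are hypothetical. The range boundaries are copied from the stats output.
   
   ```java
   import java.nio.charset.StandardCharsets;
   
   // Hypothetical helper, not part of the Pulsar client API.
   class KeySharedSlotSketch {
       // Assumption: the hash space has 65536 slots.
       static final int RANGE_SIZE = 65536;
   
       // Standard 32-bit Murmur3 (x86 variant), little-endian block reads.
       static int murmur3_32(byte[] data, int seed) {
           int h = seed;
           int len = data.length;
           int i = 0;
           while (i + 4 <= len) {
               int k = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                       | ((data[i + 2] & 0xff) << 16) | ((data[i + 3] & 0xff) << 24);
               h ^= mixBlock(k);
               h = Integer.rotateLeft(h, 13);
               h = h * 5 + 0xe6546b64;
               i += 4;
           }
           // Tail: the remaining 0 to 3 bytes.
           int rem = len - i;
           if (rem > 0) {
               int k = 0;
               if (rem == 3) k ^= (data[i + 2] & 0xff) << 16;
               if (rem >= 2) k ^= (data[i + 1] & 0xff) << 8;
               k ^= (data[i] & 0xff);
               h ^= mixBlock(k);
           }
           // Finalization (avalanche).
           h ^= len;
           h ^= h >>> 16;
           h *= 0x85ebca6b;
           h ^= h >>> 13;
           h *= 0xc2b2ae35;
           h ^= h >>> 16;
           return h;
       }
   
       static int mixBlock(int k) {
           k *= 0xcc9e2d51;
           k = Integer.rotateLeft(k, 15);
           k *= 0x1b873593;
           return k;
       }
   
       // Assumption: slot = non-negative hash modulo RANGE_SIZE.
       static int slotFor(String key) {
           byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
           return (murmur3_32(bytes, 0) & Integer.MAX_VALUE) % RANGE_SIZE;
       }
   
       // Ranges copied from the broker stats: [0, 32768] and [32769, 65536].
       static String ownerOf(int slot) {
           return slot <= 32768 ? "keyshared-consumer-1" : "keyshared-consumer-2";
       }
   
       public static void main(String[] args) {
           for (String key : new String[] {"KEY-1", "KEY-2", "KEY-3"}) {
               int slot = slotFor(key);
               System.out.println(key + " -> slot " + slot + " -> " + ownerOf(slot));
           }
       }
   }
   ```
   
   If the three keys map to both ranges under these assumptions while only one consumer receives messages, that would be consistent with the producer side, rather than the range assignment, being responsible.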
   
   # Question
   If my judgement is correct, please look into this issue: why can't 2.6.0 be used in normal Key_Shared mode?
   
   
   ### Anything else?
   
   2.6.0 console log
   
![image](https://github.com/apache/pulsar/assets/34122685/8eceb0f0-7c1b-4a40-a868-ae241c7ac9b5)
   
   2.7.1 console log
   
![image](https://github.com/apache/pulsar/assets/34122685/ba1f36c4-d228-46ef-87fe-17b96f662640)
   
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


