yonyong opened a new issue, #21725: URL: https://github.com/apache/pulsar/issues/21725
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

2.6.0

### Minimal reproduce step

# Same Code

1. producer config

```Java
producer = client.newProducer(Schema.BYTES)
        .topic(PulsarProperty.topic)
        .enableBatching(true)
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();
```

2. producer send Msg

```Java
while (true) {
    producer.flush();
    // Pick one of three keys: KEY-1, KEY-2 or KEY-3
    int key = (int) (Math.random() * 10 % 3) + 1;
    String keyMsg = "KEY-" + key;
    String msg = "got message! key:" + keyMsg + "; seq:" + seq;
    producer.newMessage()
            .key(keyMsg)
            .orderingKey(keyMsg.getBytes())
            .value(msg.getBytes())
            .send();
    seq++;
    Thread.sleep(1000);
}
```

3. consumer1 & consumer2 config

```Java
consumer = client.newConsumer()
        .topic(PulsarProperty.topic)
        .subscriptionName("keyshared-subscription1")
        .subscriptionType(SubscriptionType.Key_Shared)
        .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
        .consumerName("keyshared-consumer-1")
        .subscribe();
```

```Java
consumer = client.newConsumer()
        .topic(PulsarProperty.topic)
        .subscriptionName("keyshared-subscription1")
        .subscriptionType(SubscriptionType.Key_Shared)
        .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
        .consumerName("keyshared-consumer-2")
        .subscribe();
```

4. consumer receive Msg

```Java
while (true) {
    Message msg = consumer.receive();
    try {
        // Do something with the message
        System.out.println("---------------------------------------------------------------------------");
        System.out.println("Message received: " + new String(msg.getData()));
        System.out.println(msg.getKey());
        System.out.println(String.format("Key:%s,order key:%s", msg.getKey(), msg.hasOrderingKey()));
        System.out.println("---------------------------------------------------------------------------");
        // Acknowledge the message so that it can be deleted by the message broker
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // Message failed to process, redeliver later
        consumer.negativeAcknowledge(msg);
    }
}
```

# Use Version 2.6.0

```xml
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.6.0</version>
</dependency>
```

# Use Version 2.7.1

```xml
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.7.1</version>
</dependency>
```

### What did you expect to see?

With two consumers on a Key_Shared subscription, messages with the same key should always be consumed by the same consumer, and the keys should be distributed across both consumers.

### What did you see instead?

# Result:

With client version 2.6.0 and two consumers connected, all messages are consumed by a single consumer (A). The other consumer (B) never receives any message unless consumer A disconnects, which looks like Failover behaviour rather than Key_Shared.

After upgrading the client to 2.7.1, each key is consumed only by its corresponding consumer, as expected in Key_Shared mode.
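For reference, the routing expected in Key_Shared mode can be sketched as a lookup from a key's hash slot to the consumer whose range contains it. This is only an illustrative sketch, not broker code: the class and method names are hypothetical, the two ranges are copied from the `keyHashRanges` in the broker stats under Analysis, and the slot values in `main` are made up rather than computed with the broker's actual hash function.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: hypothetical names, not Pulsar broker code.
class KeySharedRangeLookup {

    // Inclusive [start, end] slot ranges, copied from keyHashRanges in the broker stats.
    private static final Map<String, int[]> RANGES = new LinkedHashMap<>();

    static {
        RANGES.put("keyshared-consumer-1", new int[] {0, 32768});
        RANGES.put("keyshared-consumer-2", new int[] {32769, 65536});
    }

    // Returns the consumer whose range contains the slot, or null if no range covers it.
    static String ownerOf(int slot) {
        for (Map.Entry<String, int[]> entry : RANGES.entrySet()) {
            int[] range = entry.getValue();
            if (slot >= range[0] && slot <= range[1]) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up slot values, one in each half of the slot space.
        int[] slots = {100, 40000};
        for (int slot : slots) {
            System.out.println("slot " + slot + " -> " + ownerOf(slot));
        }
    }
}
```

Under this model both consumers should receive messages as long as the keys in use hash into both ranges, which matches what is observed with 2.7.1 and not with 2.6.0.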
# Analysis:

```json
[
  {
    "msgRateOut": 1.5664867649048742,
    "msgThroughputOut": 242.91165105279143,
    "bytesOutCounter": 11361,
    "msgOutCounter": 73,
    "msgRateRedeliver": 0,
    "chunkedMessageRate": 0,
    "consumerName": "keyshared-consumer-2",
    "availablePermits": 927,
    "unackedMessages": 0,
    "avgMessagesPerEntry": 6,
    "blockedConsumerOnUnackedMsgs": false,
    "readPositionWhenJoining": "404032:3949",
    "lastAckedTimestamp": 1702548043440,
    "lastConsumedTimestamp": 1702548043339,
    "keyHashRanges": [
      "[32769, 65536]"
    ],
    "metadata": {},
    "address": "/192.168.75.44:60187",
    "clientVersion": "2.6.0",
    "connectedSince": "2023-12-14T17:59:51.518+08:00"
  },
  {
    "msgRateOut": 0,
    "msgThroughputOut": 0,
    "bytesOutCounter": 0,
    "msgOutCounter": 0,
    "msgRateRedeliver": 0,
    "chunkedMessageRate": 0,
    "consumerName": "keyshared-consumer-1",
    "availablePermits": 1000,
    "unackedMessages": 0,
    "avgMessagesPerEntry": 1000,
    "blockedConsumerOnUnackedMsgs": false,
    "readPositionWhenJoining": "404032:3977",
    "lastAckedTimestamp": 0,
    "lastConsumedTimestamp": 0,
    "keyHashRanges": [
      "[0, 32768]"
    ],
    "metadata": {},
    "address": "/192.168.75.44:60195",
    "clientVersion": "2.6.0",
    "connectedSince": "2023-12-14T17:59:57.268+08:00"
  }
]
```

The broker stats show that the consumers are assigned the same hash ranges with both client versions, so I conclude that the problem is caused by a difference in how the producer logic is handled between 2.6.0 and 2.7.1.

# Question

If my judgement is correct, please look into this issue: why can't 2.6.0 work in normal Key_Shared mode?

### Anything else?

2.6.0 console log

2.7.1 console log

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org