equanz opened a new issue, #23315: URL: https://github.com/apache/pulsar/issues/23315
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version

Tested with https://github.com/apache/pulsar/tree/4f96146f13b136644a4eb0cf4ec36699e0431929.

### Minimal reproduce step

Apply the following patch and run the test.

```diff
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 48311c5733..685baeef9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -29,11 +29,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
 import org.apache.pulsar.client.api.Range;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class ConsistentHashingStickyKeyConsumerSelectorTest {
 
@@ -216,4 +218,35 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
         // then there should be no mapping remaining
         Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
     }
+
+    @Test
+    public void testModifyMappingBetweenExistingConsumers() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final String consumerName = "c1";
+        final int numOfInitialConsumers = 3;
+        for (int i = 0; i < numOfInitialConsumers; i++) {
+            final Consumer consumer = mock(Consumer.class);
+            when(consumer.consumerName()).thenReturn(consumerName);
+            when(consumer.consumerId()).thenReturn((long) i);
+            selector.addConsumer(consumer);
+        }
+
+        final int keyHash = numOfInitialConsumers + 1;
+        // get expected consumer
+        final Consumer expectedConsumer = selector.select(keyHash);
+
+        // add new consumer
+        final Consumer newConsumer = mock(Consumer.class);
+        when(newConsumer.consumerName()).thenReturn(consumerName);
+        when(newConsumer.consumerId()).thenReturn((long) numOfInitialConsumers);
+        selector.addConsumer(newConsumer);
+
+        final Consumer actualConsumer = selector.select(keyHash);
+        try {
+            Assert.assertEquals(actualConsumer.consumerId(), expectedConsumer.consumerId());
+        } catch (AssertionError e) {
+            // if it changes (that is normal behavior), expect it to be the new consumer
+            Assert.assertEquals(actualConsumer.consumerId(), newConsumer.consumerId());
+        }
+    }
 }
```

### What did you expect to see?

In auto-split hash mode, we expect a new consumer to take hash ranges only from existing consumers. (The dispatcher handles that case through `recentlyJoinedConsumers`.) A hash range should therefore never move from one existing consumer to another.

### What did you see instead?

```
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.289 s <<< FAILURE! - in org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest
[ERROR] org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest.testModifyMappingBetweenExistingConsumers Time elapsed: 0.212 s <<< FAILURE!
java.lang.AssertionError: expected [3] but found [0]
	at org.testng.Assert.fail(Assert.java:110)
	at org.testng.Assert.failNotEquals(Assert.java:1577)
	at org.testng.Assert.assertEqualsImpl(Assert.java:149)
	at org.testng.Assert.assertEquals(Assert.java:131)
	at org.testng.Assert.assertEquals(Assert.java:979)
	at org.testng.Assert.assertEquals(Assert.java:955)
	at org.testng.Assert.assertEquals(Assert.java:989)
	at org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest.testModifyMappingBetweenExistingConsumers(ConsistentHashingStickyKeyConsumerSelectorTest.java:251)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
```

When hash points collide, the selector appends the consumer to the list of colliding consumers stored at that point:

https://github.com/apache/pulsar/blob/4f96146f13b136644a4eb0cf4ec36699e0431929/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java#L67-L73

It then picks a consumer from that list with the following calculation:
https://github.com/apache/pulsar/blob/4f96146f13b136644a4eb0cf4ec36699e0431929/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java#L127

In the test, every consumer has the same name, so they all collide on the same points, and the key hash is `4`:

- With the three initial consumers (consumerIds 0, 1 and 2): `4 % 3 = 1`, so the consumer with consumerId 1 is returned.
- After the new consumer (consumerId 3) is added: `4 % 4 = 0`, so the consumer with consumerId 0 is returned.

The key therefore moves from consumer 1 to consumer 0, both of which existed before the new consumer joined. `recentlyJoinedConsumers` is meant for hashes taken over by a newly joined consumer, not for hashes moving between existing consumers, so this reassignment can lead to out-of-order delivery. Shouldn't we handle this case?

### Anything else?

For simplicity, this example gives every consumer the same name. However, the issue is not limited to consumers with the same name: coincidental hash collisions between different names can trigger it as well.

(This issue was originally reported by @hrsakai.)

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
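The collision handling and the `4 % 3` / `4 % 4` arithmetic described under "What did you see instead?" can be reproduced outside the broker. The following is a minimal, self-contained Java sketch, not Pulsar code: the class `CollisionSelectionSketch`, its methods and the ring points `10` and `20` are hypothetical. It assumes, based on the linked line, that colliding consumers share one list per ring point and that selection indexes that list with `hash % list.size()`. Passing identical points for every consumer stands in for consumers that share a name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model; NOT Pulsar's ConsistentHashingStickyKeyConsumerSelector.
// Assumptions: colliding consumers share a per-point list, and selection
// indexes that list with hash % list.size() (as the linked line appears to do).
final class CollisionSelectionSketch {
    // ring point -> ids of the consumers that collided on that point, in insertion order
    private final TreeMap<Integer, List<Long>> ring = new TreeMap<>();

    // Hypothetical simplification: ring points are passed in explicitly instead of
    // being derived from the consumer name, so equal points mimic equal names.
    void addConsumer(long consumerId, int... points) {
        for (int point : points) {
            List<Long> consumers = ring.computeIfAbsent(point, k -> new ArrayList<>());
            if (!consumers.contains(consumerId)) {
                consumers.add(consumerId);
            }
        }
    }

    // Finds the first point >= hash (wrapping to the lowest point), then picks
    // an entry from that point's collision list by hash % list size.
    long select(int hash) {
        if (hash < 0) {
            throw new IllegalArgumentException("hash must be non-negative");
        }
        if (ring.isEmpty()) {
            throw new IllegalStateException("no consumers");
        }
        Map.Entry<Integer, List<Long>> entry = ring.ceilingEntry(hash);
        if (entry == null) {
            entry = ring.firstEntry(); // wrap around the ring
        }
        List<Long> consumers = entry.getValue();
        return consumers.get(hash % consumers.size());
    }

    // Owner of every hash in [0, maxHash).
    long[] snapshot(int maxHash) {
        long[] owners = new long[maxHash];
        for (int h = 0; h < maxHash; h++) {
            owners[h] = select(h);
        }
        return owners;
    }

    // Counts hashes whose owner changed to a consumer other than the new one,
    // i.e. hashes that moved between consumers that already existed.
    static int movedBetweenExisting(long[] before, long[] after, long newConsumerId) {
        int moved = 0;
        for (int h = 0; h < before.length; h++) {
            if (before[h] != after[h] && after[h] != newConsumerId) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        CollisionSelectionSketch selector = new CollisionSelectionSketch();
        // Three consumers that all collide on the same two (hypothetical) points.
        for (long id = 0; id < 3; id++) {
            selector.addConsumer(id, 10, 20);
        }
        final int keyHash = 4;
        long[] before = selector.snapshot(32);
        System.out.println("before: consumer " + selector.select(keyHash)); // 4 % 3 = 1 -> consumer 1

        selector.addConsumer(3, 10, 20); // the new consumer collides on the same points
        long[] after = selector.snapshot(32);
        System.out.println("after: consumer " + selector.select(keyHash)); // 4 % 4 = 0 -> consumer 0

        System.out.println("hashes moved between existing consumers: "
                + movedBetweenExisting(before, after, 3)); // prints 15
    }
}
```

Under the expectation stated in "What did you expect to see?", `movedBetweenExisting` would return 0. In this model it returns 15 of the 32 sampled hashes, because changing the divisor from 3 to 4 reshuffles almost every index in the shared collision list, not just the slots the new consumer takes.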