This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e73532a8ae9 [fix][broker] Ensure KeyShared sticky mode consumer respects assigned ranges (#24730) e73532a8ae9 is described below commit e73532a8ae9f443f5942181f9c70bc796a380160 Author: Baodi Shi <ba...@apache.org> AuthorDate: Mon Sep 15 15:19:54 2025 +0800 [fix][broker] Ensure KeyShared sticky mode consumer respects assigned ranges (#24730) --- ...ashRangeExclusiveStickyKeyConsumerSelector.java | 112 ++++++++++++--------- ...angeExclusiveStickyKeyConsumerSelectorTest.java | 106 +++++++++++++++++-- .../client/api/KeySharedSubscriptionTest.java | 66 ++++++++++++ 3 files changed, 228 insertions(+), 56 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index 904fb702a94..7fb99838197 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -20,14 +20,15 @@ package org.apache.pulsar.broker.service; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.api.proto.IntRange; -import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.util.FutureUtil; /** @@ -38,7 +39,7 @@ import org.apache.pulsar.common.util.FutureUtil; public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyConsumerSelector { private final int rangeSize; private final Range keyHashRange; - private final ConcurrentSkipListMap<Integer, Consumer> rangeMap; + private final ConcurrentSkipListMap<Integer, Pair<Range, Consumer>> rangeMap; public HashRangeExclusiveStickyKeyConsumerSelector() { this(DEFAULT_RANGE_SIZE); @@ -73,47 +74,41 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon + conflictingConsumer); } for (IntRange intRange : consumer.getKeySharedMeta().getHashRangesList()) { - rangeMap.put(intRange.getStart(), consumer); - rangeMap.put(intRange.getEnd(), consumer); + rangeMap.put(intRange.getStart(), Pair.of(Range.of(intRange.getStart(), intRange.getEnd()), consumer)); } return Optional.empty(); } @Override public synchronized Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer) { - rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer)); + rangeMap.entrySet().removeIf(entry -> entry.getValue().getRight().equals(consumer)); return Optional.empty(); } @Override public synchronized ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() { List<HashRangeAssignment> result = new ArrayList<>(); - Map.Entry<Integer, Consumer> prev = null; - for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) { - if (prev == null) { - prev = entry; - } else { - if (prev.getValue().equals(entry.getValue())) { - result.add(new HashRangeAssignment(Range.of(prev.getKey(), entry.getKey()), entry.getValue())); - } - prev = null; - } + for (Map.Entry<Integer, Pair<Range, Consumer>> entry : rangeMap.entrySet()) { + Range assignedRange = entry.getValue().getLeft(); + Consumer assignedConsumer = entry.getValue().getRight(); + result.add(new HashRangeAssignment(assignedRange, assignedConsumer)); } return ConsumerHashAssignmentsSnapshot.of(result); } @Override public Consumer select(int hash) { - if (rangeMap.size() > 0) { - Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(hash); - Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(hash); - Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null; - Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null; - if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) { - return ceilingConsumer; - } else { - return null; - } + if (rangeMap.isEmpty()) { + return null; + } + + Map.Entry<Integer, Pair<Range, Consumer>> floorEntry = rangeMap.floorEntry(hash); + if (floorEntry == null) { + return null; + } + Pair<Range, Consumer> pair = floorEntry.getValue(); + if (pair.getLeft().contains(hash)) { + return pair.getRight(); } else { return null; } @@ -129,10 +124,25 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon return FutureUtil.failedFuture(new BrokerServiceException.ConsumerAssignException( "Ranges for KeyShared policy must not be empty.")); } - for (IntRange intRange : ranges) { - if (intRange.getStart() > intRange.getEnd()) { + List<IntRange> sortedRanges = new ArrayList<>(ranges); + sortedRanges.sort(Comparator.comparingInt(IntRange::getStart)); + for (int i = 0; i < sortedRanges.size(); i++) { + IntRange currentRange = sortedRanges.get(i); + // 1. Validate: check if start > end for the current range + if (currentRange.getStart() > currentRange.getEnd()) { return FutureUtil.failedFuture( - new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end")); + new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end for range: " + + "[" + currentRange.getStart() + "," + currentRange.getEnd() + "]")); + } + // 2. Validate: check for overlaps with the next range in the sorted list + if (i < sortedRanges.size() - 1) { + IntRange nextRange = sortedRanges.get(i + 1); + if (areRangesOverlapping(currentRange, nextRange)) { + return FutureUtil.failedFuture( + new BrokerServiceException.ConsumerAssignException("Consumer's own ranges conflict: " + + "[" + currentRange.getStart() + "," + currentRange.getEnd() + "] " + + "overlaps with [" + nextRange.getStart() + "," + nextRange.getEnd() + "]")); + } } } Consumer conflictingConsumer = findConflictingConsumer(ranges); @@ -143,34 +153,38 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon } } - private synchronized Consumer findConflictingConsumer(List<IntRange> ranges) { - for (IntRange intRange : ranges) { - Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(intRange.getStart()); - Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(intRange.getEnd()); - - if (floorEntry != null && floorEntry.getKey() >= intRange.getStart()) { - return floorEntry.getValue(); - } - - if (ceilingEntry != null && ceilingEntry.getKey() <= intRange.getEnd()) { - return ceilingEntry.getValue(); + private synchronized Consumer findConflictingConsumer(List<IntRange> newConsumerRanges) { + for (IntRange newRange : newConsumerRanges) { + // 1. Check for potential conflicts with existing ranges that start before newRange's start. + Map.Entry<Integer, Pair<Range, Consumer>> conflictBeforeStart = rangeMap.floorEntry(newRange.getStart()); + if (conflictBeforeStart != null) { + Range existingRange = conflictBeforeStart.getValue().getLeft(); + if (areRangesOverlapping(newRange, existingRange)) { + return conflictBeforeStart.getValue().getRight(); + } } - - if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) { - KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta(); - for (IntRange range : keySharedMeta.getHashRangesList()) { - int start = Math.max(intRange.getStart(), range.getStart()); - int end = Math.min(intRange.getEnd(), range.getEnd()); - if (end >= start) { - return ceilingEntry.getValue(); - } + // 2. Check for potential conflicts with existing ranges that start after newRange's start. + Map.Entry<Integer, Pair<Range, Consumer>> conflictAfterStart = rangeMap.ceilingEntry(newRange.getStart()); + if (conflictAfterStart != null) { + Range existingRange = conflictAfterStart.getValue().getLeft(); + if (areRangesOverlapping(newRange, existingRange)) { + return conflictAfterStart.getValue().getRight(); } } } return null; } - Map<Integer, Consumer> getRangeConsumer() { + + private static boolean areRangesOverlapping(IntRange range1, Range range2) { + return Math.max(range1.getStart(), range2.getStart()) <= Math.min(range1.getEnd(), range2.getEnd()); + } + + private static boolean areRangesOverlapping(IntRange range1, IntRange range2) { + return Math.max(range1.getStart(), range2.getStart()) <= Math.min(range1.getEnd(), range2.getEnd()); + } + + Map<Integer, Pair<Range, Consumer>> getRangeConsumer() { return Collections.unmodifiableMap(rangeMap); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java index f3828981c8e..7dd3bde4489 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java @@ -50,7 +50,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); selector.addConsumer(consumer1).get(); - Assert.assertEquals(selector.getRangeConsumer().size(), 2); + Assert.assertEquals(selector.getRangeConsumer().size(), 1); Consumer selectedConsumer; for (int i = 0; i < 3; i++) { selectedConsumer = selector.select(i); @@ -66,7 +66,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { when(consumer2.getKeySharedMeta()).thenReturn(keySharedMeta2); Assert.assertEquals(consumer2.getKeySharedMeta(), keySharedMeta2); selector.addConsumer(consumer2).get(); - Assert.assertEquals(selector.getRangeConsumer().size(), 4); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); for (int i = 3; i < 10; i++) { selectedConsumer = selector.select(i); @@ -79,7 +79,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { } selector.removeConsumer(consumer1); - Assert.assertEquals(selector.getRangeConsumer().size(), 2); + Assert.assertEquals(selector.getRangeConsumer().size(), 1); selectedConsumer = selector.select(1); Assert.assertNull(selectedConsumer); @@ -89,6 +89,38 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { Assert.assertNull(selectedConsumer); } + + @Test + public void testConsumerSelectWithMultipleRanges() throws ExecutionException, InterruptedException { + + HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(20); + Consumer consumer1 = mock(Consumer.class); + KeySharedMeta keySharedMeta1 = new KeySharedMeta() + .setKeySharedMode(KeySharedMode.STICKY); + keySharedMeta1.addHashRange().setStart(2).setEnd(6); + keySharedMeta1.addHashRange().setStart(10).setEnd(15); + when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); + Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); + selector.addConsumer(consumer1).get(); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); + Consumer selectedConsumer; + for (int i = 2; i <= 6; i++) { + selectedConsumer = selector.select(i); + Assert.assertEquals(selectedConsumer, consumer1); + } + for (int i = 10; i <= 15; i++) { + selectedConsumer = selector.select(i); + Assert.assertEquals(selectedConsumer, consumer1); + } + selectedConsumer = selector.select(1); + Assert.assertNull(selectedConsumer); + selectedConsumer = selector.select(9); + Assert.assertNull(selectedConsumer); + selectedConsumer = selector.select(18); + Assert.assertNull(selectedConsumer); + + } + @Test public void testEmptyRanges() { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); @@ -189,6 +221,66 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { } } + @Test + public void testShouldConflictConsumerWithSelfOverlappingRanges() throws ExecutionException, InterruptedException { + HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); + + final List<IntRange> testRanges = new ArrayList<>(); + testRanges.add(new IntRange().setStart(1).setEnd(3)); + testRanges.add(new IntRange().setStart(2).setEnd(2)); + testRanges.add(new IntRange().setStart(5).setEnd(5)); + testRanges.add(new IntRange().setStart(4).setEnd(6)); + KeySharedMeta keySharedMeta = new KeySharedMeta() + .setKeySharedMode(KeySharedMode.STICKY); + keySharedMeta.addAllHashRanges(testRanges); + Consumer consumer = mock(Consumer.class); + when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta); + Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta); + + try { + selector.addConsumer(consumer).get(); + Assert.fail("should be failed"); + } catch (ExecutionException | InterruptedException e) { + // ignore + } + + Assert.assertEquals(selector.getRangeConsumer().size(), 0); + } + + @Test + public void testShouldConflictConsumerWithBoundaryRanges() throws ExecutionException, InterruptedException { + HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); + TransportCnx transportCnx = mock(TransportCnx.class); + + // 1. add consumer 1 with range [2, 5] + KeySharedMeta keySharedMeta1 = new KeySharedMeta() + .setKeySharedMode(KeySharedMode.STICKY); + keySharedMeta1.addAllHashRanges(List.of(new IntRange().setStart(2).setEnd(5))); + Consumer consumer1 = mock(Consumer.class); + when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); + when(consumer1.cnx()).thenReturn(transportCnx); + when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null)); + Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); + selector.addConsumer(consumer1).get(); + Assert.assertEquals(selector.getRangeConsumer().size(), 1); + + // 2. add consumer 2 with range [5, 10], should be conflict with consumer 1 + KeySharedMeta keySharedMeta2 = new KeySharedMeta() + .setKeySharedMode(KeySharedMode.STICKY); + keySharedMeta2.addAllHashRanges(List.of(new IntRange().setStart(5).setEnd(10))); + Consumer consumer2 = mock(Consumer.class); + when(consumer2.cnx()).thenReturn(transportCnx); + when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null)); + when(consumer2.getKeySharedMeta()).thenReturn(keySharedMeta2); + try { + selector.addConsumer(consumer2).get(); + Assert.fail("should be failed"); + } catch (ExecutionException | InterruptedException e) { + // ignore + } + Assert.assertEquals(selector.getRangeConsumer().size(), 1); + } + @Test public void testSingleRangeConflict() throws ExecutionException, InterruptedException { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); @@ -202,7 +294,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); selector.addConsumer(consumer1).get(); - Assert.assertEquals(selector.getRangeConsumer().size(), 2); + Assert.assertEquals(selector.getRangeConsumer().size(), 1); final List<IntRange> testRanges = new ArrayList<>(); testRanges.add(new IntRange().setStart(4).setEnd(6)); @@ -228,7 +320,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { } catch (ExecutionException | InterruptedException e) { // ignore } - Assert.assertEquals(selector.getRangeConsumer().size(), 2); + Assert.assertEquals(selector.getRangeConsumer().size(), 1); } } @@ -245,7 +337,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); selector.addConsumer(consumer1).get(); - Assert.assertEquals(selector.getRangeConsumer().size(), 2); + Assert.assertEquals(selector.getRangeConsumer().size(), 1); final List<List<IntRange>> testRanges = new ArrayList<>(); testRanges.add(List.of( @@ -271,7 +363,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { } catch (ExecutionException | InterruptedException e) { // ignore } - Assert.assertEquals(selector.getRangeConsumer().size(), 2); + Assert.assertEquals(selector.getRangeConsumer().size(), 1); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 80b5070077d..7f7de33b43a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -2668,4 +2668,70 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { logTopicStats(topic); } } + + @Test + public void testCustomStickyRange() throws Exception { + int messageCount = 100; + final String topicName = "persistent://public/default/test-sticky-range-" + System.nanoTime(); + final String subscriptionName = "sub-sticky-range"; + + // 0. Init topic and subscription + admin.topics().createPartitionedTopic(topicName, 4); + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + + // 1. Create a producer and send messages + try (Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create()) { + for (int i = 0; i < messageCount; i++) { + String key = String.valueOf(i); + producer.newMessage() + .value(String.valueOf(i).getBytes()) + .key(key) + .send(); + } + } + + // 3. One by one create consumers consume message with different sticky hash range + KeySharedPolicy.KeySharedPolicySticky policy1 = KeySharedPolicy.stickyHashRange() + .ranges(Range.of(0, 9999), Range.of(20000, 29999), Range.of(40000, 49999)); + KeySharedPolicy.KeySharedPolicySticky policy2 = KeySharedPolicy.stickyHashRange() + .ranges(Range.of(10000, 19999), Range.of(30000, 39999), Range.of(50000, 65535)); + + List<KeySharedPolicy.KeySharedPolicySticky> policies = Arrays.asList(policy1, policy2); + int[] receivedCounts = new int[2]; + for (int i = 0; i < policies.size(); i++) { + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(policies.get(i)) + .subscribe(); + while (true) { + try { + Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null) { + break; + } + receivedCounts[i]++; + log.debug("Consumer #{} received message with key:{} total:{}", + i + 1, msg.getKey(), receivedCounts[i]); + } catch (Exception e) { + break; + } + } + // make sure consume closed before start next consumer + consumer.close(); + } + + int consumer1Received = receivedCounts[0]; + int consumer2Received = receivedCounts[1]; + + log.info("Consumer1 total received: {}", consumer1Received); + log.info("Consumer2 total received: {}", consumer2Received); + Assert.assertEquals(consumer1Received + consumer2Received, messageCount, + "Total messages received by both consumers should be " + messageCount); + } }