This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 87a3be238c2c0ac075820a185855020494206b98
Author: Enrico Olivelli <[email protected]>
AuthorDate: Sat Jan 29 05:14:29 2022 +0100

    KeyShared stickyHashRange subscription: prevent stuck subscription in case 
of consumer restart (#14014)
    
    When using a KeyShared subscription with `stickyHashRange` it is possible to 
get a stuck subscription while restarting the consumers.
    
    This bug is not a regression in 2.10; the problem is also present in Pulsar 
2.8 (and probably older versions).
    
    Fix: add the entry to the list of messages to be redelivered.
    
    (cherry picked from commit da9e80650b1cc3d573008c38585ec2e6ed4a00e9)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |   1 +
 .../client/api/KeySharedSubscriptionTest.java      | 129 +++++++++++++++++++++
 2 files changed, 130 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 420795c..21c98b9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -213,6 +213,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 groupedEntries.computeIfAbsent(c, k -> new 
ArrayList<>()).add(entry);
                 consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new 
HashSet<>()).add(stickyKeyHash);
             } else {
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
                 entry.release();
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index bc19852..b67ce5b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -40,6 +40,8 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1303,4 +1305,131 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
             return null;
         }
     }
+
+    @Test
+    public void testStickyKeyRangesRestartConsumers() throws 
PulsarClientException, InterruptedException {
+        final String topic = TopicName.get("persistent", "public", "default",
+                "testStickyKeyRangesRestartConsumers" + 
UUID.randomUUID()).toString();
+
+        final String subscriptionName = "my-sub";
+
+        final int numMessages = 100;
+        // start 2 consumers
+        Set<String> sentMessages = new ConcurrentSkipListSet<>();
+
+        CountDownLatch count1 = new CountDownLatch(2);
+        CountDownLatch count2 = new CountDownLatch(13); // consumer 2 usually 
receive the fix messages
+        CountDownLatch count3 = new CountDownLatch(numMessages);
+        Consumer<String> consumer1 = pulsarClient.newConsumer(
+                        Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65536 / 
2)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count1.countDown();
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+
+        Consumer<String> consumer2 = pulsarClient.newConsumer(
+                        Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(65536 / 2 + 
1, 65535)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count2.countDown();
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+
+        pulsar.getExecutor().submit(() -> {
+            try
+            {
+                try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                        .topic(topic)
+                        .enableBatching(false)
+                        .create();) {
+                    for (int i = 0; i < numMessages; i++)
+                    {
+                        String key = "test" + i;
+                        sentMessages.add(key);
+                        producer.newMessage()
+                                .key(key)
+                                .value("test" + i).
+                                send();
+                        Thread.sleep(100);
+                    }
+                }
+            } catch (Throwable t) {
+                log.error("error", t);
+            }});
+
+        // wait for some messages to be received by both of the consumers
+        count1.await();
+        count2.await();
+        consumer1.close();
+        consumer2.close();
+
+        // this sleep is to trigger a race condition that happens
+        // when there are some messages that cannot be dispatched while 
consuming
+        Thread.sleep(3000);
+
+        // start consuming again...
+
+        pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65536 / 
2)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+        pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(65536 / 2 + 
1, 65535)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+        // wait for all the messages to be delivered
+        count3.await();
+        assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
+    }
 }

Reply via email to