This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c24f79478e154e4a344d2c7046813ebda0955d6
Author: Enrico Olivelli <[email protected]>
AuthorDate: Sat Jan 29 05:14:29 2022 +0100

    KeyShared stickyHashRange subscription: prevent stuck subscription in case 
of consumer restart (#14014)
    
    ### Motivation
    When using a KeyShared subscription with `stickyHashRange`, it is possible to 
get a stuck subscription while restarting the consumers.
    
    This bug is not a regression in 2.10; the problem is also present in Pulsar 
2.8 (and probably older versions).
    
    ### Modifications
    
    Add the entry to the list of messages to be redelivered.
    
    (cherry picked from commit da9e80650b1cc3d573008c38585ec2e6ed4a00e9)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |   1 +
 .../client/api/KeySharedSubscriptionTest.java      | 128 +++++++++++++++++++++
 2 files changed, 129 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 420795c..21c98b9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -213,6 +213,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 groupedEntries.computeIfAbsent(c, k -> new 
ArrayList<>()).add(entry);
                 consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new 
HashSet<>()).add(stickyKeyHash);
             } else {
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
                 entry.release();
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 8fc8980..057f0c6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -40,6 +40,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -1372,4 +1373,131 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
             return null;
         }
     }
+
+    @Test
+    public void testStickyKeyRangesRestartConsumers() throws 
PulsarClientException, InterruptedException {
+        final String topic = TopicName.get("persistent", "public", "default",
+                "testStickyKeyRangesRestartConsumers" + 
UUID.randomUUID()).toString();
+
+        final String subscriptionName = "my-sub";
+
+        final int numMessages = 100;
+        // start 2 consumers
+        Set<String> sentMessages = new ConcurrentSkipListSet<>();
+
+        CountDownLatch count1 = new CountDownLatch(2);
+        CountDownLatch count2 = new CountDownLatch(13); // consumer 2 usually 
receive the fix messages
+        CountDownLatch count3 = new CountDownLatch(numMessages);
+        Consumer<String> consumer1 = pulsarClient.newConsumer(
+                        Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65536 / 
2)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count1.countDown();
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+
+        Consumer<String> consumer2 = pulsarClient.newConsumer(
+                        Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(65536 / 2 + 
1, 65535)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count2.countDown();
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+
+        pulsar.getExecutor().submit(() -> {
+            try
+            {
+                try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                        .topic(topic)
+                        .enableBatching(false)
+                        .create();) {
+                    for (int i = 0; i < numMessages; i++)
+                    {
+                        String key = "test" + i;
+                        sentMessages.add(key);
+                        producer.newMessage()
+                                .key(key)
+                                .value("test" + i).
+                                send();
+                        Thread.sleep(100);
+                    }
+                }
+            } catch (Throwable t) {
+                log.error("error", t);
+            }});
+
+        // wait for some messages to be received by both of the consumers
+        count1.await();
+        count2.await();
+        consumer1.close();
+        consumer2.close();
+
+        // this sleep is to trigger a race condition that happens
+        // when there are some messages that cannot be dispatched while 
consuming
+        Thread.sleep(3000);
+
+        // start consuming again...
+
+        pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65536 / 
2)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+        pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(65536 / 2 + 
1, 65535)))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            sentMessages.remove(msg.getKey());
+                            count3.countDown();
+                        }
+                    });
+                })
+                .subscribe();
+        // wait for all the messages to be delivered
+        count3.await();
+        assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
+    }
 }

Reply via email to