This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f75d02  Fixed KeyShared consumers getting stuck on delivery (#7105)
2f75d02 is described below

commit 2f75d0263bf708574a3cef5aa0e47891e1d6a3f3
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 4 17:10:52 2020 -0700

    Fixed KeyShared consumers getting stuck on delivery (#7105)
    
    Motivation
    If one consumer is processing messages slowly, it can prevent the other consumers from making progress on the topic: the dispatcher gets stuck in a loop, repeatedly trying to replay the same messages without being able to dispatch any of them.
    
    The basic idea is that we can still make progress by continuing to read further into the topic and dispatching those messages to the consumers that are free, at least for the keys that belong to them.
---
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 29 +++++++++
 .../client/api/KeySharedSubscriptionTest.java      | 71 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c7af338..b8255ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -784,7 +784,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         }
     }
 
-    private synchronized Set<PositionImpl> getMessagesToReplayNow(int 
maxMessagesToRead) {
+    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int 
maxMessagesToRead) {
         if (!messagesToRedeliver.isEmpty()) {
             return messagesToRedeliver.items(maxMessagesToRead,
                     (ledgerId, entryId) -> new PositionImpl(ledgerId, 
entryId));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ce946fc..9e93ff8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import io.netty.util.concurrent.FastThreadLocal;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
@@ -37,6 +39,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.ReadType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +48,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
 
     private final StickyKeyConsumerSelector selector;
 
+    private boolean isDispatcherStuckOnReplays = false;
+
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
            Subscription subscription, StickyKeyConsumerSelector selector) {
         super(topic, cursor, subscription);
@@ -159,6 +164,30 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, 
totalBytesSent);
             }
         }
+
+        if (totalMessagesSent == 0) {
+            // This means, that all the messages we've just read cannot be 
dispatched right now.
+            // This condition can only happen when:
+            //  1. We have consumers ready to accept messages (otherwise the 
would not haven been triggered)
+            //  2. All keys in the current set of messages are routing to 
consumers that are currently busy
+            //
+            // The solution here is to move on and read next batch of messages 
which might hopefully contain
+            // also keys meant for other consumers.
+            isDispatcherStuckOnReplays = true;
+            readMoreEntries();
+        }
+    }
+
+    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int 
maxMessagesToRead) {
+        if (isDispatcherStuckOnReplays) {
+            // If we're stuck on replay, we want to move forward reading on 
the topic (until the overall max-unacked
+            // messages kicks in), instead of keep replaying the same old 
messages, since the consumer that these
+            // messages are routing to might be busy at the moment
+            this.isDispatcherStuckOnReplays = false;
+            return Collections.emptySet();
+        } else {
+            return super.getMessagesToReplayNow(maxMessagesToRead);
+        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 84a1978..e0c4093 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -41,7 +41,10 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -392,6 +395,74 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test(dataProvider = "batch")
+    public void testMakingProgressWithSlowerConsumer(boolean enableBatch) 
throws Exception {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "testMakingProgressWithSlowerConsumer-" + 
UUID.randomUUID();
+
+        String slowKey = "slowKey";
+
+        List<PulsarClient> clients = new ArrayList<>();
+
+        AtomicInteger receivedMessages = new AtomicInteger();
+
+        for (int i = 0; i < 10; i++) {
+            PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(brokerUrl.toString())
+                    .build();
+            clients.add(client);
+
+            client.newConsumer(Schema.INT32)
+                    .topic(topic)
+                    .subscriptionName("key_shared")
+                    .subscriptionType(SubscriptionType.Key_Shared)
+                    .receiverQueueSize(1)
+                    .messageListener((consumer, msg) -> {
+                        try {
+                            if (slowKey.equals(msg.getKey())) {
+                                // Block the thread to simulate a slow consumer
+                                Thread.sleep(10000);
+                            }
+
+                            receivedMessages.incrementAndGet();
+                            consumer.acknowledge(msg);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    })
+                    .subscribe();
+        }
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, enableBatch);
+
+        // First send the "slow key" so that 1 consumer will get stuck
+        producer.newMessage()
+                .key(slowKey)
+                .value(-1)
+                .send();
+
+        int N = 1000;
+
+        // Then send all the other keys
+        for (int i = 0; i < N; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // Since only 1 out of 10 consumers is stuck, we should be able to 
receive ~90% messages,
+        // plus or minus for some skew in the key distribution.
+        Thread.sleep(5000);
+
+        assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3);
+
+        for (PulsarClient c : clients) {
+            c.close();
+        }
+    }
+
     private Producer<Integer> createProducer(String topic, boolean 
enableBatch) throws PulsarClientException {
         Producer<Integer> producer = null;
         if (enableBatch) {

Reply via email to