This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d9a2ab  [pulsar-broker] Allow broker to discover and unblock stuck subscription (#9789)
8d9a2ab is described below

commit 8d9a2ab2a5592e5fb1799ad152be4b5082815191
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Mar 9 22:53:48 2021 -0800

    [pulsar-broker] Allow broker to discover and unblock stuck subscription (#9789)
    
    ### Motivation
    We have frequently seen subscriptions get stuck on different topics: the broker stops dispatching messages even though the consumer has available permits and there are no pending reads (for example, #9788). This can happen because of a regression bug or an unknown issue when expiry runs. One workaround is to manually unload and reload the topic, but that is not feasible if it happens frequently on many topics. Instead, the broker should be able to discover such stuck subscriptions and unblock them.
    The example below shows a subscription with `availablePermits` > 0 and no pending reads, yet the cursor's read position is not moving forward, so the backlog keeps building until the topic is unloaded. This happens frequently for unknown reasons:
    ```
    STATS-INTERNAL:
    "sub1" : {
          "markDeletePosition" : "11111111:15520",
          "readPosition" : "11111111:15521",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 115521,
          "cursorLedger" : 585099247,
          "cursorLedgerLastEntry" : 597,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2021-02-25T19:55:50.357Z",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
    
    STATS:
    "sub1" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 30350,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Shared",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "C1",
            "availablePermits" : 723,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2021-02-25T19:55:50.358285Z",
    
    ```
    
    
![image](https://user-images.githubusercontent.com/2898254/109894631-ab62d980-7c42-11eb-8dcc-a1a5f4f5d14e.png)
    
    
    ### Modification
    Add the capability for the broker to periodically check whether a subscription is stuck and unblock it if needed. The check is controlled by a flag, `unblockStuckSubscriptionEnabled`; for the initial release it is disabled by default, and it may be enabled by default in a later release.
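A minimal sketch of a periodic, flag-gated pass over subscriptions, to illustrate the shape of the check described above. It is hypothetical and simplified, not Pulsar's implementation: the `StuckCheckable` interface, the `AtomicBoolean` standing in for the dynamic `unblockStuckSubscriptionEnabled` setting, and the dedicated `ScheduledExecutorService` are all assumptions. In this commit the check is instead called from the per-subscription stats update in `PersistentTopic`.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PeriodicStuckCheck {

    /** Hypothetical stand-in for a subscription that can check itself for being stuck. */
    public interface StuckCheckable {
        boolean checkAndUnblockIfStuck();
    }

    // Hypothetical: models a dynamic broker setting that can be flipped at runtime.
    private final AtomicBoolean enabled;
    private final List<StuckCheckable> subscriptions;

    public PeriodicStuckCheck(AtomicBoolean enabled, List<StuckCheckable> subscriptions) {
        this.enabled = enabled;
        this.subscriptions = subscriptions;
    }

    /** Runs one pass over all subscriptions and returns how many were unblocked. */
    public int runOnce() {
        if (!enabled.get()) {
            return 0; // feature disabled: skip the whole pass
        }
        int unblocked = 0;
        for (StuckCheckable sub : subscriptions) {
            try {
                if (sub.checkAndUnblockIfStuck()) {
                    unblocked++;
                }
            } catch (RuntimeException e) {
                // A failure on one subscription must not abort the pass for the others.
            }
        }
        return unblocked;
    }

    /** Schedules runOnce() at a fixed period on a single background thread (assumption). */
    public ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(this::runOnce, period, period, unit);
        return executor;
    }
}
```

Skipping the pass entirely when the flag is off keeps the feature essentially free when disabled. Catching exceptions per subscription mirrors the commit's placement of the call inside the existing per-subscription `try` block in `PersistentTopic`.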
    
    
    ### Result
    This helps the broker handle stuck subscriptions, and it logs a message for later debugging.
---
 conf/broker.conf                                   |  3 +
 deployment/terraform-ansible/templates/broker.conf |  3 +
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 ++++
 .../bookkeeper/mledger/impl/PositionImpl.java      |  1 -
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 ++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 +++++++++++++++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 +++
 .../apache/pulsar/broker/service/Dispatcher.java   |  7 +++
 .../PersistentDispatcherMultipleConsumers.java     | 15 +++++
 .../PersistentDispatcherSingleActiveConsumer.java  | 16 +++++
 .../service/persistent/PersistentSubscription.java |  4 ++
 .../broker/service/persistent/PersistentTopic.java |  4 ++
 .../service/persistent/PersistentTopicTest.java    | 72 ++++++++++++++++++++++
 site2/docs/reference-configuration.md              |  1 +
 15 files changed, 215 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index c1d1f91..6ba9882 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -281,6 +281,9 @@ maxUnackedMessagesPerBroker=0
 # limit/2 messages
 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
 
+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
 # Tick time to schedule task that checks topic publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index b47ec67..5a600d6 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -233,6 +233,9 @@ maxUnackedMessagesPerBroker=0
 # limit/2 messages
 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
 
+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
 # Tick time to schedule task that checks topic publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d39af92..be898f3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -683,4 +683,10 @@ public interface ManagedCursor {
      */
     ManagedCursorMXBean getStats();
 
+    /**
+     * Checks if read position changed since this method was called last time.
+     *
+     * @return if read position changed
+     */
+    boolean checkAndUpdateReadPositionChanged();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 211d7ea..81e8399 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -119,6 +119,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
     protected volatile PositionImpl readPosition;
+    // keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
+    protected volatile PositionImpl statsLastReadPosition;
 
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
@@ -2970,5 +2972,15 @@ public class ManagedCursorImpl implements ManagedCursor {
         return Math.min(maxEntriesBasedOnSize, maxEntries);
     }
 
+    @Override
+    public boolean checkAndUpdateReadPositionChanged() {
+        PositionImpl lastEntry = ledger.lastConfirmedEntry;
+        boolean isReadPositionOnTail = lastEntry == null || readPosition == null
+                || !(lastEntry.compareTo(readPosition) > 0);
+        boolean isReadPositionChanged = readPosition != null && !readPosition.equals(statsLastReadPosition);
+        statsLastReadPosition = readPosition;
+        return isReadPositionOnTail || isReadPositionChanged;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
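To make the return-value semantics of `checkAndUpdateReadPositionChanged()` easier to follow, here is a standalone model of the logic in the `ManagedCursorImpl` change. It is a simplified sketch, not the real cursor: positions are a hypothetical `Pos` record (a Java record, for brevity) ordered by `(ledgerId, entryId)`, and the read position and the ledger's last confirmed entry are passed in explicitly. As in the diff, the method reports "changed" both when the read position moved since the previous call and when the cursor is on the tail (the last confirmed entry does not lie beyond the read position), so only a cursor that is behind the tail and has not moved yields `false`.

```java
public class ReadPositionSampler {

    /** Hypothetical position type: ordered by ledger id, then entry id. */
    public record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Read position seen by the previous call (plays the role of statsLastReadPosition).
    private Pos lastSampled;

    /**
     * Returns true if the read position moved since the previous call or the cursor
     * is on the tail. Always records the current read position as the new sample.
     */
    public boolean checkAndUpdate(Pos readPosition, Pos lastConfirmedEntry) {
        boolean onTail = lastConfirmedEntry == null || readPosition == null
                || lastConfirmedEntry.compareTo(readPosition) <= 0;
        boolean changed = readPosition != null && !readPosition.equals(lastSampled);
        lastSampled = readPosition;
        return onTail || changed;
    }
}
```

Because the sample is overwritten on every call, a stall is only detected on the second consecutive check that sees the same read position. This is why `testUnblockStuckSubscription` calls `checkAndUnblockIfStuck()` twice before asserting that the dispatcher was unblocked.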
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index 50382f9..92baf42 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -124,7 +124,6 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
             PositionImpl other = (PositionImpl) obj;
             return ledgerId == other.ledgerId && entryId == other.entryId;
         }
-
         return false;
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6004350..baf1c9f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -364,6 +364,11 @@ public class ManagedCursorContainerTest {
                 throws InterruptedException, ManagedLedgerException {
             return null;
         }
+
+        @Override
+        public boolean checkAndUpdateReadPositionChanged() {
+            return false;
+        }
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2f3d3bc..1d325f6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3465,12 +3465,71 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
                     } finally {
                         factory2.shutdown();
-                    }
+                    }   
                 });
 
         factory1.shutdown();
         dirtyFactory.shutdown();
     }
 
+    @Test
+    public void testCursorCheckReadPositionChanged() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+        ManagedCursor c1 = ledger.openCursor("c1");
+
+        // check empty ledger
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+
+        // read-position has not been moved
+        assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+        List<Entry> entries = c1.readEntries(2);
+        entries.forEach(e -> {
+            try {
+                c1.markDelete(e.getPosition());
+                e.release();
+            } catch (Exception e1) {
+                // Ok
+            }
+        });
+
+        // read-position is moved
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+        // read-position has not been moved since last read
+        assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+        c1.close();
+        ledger.close();
+
+        ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+        // recover cursor
+        ManagedCursor c2 = ledger.openCursor("c1");
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        assertFalse(c2.checkAndUpdateReadPositionChanged());
+
+        entries = c2.readEntries(2);
+        entries.forEach(e -> {
+            try {
+                c2.markDelete(e.getPosition());
+                e.release();
+            } catch (Exception e1) {
+                // Ok
+            }
+        });
+
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        // returns true because read-position is on tail
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+
+        ledger.close();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 526c414..b723705 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -557,6 +557,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(
             category = CATEGORY_POLICIES,
             dynamic = true,
+            doc = "Broker periodically checks if subscription is stuck and unblock if flag is enabled. "
+                    + "(Default is disabled)"
+        )
+    private boolean unblockStuckSubscriptionEnabled = false;
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            dynamic = true,
             doc = "Tick time to schedule task that checks topic publish rate limiting across all topics  "
                     + "Reducing to lower value can give more accuracy while throttling publish but "
                     + "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 0d3f3f4..7dab8b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -124,4 +124,11 @@ public interface Dispatcher {
         // No-op
     }
 
+    /**
+     * Checks if dispatcher is stuck and unblocks the dispatch if needed.
+     */
+    default boolean checkAndUnblockIfStuck() {
+        return false;
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c6caa70..eec0f93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -853,6 +853,21 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         this.messagesToRedeliver.add(ledgerId, entryId);
     }
 
+    @Override
+    public boolean checkAndUnblockIfStuck() {
+        if (cursor.checkAndUpdateReadPositionChanged()) {
+            return false;
+        }
+        // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+        if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead
+                && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+            log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+            readMoreEntries();
+            return true;
+        }
+        return false;
+    }
+
     public PersistentTopic getTopic() {
         return topic;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4bc0728..6e28a3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -576,5 +576,21 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         return disconnectAllConsumers();
     }
 
+    @Override
+    public boolean checkAndUnblockIfStuck() {
+        if (cursor.checkAndUpdateReadPositionChanged()) {
+            return false;
+        }
+        Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        int totalAvailablePermits = consumer.getAvailablePermits();
+        // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+        if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+            log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+            readMoreEntries(consumer);
+            return true;
+        }
+        return false;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
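Both dispatcher overrides apply the same decision: return early if the cursor reports progress (or is on the tail); otherwise treat the dispatcher as stuck when consumers have permits, no read is in flight, and the backlog is non-empty. The sketch below isolates that predicate as a pure function for illustration. The class and parameter names are hypothetical, and the single-active-consumer case corresponds to passing `false` for the replay-read flag, since that override does not check one.

```java
public final class StuckDispatchDecision {

    private StuckDispatchDecision() {
    }

    /**
     * Hypothetical model of the decision made by checkAndUnblockIfStuck().
     *
     * @param readPositionChanged   result of the cursor's read-position check for this tick
     * @param availablePermits      permits available to the subscription's consumer(s)
     * @param havePendingRead       whether a normal read is already in flight
     * @param havePendingReplayRead whether a replay (redelivery) read is in flight
     * @param backlogEntries        number of entries in the subscription backlog
     * @return true if the dispatcher should be treated as stuck and a new read issued
     */
    public static boolean isStuck(boolean readPositionChanged, int availablePermits,
                                  boolean havePendingRead, boolean havePendingReplayRead,
                                  long backlogEntries) {
        if (readPositionChanged) {
            return false; // the cursor made progress (or is caught up) since the last tick
        }
        return availablePermits > 0
                && !havePendingRead
                && !havePendingReplayRead
                && backlogEntries > 0;
    }
}
```

Plugging in values shaped like the `sub1` example from the commit message (723 available permits, no pending reads, a non-zero backlog, and a read position that is not moving) yields `true`. In the real code the cursor check runs first on every tick, which also keeps the read-position sample current when the dispatcher is healthy.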
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a11e875..7c5681f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1092,5 +1092,9 @@ public class PersistentSubscription implements Subscription {
         return this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
     }
 
+    public boolean checkAndUnblockIfStuck() {
+        return dispatcher.checkAndUnblockIfStuck();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0951ec5..a7391d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1621,6 +1621,10 @@ public class PersistentTopic extends AbstractTopic
                 topicStatsHelper.aggMsgRateOut += subMsgRateOut;
                 topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
                 nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
+                // check stuck subscription
+                if (brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
+                    subscription.checkAndUnblockIfStuck();
+                }
             } catch (Exception e) {
                 log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
                         e.getMessage(), e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 9961f3a..278e05b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -18,20 +18,29 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 import java.lang.reflect.Field;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.AfterMethod;
@@ -84,4 +93,67 @@ public class PersistentTopicTest extends BrokerTestBase {
 
         producer.close();
     }
+
+    /**
+     * Test validates if topic's dispatcher is stuck then broker can doscover and unblock it.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testUnblockStuckSubscription() throws Exception {
+        final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic";
+        final String sharedSubName = "shared";
+        final String failoverSubName = "failOver";
+
+        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe();
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription sharedSub = topic.getSubscription(sharedSubName);
+        PersistentSubscription failOverSub = topic.getSubscription(failoverSubName);
+
+        PersistentDispatcherMultipleConsumers sharedDispatcher = (PersistentDispatcherMultipleConsumers) sharedSub
+                .getDispatcher();
+        PersistentDispatcherSingleActiveConsumer failOverDispatcher = (PersistentDispatcherSingleActiveConsumer) failOverSub
+                .getDispatcher();
+
+        // build backlog
+        consumer1.close();
+        consumer2.close();
+
+        // block sub to read messages
+        sharedDispatcher.havePendingRead = true;
+        failOverDispatcher.havePendingRead = true;
+
+        producer.newMessage().value("test").eventTime(5).send();
+        producer.newMessage().value("test").eventTime(5).send();
+
+        consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(sharedSubName).subscribe();
+        consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(failoverSubName).subscribe();
+        Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
+        assertNull(msg);
+        msg = consumer2.receive(2, TimeUnit.SECONDS);
+        assertNull(msg);
+
+        // allow reads but dispatchers are still blocked
+        sharedDispatcher.havePendingRead = false;
+        failOverDispatcher.havePendingRead = false;
+
+        // run task to unblock stuck dispatcher: first iteration sets the lastReadPosition and next iteration will
+        // unblock the dispatcher read because read-position has not been moved since last iteration.
+        sharedSub.checkAndUnblockIfStuck();
+        failOverDispatcher.checkAndUnblockIfStuck();
+        assertTrue(sharedSub.checkAndUnblockIfStuck());
+        assertTrue(failOverDispatcher.checkAndUnblockIfStuck());
+
+        msg = consumer1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        msg = consumer2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+    }
 }
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 8308f62..117e30d 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -484,6 +484,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
 |maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer.  |200000|
 | maxUnackedMessagesPerBroker | Maximum number of unacknowledged messages allowed per broker. Once this limit reaches, the broker stops dispatching messages to all shared subscriptions which has a higher number of unacknowledged messages until subscriptions start acknowledging messages back and unacknowledged messages count reaches to limit/2. When the value is set to 0, unacknowledged message limit check is disabled and broker does not block dispatchers. | 0 |
 | maxUnackedMessagesPerSubscriptionOnBrokerBlocked | Once the broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which have higher unacknowledged messages than this percentage limit and subscription does not receive any new messages until that subscription acknowledges messages back. | 0.16 |
+| unblockStuckSubscriptionEnabled|Broker periodically checks if subscription is stuck and unblock if flag is enabled.|false|
 |maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
 |zookeeperSessionExpiredPolicy|There are two policies when ZooKeeper session expired happens, "shutdown" and "reconnect". If it is set to "shutdown" policy, when ZooKeeper session expired happens, the broker is shutdown. If it is set to "reconnect" policy, the broker tries to reconnect to ZooKeeper server and re-register metadata to ZooKeeper. Note: the "reconnect" policy is an experiment feature.|shutdown|
 | topicPublisherThrottlingTickTimeMillis | Tick time to schedule task that checks topic publish rate limiting across all topics. A lower value can improve accuracy while throttling publish but it uses more CPU to perform frequent check. (Disable publish throttling with value 0) | 10|
