This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d9a2ab [pulsar-broker] Allow broker to discover and unblock stuck
subscription (#9789)
8d9a2ab is described below
commit 8d9a2ab2a5592e5fb1799ad152be4b5082815191
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Mar 9 22:53:48 2021 -0800
[pulsar-broker] Allow broker to discover and unblock stuck subscription
(#9789)
### Motivation
We have frequently seen subscriptions get stuck on different topics: the
broker stops dispatching messages even though the consumer has available
permits and there are no pending reads (example: #9788). This can happen due to
a regression or an unknown issue when expiry runs. One workaround is to
manually unload and reload the topic, which is not feasible if this happens
frequently across many topics. Instead, the broker should be able to discover
such stuck subscriptions and unblock them.
The example below shows such a subscription: it has available permits > 0 and
no pending reads, yet the cursor's read position is not moving forward, so the
backlog keeps growing until the topic is unloaded. This happens frequently for
an unknown reason:
```
STATS-INTERNAL:
"sub1" : {
"markDeletePosition" : "11111111:15520",
"readPosition" : "11111111:15521",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 115521,
"cursorLedger" : 585099247,
"cursorLedgerLastEntry" : 597,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-02-25T19:55:50.357Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
STATS:
"sub1" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"msgRateRedeliver" : 0.0,
"msgBacklog" : 30350,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Shared",
"msgRateExpired" : 0.0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"msgRateRedeliver" : 0.0,
"consumerName" : "C1",
"availablePermits" : 723,
"unackedMessages" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"metadata" : { },
"connectedSince" : "2021-02-25T19:55:50.358285Z",
```
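The pattern in these stats can be written as a simple predicate. The following is a minimal sketch, not broker code: `StuckStatsCheck` and `looksStuck` are hypothetical names, and the parameters mirror only the stats fields shown above.

```java
/** Hypothetical helper; the parameters mirror stats fields shown above. */
final class StuckStatsCheck {

    /**
     * Returns true when a subscription has backlog and consumer permits,
     * yet nothing is being read or dispatched.
     */
    static boolean looksStuck(long msgBacklog, int availablePermits, int pendingReadOps,
                              boolean waitingReadOp, double msgRateOut) {
        return msgBacklog > 0
                && availablePermits > 0
                && pendingReadOps == 0
                && !waitingReadOp
                && msgRateOut == 0.0;
    }

    public static void main(String[] args) {
        // Values taken from the "sub1" stats above.
        System.out.println(looksStuck(30350, 723, 0, false, 0.0)); // prints "true"
        // A subscription with a read in flight is not considered stuck.
        System.out.println(looksStuck(30350, 723, 1, false, 0.0)); // prints "false"
    }
}
```

A single stats snapshot is only a hint, since a healthy subscription can briefly look like this between reads. Comparing two consecutive samples of the read position gives a stronger signal.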

### Modification
Add the capability for the broker to periodically check whether a subscription
is stuck and unblock it if needed. The check is controlled by the
`unblockStuckSubscriptionEnabled` flag; it is disabled by default for the
initial release and can be enabled by default in a later release.
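The idea behind the check can be sketched as follows. This is a simplified model under stated assumptions, not the code in this change: `ReadPositionSampler` is a hypothetical class, and a single `Long` stands in for a `ledgerId:entryId` position.

```java
/** Hypothetical, simplified model of the periodic stuck-subscription check. */
final class ReadPositionSampler {
    // Read position seen by the previous check; null until the first check.
    private Long lastSampledReadPosition;

    /** Returns true if the cursor made progress or has nothing left to read. */
    boolean checkAndUpdate(Long readPosition, Long lastConfirmedEntry) {
        boolean onTail = lastConfirmedEntry == null || readPosition == null
                || lastConfirmedEntry.compareTo(readPosition) <= 0;
        boolean changed = readPosition != null
                && !readPosition.equals(lastSampledReadPosition);
        lastSampledReadPosition = readPosition;
        return onTail || changed;
    }

    /** Stuck: no progress since the last check although a read could be issued. */
    boolean isStuck(Long readPosition, Long lastConfirmedEntry,
                    int availablePermits, boolean havePendingRead, long backlog) {
        if (checkAndUpdate(readPosition, lastConfirmedEntry)) {
            return false;
        }
        return availablePermits > 0 && !havePendingRead && backlog > 0;
    }

    public static void main(String[] args) {
        ReadPositionSampler sampler = new ReadPositionSampler();
        // First check only records the sample, so it never reports "stuck".
        System.out.println(sampler.isStuck(5L, 10L, 723, false, 30350)); // prints "false"
        // Same read position on the next check, with permits and backlog: stuck.
        System.out.println(sampler.isStuck(5L, 10L, 723, false, 30350)); // prints "true"
    }
}
```

Because the first check only records a sample, a subscription is reported as stuck no earlier than the second consecutive check that sees no progress.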
### Result
The broker can detect and unblock a stuck subscription, and it logs a message
for later debugging.
---
conf/broker.conf | 3 +
deployment/terraform-ansible/templates/broker.conf | 3 +
.../apache/bookkeeper/mledger/ManagedCursor.java | 6 ++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 ++++
.../bookkeeper/mledger/impl/PositionImpl.java | 1 -
.../mledger/impl/ManagedCursorContainerTest.java | 5 ++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 +++++++++++++++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 7 +++
.../apache/pulsar/broker/service/Dispatcher.java | 7 +++
.../PersistentDispatcherMultipleConsumers.java | 15 +++++
.../PersistentDispatcherSingleActiveConsumer.java | 16 +++++
.../service/persistent/PersistentSubscription.java | 4 ++
.../broker/service/persistent/PersistentTopic.java | 4 ++
.../service/persistent/PersistentTopicTest.java | 72 ++++++++++++++++++++++
site2/docs/reference-configuration.md | 1 +
15 files changed, 215 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index c1d1f91..6ba9882 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -281,6 +281,9 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git a/deployment/terraform-ansible/templates/broker.conf
b/deployment/terraform-ansible/templates/broker.conf
index b47ec67..5a600d6 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -233,6 +233,9 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d39af92..be898f3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -683,4 +683,10 @@ public interface ManagedCursor {
*/
ManagedCursorMXBean getStats();
+ /**
+ * Checks if read position changed since this method was called last time.
+ *
+ * @return if read position changed
+ */
+ boolean checkAndUpdateReadPositionChanged();
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 211d7ea..81e8399 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -119,6 +119,8 @@ public class ManagedCursorImpl implements ManagedCursor {
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl,
PositionImpl> READ_POSITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
PositionImpl.class, "readPosition");
protected volatile PositionImpl readPosition;
+ // keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
+ protected volatile PositionImpl statsLastReadPosition;
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl,
MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
MarkDeleteEntry.class, "lastMarkDeleteEntry");
@@ -2970,5 +2972,15 @@ public class ManagedCursorImpl implements ManagedCursor {
return Math.min(maxEntriesBasedOnSize, maxEntries);
}
+ @Override
+ public boolean checkAndUpdateReadPositionChanged() {
+ PositionImpl lastEntry = ledger.lastConfirmedEntry;
+ boolean isReadPositionOnTail = lastEntry == null || readPosition == null
+ || !(lastEntry.compareTo(readPosition) > 0);
+ boolean isReadPositionChanged = readPosition != null && !readPosition.equals(statsLastReadPosition);
+ statsLastReadPosition = readPosition;
+ return isReadPositionOnTail || isReadPositionChanged;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorImpl.class);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index 50382f9..92baf42 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -124,7 +124,6 @@ public class PositionImpl implements Position,
Comparable<PositionImpl> {
PositionImpl other = (PositionImpl) obj;
return ledgerId == other.ledgerId && entryId == other.entryId;
}
-
return false;
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6004350..baf1c9f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -364,6 +364,11 @@ public class ManagedCursorContainerTest {
throws InterruptedException, ManagedLedgerException {
return null;
}
+
+ @Override
+ public boolean checkAndUpdateReadPositionChanged() {
+ return false;
+ }
}
@Test
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2f3d3bc..1d325f6 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3465,12 +3465,71 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
} finally {
factory2.shutdown();
- }
+ }
});
factory1.shutdown();
dirtyFactory.shutdown();
}
+ @Test
+ public void testCursorCheckReadPositionChanged() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger", new
ManagedLedgerConfig());
+ ManagedCursor c1 = ledger.openCursor("c1");
+
+ // check empty ledger
+ assertTrue(c1.checkAndUpdateReadPositionChanged());
+ assertTrue(c1.checkAndUpdateReadPositionChanged());
+
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+
+ // read-position has not been moved
+ assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+ List<Entry> entries = c1.readEntries(2);
+ entries.forEach(e -> {
+ try {
+ c1.markDelete(e.getPosition());
+ e.release();
+ } catch (Exception e1) {
+ // Ok
+ }
+ });
+
+ // read-position is moved
+ assertTrue(c1.checkAndUpdateReadPositionChanged());
+ // read-position has not been moved since last read
+ assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+ c1.close();
+ ledger.close();
+
+ ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+ // recover cursor
+ ManagedCursor c2 = ledger.openCursor("c1");
+ assertTrue(c2.checkAndUpdateReadPositionChanged());
+ assertFalse(c2.checkAndUpdateReadPositionChanged());
+
+ entries = c2.readEntries(2);
+ entries.forEach(e -> {
+ try {
+ c2.markDelete(e.getPosition());
+ e.release();
+ } catch (Exception e1) {
+ // Ok
+ }
+ });
+
+ assertTrue(c2.checkAndUpdateReadPositionChanged());
+ // returns true because read-position is on tail
+ assertTrue(c2.checkAndUpdateReadPositionChanged());
+ assertTrue(c2.checkAndUpdateReadPositionChanged());
+
+ ledger.close();
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 526c414..b723705 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -557,6 +557,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
+ doc = "Broker periodically checks if subscription is stuck and unblock if flag is enabled. "
+ + "(Default is disabled)"
+ )
+ private boolean unblockStuckSubscriptionEnabled = false;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ dynamic = true,
doc = "Tick time to schedule task that checks topic publish rate
limiting across all topics "
+ "Reducing to lower value can give more accuracy while
throttling publish but "
+ "it uses more CPU to perform frequent check. (Disable
publish throttling with value 0)"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 0d3f3f4..7dab8b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -124,4 +124,11 @@ public interface Dispatcher {
// No-op
}
+ /**
+ * Checks if dispatcher is stuck and unblocks the dispatch if needed.
+ */
+ default boolean checkAndUnblockIfStuck() {
+ return false;
+ }
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c6caa70..eec0f93 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -853,6 +853,21 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
this.messagesToRedeliver.add(ledgerId, entryId);
}
+ @Override
+ public boolean checkAndUnblockIfStuck() {
+ if (cursor.checkAndUpdateReadPositionChanged()) {
+ return false;
+ }
+ // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+ if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead
+ && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+ log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+ readMoreEntries();
+ return true;
+ }
+ return false;
+ }
+
public PersistentTopic getTopic() {
return topic;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4bc0728..6e28a3c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -576,5 +576,21 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
return disconnectAllConsumers();
}
+ @Override
+ public boolean checkAndUnblockIfStuck() {
+ if (cursor.checkAndUpdateReadPositionChanged()) {
+ return false;
+ }
+ Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ int totalAvailablePermits = consumer.getAvailablePermits();
+ // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+ if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+ log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+ readMoreEntries(consumer);
+ return true;
+ }
+ return false;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a11e875..7c5681f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1092,5 +1092,9 @@ public class PersistentSubscription implements
Subscription {
return
this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
}
+ public boolean checkAndUnblockIfStuck() {
+ return dispatcher.checkAndUnblockIfStuck();
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentSubscription.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0951ec5..a7391d5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1621,6 +1621,10 @@ public class PersistentTopic extends AbstractTopic
topicStatsHelper.aggMsgRateOut += subMsgRateOut;
topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
nsStats.msgBacklog +=
subscription.getNumberOfEntriesInBacklog(false);
+ // check stuck subscription
+ if (brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
+ subscription.checkAndUnblockIfStuck();
+ }
} catch (Exception e) {
log.error("Got exception when creating consumer stats for
subscription {}: {}", subscriptionName,
e.getMessage(), e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 9961f3a..278e05b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -18,20 +18,29 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.AfterMethod;
@@ -84,4 +93,67 @@ public class PersistentTopicTest extends BrokerTestBase {
producer.close();
}
+
+ /**
+ * Test validates if topic's dispatcher is stuck then broker can discover and unblock it.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testUnblockStuckSubscription() throws Exception {
+ final String topicName =
"persistent://prop/ns-abc/stuckSubscriptionTopic";
+ final String sharedSubName = "shared";
+ final String failoverSubName = "failOver";
+
+ Consumer<String> consumer1 =
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+
.subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe();
+ Consumer<String> consumer2 =
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+
.subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentSubscription sharedSub =
topic.getSubscription(sharedSubName);
+ PersistentSubscription failOverSub =
topic.getSubscription(failoverSubName);
+
+ PersistentDispatcherMultipleConsumers sharedDispatcher =
(PersistentDispatcherMultipleConsumers) sharedSub
+ .getDispatcher();
+ PersistentDispatcherSingleActiveConsumer failOverDispatcher =
(PersistentDispatcherSingleActiveConsumer) failOverSub
+ .getDispatcher();
+
+ // build backlog
+ consumer1.close();
+ consumer2.close();
+
+ // block sub to read messages
+ sharedDispatcher.havePendingRead = true;
+ failOverDispatcher.havePendingRead = true;
+
+ producer.newMessage().value("test").eventTime(5).send();
+ producer.newMessage().value("test").eventTime(5).send();
+
+ consumer1 =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(sharedSubName).subscribe();
+ consumer2 =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Failover)
+ .subscriptionName(failoverSubName).subscribe();
+ Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
+ assertNull(msg);
+ msg = consumer2.receive(2, TimeUnit.SECONDS);
+ assertNull(msg);
+
+ // allow reads but dispatchers are still blocked
+ sharedDispatcher.havePendingRead = false;
+ failOverDispatcher.havePendingRead = false;
+
+ // run task to unblock stuck dispatcher: first iteration sets the lastReadPosition and next iteration will
+ // unblock the dispatcher read because read-position has not been moved since last iteration.
+ sharedSub.checkAndUnblockIfStuck();
+ failOverDispatcher.checkAndUnblockIfStuck();
+ assertTrue(sharedSub.checkAndUnblockIfStuck());
+ assertTrue(failOverDispatcher.checkAndUnblockIfStuck());
+
+ msg = consumer1.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ msg = consumer2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ }
}
diff --git a/site2/docs/reference-configuration.md
b/site2/docs/reference-configuration.md
index 8308f62..117e30d 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -484,6 +484,7 @@ You can set the log level and configuration in the
[log4j2.yaml](https://github
|maxUnackedMessagesPerSubscription| The same as above, except per subscription
rather than per consumer. |200000|
| maxUnackedMessagesPerBroker | Maximum number of unacknowledged messages
allowed per broker. Once this limit reaches, the broker stops dispatching
messages to all shared subscriptions which has a higher number of
unacknowledged messages until subscriptions start acknowledging messages back
and unacknowledged messages count reaches to limit/2. When the value is set to
0, unacknowledged message limit check is disabled and broker does not block
dispatchers. | 0 |
| maxUnackedMessagesPerSubscriptionOnBrokerBlocked | Once the broker reaches
maxUnackedMessagesPerBroker limit, it blocks subscriptions which have higher
unacknowledged messages than this percentage limit and subscription does not
receive any new messages until that subscription acknowledges messages back. |
0.16 |
+| unblockStuckSubscriptionEnabled|Broker periodically checks if subscription is stuck and unblock if flag is enabled.|false|
|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned
topic. Use 0 or negative number to disable the check|0|
|zookeeperSessionExpiredPolicy|There are two policies when ZooKeeper session
expired happens, "shutdown" and "reconnect". If it is set to "shutdown" policy,
when ZooKeeper session expired happens, the broker is shutdown. If it is set to
"reconnect" policy, the broker tries to reconnect to ZooKeeper server and
re-register metadata to ZooKeeper. Note: the "reconnect" policy is an
experiment feature.|shutdown|
| topicPublisherThrottlingTickTimeMillis | Tick time to schedule task that
checks topic publish rate limiting across all topics. A lower value can improve
accuracy while throttling publish but it uses more CPU to perform frequent
check. (Disable publish throttling with value 0) | 10|