This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8cfaf48 Add streaming dispatcher. (#9056)
8cfaf48 is described below
commit 8cfaf48e1dc97e30050cb29c106b614d6b9bc69c
Author: Marvin Cai <[email protected]>
AuthorDate: Mon Feb 1 20:02:50 2021 -0800
Add streaming dispatcher. (#9056)
Related to #3804
### Motivation
Trying to streamline the dispatcher's read requests to the managed ledger
instead of reading in micro-batches.
### Modifications
Created a StreamingEntryReader that can streamline read requests to the
managed ledger.
Created a StreamingDispatcher interface with the methods necessary to interact
with StreamingEntryReader.
Created PersistentStreamingDispatcherSingleActiveConsumer and
PersistentStreamingDispatcherMultipleConsumers, which make use of
StreamingEntryReader to read entries from the managed ledger.
Added a config option (streamingDispatch) to enable the streaming dispatcher.
---
build/run_unit_group.sh | 5 +
.../bookkeeper/mledger/WaitingEntryCallBack.java | 30 ++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 +-
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 2 +
.../apache/pulsar/broker/ServiceConfiguration.java | 7 +
.../AbstractDispatcherMultipleConsumers.java | 2 +
.../PersistentDispatcherMultipleConsumers.java | 185 +++++----
.../PersistentDispatcherSingleActiveConsumer.java | 276 ++++++-------
...istentStreamingDispatcherMultipleConsumers.java | 191 +++++++++
...entStreamingDispatcherSingleActiveConsumer.java | 208 ++++++++++
.../service/persistent/PersistentSubscription.java | 19 +-
.../streamingdispatch/PendingReadEntryRequest.java | 76 ++++
.../streamingdispatch/StreamingDispatcher.java | 53 +++
.../streamingdispatch/StreamingEntryReader.java | 338 ++++++++++++++++
.../service/streamingdispatch/package-info.java | 19 +
.../PersistentDispatcherFailoverConsumerTest.java | 5 +-
.../broker/service/PersistentFailoverE2ETest.java | 5 +
.../pulsar/broker/service/PersistentTopicTest.java | 4 +-
...herFailoverConsumerStreamingDispatcherTest.java | 35 ++
...rsistentFailoverStreamingDispatcherE2ETest.java | 36 ++
...istentStreamingDispatcherBlockConsumerTest.java | 35 ++
...eDispatchStreamingDispatcherThrottlingTest.java | 36 ++
.../PersistentTopicStreamingDispatcherE2ETest.java | 35 ++
.../PersistentTopicStreamingDispatcherTest.java | 35 ++
...roducerConsumerTestStreamingDispatcherTest.java | 35 ++
.../StreamingEntryReaderTests.java | 433 +++++++++++++++++++++
.../client/api/DispatcherBlockConsumerTest.java | 1 +
.../SubscriptionMessageDispatchThrottlingTest.java | 9 +-
site2/docs/reference-configuration.md | 1 +
29 files changed, 1905 insertions(+), 237 deletions(-)
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index aed0ca4..b4e5530 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -56,12 +56,17 @@ function broker_group_2() {
-DtestForkCount=1 \
-DtestReuseFork=true
+ $MVN_TEST_COMMAND -pl pulsar-broker
-Dinclude="**/*StreamingDispatcher*Test.java" \
+ -DtestForkCount=1 \
+ -DtestReuseFork=true
+
$MVN_TEST_COMMAND -pl pulsar-broker
-Dinclude="org/apache/pulsar/broker/zookeeper/**/*.java,
org/apache/pulsar/broker/loadbalance/**/*.java,
org/apache/pulsar/broker/service/**/*.java" \
-Dexclude="**/ReplicatorTest.java,
**/MessagePublishBufferThrottleTest.java,
**/TopicOwnerTest.java,
+
**/*StreamingDispatcher*Test.java,
**/AntiAffinityNamespaceGroupTest.java"
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java
new file mode 100644
index 0000000..fc1f3b4
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+/**
+ * Contains callback that can be registered with {@link ManagedLedger} to wait
for new entries to be available.
+ */
+public interface WaitingEntryCallBack {
+
+ /**
+ * The callback {@link ManagedLedger} will trigger when new entries are
available.
+ */
+ void entriesAvailable();
+}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a36b1c4..232bf5b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -110,6 +110,7 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedger
import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
@@ -170,6 +171,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Cursors that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;
+ // Objects that are waiting to be notified when new entries are persisted
+ final ConcurrentLinkedQueue<WaitingEntryCallBack> waitingEntryCallBacks;
+
// This map is used for concurrent open cursor requests, where the 2nd
request will attach a listener to the
// uninitialized cursor future from the 1st request
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
@@ -290,6 +294,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.mbean = new ManagedLedgerMBeanImpl(this);
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
+ this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = Maps.newHashMap();
this.clock = config.getClock();
@@ -2109,6 +2114,21 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+ void notifyWaitingEntryCallBacks() {
+ while (true) {
+ final WaitingEntryCallBack cb = waitingEntryCallBacks.poll();
+ if (cb == null) {
+ break;
+ }
+
+ executor.execute(safeRun(cb::entriesAvailable));
+ }
+ }
+
+ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
+ this.waitingEntryCallBacks.add(cb);
+ }
+
private void trimConsumedLedgersInBackground() {
trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}
@@ -3086,7 +3106,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
* the position to validate
* @return true if the position is valid, false otherwise
*/
- boolean isValidPosition(PositionImpl position) {
+ public boolean isValidPosition(PositionImpl position) {
PositionImpl last = lastConfirmedEntry;
if (log.isDebugEnabled()) {
log.debug("IsValid position: {} -- last: {}", position, last);
@@ -3130,7 +3150,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
next = getNextValidPositionInternal(position);
} catch (NullPointerException e) {
next = lastConfirmedEntry.getNext();
- log.error("[{}] Can't find next valid position, fail back to the
next position of the last position.", name, e);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Can't find next valid position, fall back to
the next position of the last position.", name, e);
+ }
}
return next;
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 742ed1e..fc3054e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -207,6 +207,7 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
cb.addComplete(lastEntry, data.asReadOnly(), ctx);
ReferenceCountUtil.release(data);
ml.notifyCursors();
+ ml.notifyWaitingEntryCallBacks();
this.recycle();
} else {
ReferenceCountUtil.release(data);
@@ -232,6 +233,7 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
if (cb != null) {
cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
ml.notifyCursors();
+ ml.notifyWaitingEntryCallBacks();
this.recycle();
}
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0d38279..44381bd 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -686,6 +686,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private boolean preciseDispatcherFlowControl = false;
@FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Whether to use streaming read dispatcher. Currently is in
preview and can be changed " +
+ "in subsequent release."
+ )
+ private boolean streamingDispatch = false;
+
+ @FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Max number of concurrent lookup request broker allows to
throttle heavy incoming lookup traffic")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 7cd9e03..2f6b9a6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -72,6 +72,8 @@ public abstract class AbstractDispatcherMultipleConsumers
extends AbstractBaseDi
public abstract boolean isConsumerAvailable(Consumer consumer);
+ protected void cancelPendingRead() {}
+
/**
* <pre>
* Broker gives more priority while dispatching messages. Here, broker
follows descending priorities. (eg:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 26f328d..2816f9c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -80,14 +80,14 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
protected volatile Range<PositionImpl>
lastIndividualDeletedRangeFromCursorRecovery;
private CompletableFuture<Void> closeFuture = null;
- LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+ protected LongPairSet messagesToRedeliver = new
ConcurrentSortedLongPairSet(128, 2);
protected final RedeliveryTracker redeliveryTracker;
private Optional<DelayedDeliveryTracker> delayedDeliveryTracker =
Optional.empty();
- private volatile boolean havePendingRead = false;
- private volatile boolean havePendingReplayRead = false;
- private boolean shouldRewindBeforeReadingOrReplaying = false;
+ protected volatile boolean havePendingRead = false;
+ protected volatile boolean havePendingReplayRead = false;
+ protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
@@ -95,23 +95,23 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"totalAvailablePermits");
protected volatile int totalAvailablePermits = 0;
- private volatile int readBatchSize;
- private final Backoff readFailureBackoff = new Backoff(15,
TimeUnit.SECONDS,
+ protected volatile int readBatchSize;
+ protected final Backoff readFailureBackoff = new Backoff(15,
TimeUnit.SECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
private static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"totalUnackedMessages");
- private volatile int totalUnackedMessages = 0;
+ protected volatile int totalUnackedMessages = 0;
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
- private static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
+ protected static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnUnackedMsgs");
protected final ServiceConfiguration serviceConfig;
protected Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
- enum ReadType {
+ protected enum ReadType {
Normal, Replay
}
@@ -199,9 +199,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
consumerList.remove(consumer);
log.info("Removed consumer {} with pending {} acks", consumer,
consumer.getPendingAcks().size());
if (consumerList.isEmpty()) {
- if (havePendingRead && cursor.cancelPendingReadRequest()) {
- havePendingRead = false;
- }
+ cancelPendingRead();
messagesToRedeliver.clear();
redeliveryTracker.clear();
@@ -248,81 +246,13 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
// totalAvailablePermits may be updated by other threads
int currentTotalAvailablePermits = totalAvailablePermits;
if (currentTotalAvailablePermits > 0 &&
isAtleastOneConsumerAvailable()) {
- int messagesToRead = Math.min(currentTotalAvailablePermits,
readBatchSize);
-
- Consumer c = getRandomConsumer();
- // if turn on precise dispatcher flow control, adjust the record
to read
- if (c != null && c.isPreciseDispatcherFlowControl()) {
- messagesToRead = Math.min(
- (int) Math.ceil(currentTotalAvailablePermits * 1.0 /
c.getAvgMessagesPerEntry()),
- readBatchSize);
- }
-
- if (!isConsumerWritable()) {
- // If the connection is not currently writable, we issue the
read request anyway, but for a single
- // message. The intent here is to keep use the request as a
notification mechanism while avoiding to
- // read and dispatch a big batch of messages which will need
to wait before getting written to the
- // socket.
- messagesToRead = 1;
- }
-
- // throttle only if: (1) cursor is not active (or flag for
throttle-nonBacklogConsumer is enabled) bcz
- // active-cursor reads message from cache rather from bookkeeper
(2) if topic has reached message-rate
- // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
- if
(serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() ||
!cursor.isActive()) {
- if (topic.getDispatchRateLimiter().isPresent()
- &&
topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
- DispatchRateLimiter topicRateLimiter =
topic.getDispatchRateLimiter().get();
- if (!topicRateLimiter.hasMessageDispatchPermit()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] message-read exceeded topic
message-rate {}/{}, schedule after a {}", name,
- topicRateLimiter.getDispatchRateOnMsg(),
topicRateLimiter.getDispatchRateOnByte(),
- MESSAGE_RATE_BACKOFF_MS);
- }
- topic.getBrokerService().executor().schedule(() ->
readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
- TimeUnit.MILLISECONDS);
- return;
- } else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long availablePermitsOnMsg =
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
- if (availablePermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
- }
- }
- }
-
- if (dispatchRateLimiter.isPresent() &&
dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
- if (!dispatchRateLimiter.get().hasMessageDispatchPermit())
{
- if (log.isDebugEnabled()) {
- log.debug("[{}] message-read exceeded subscription
message-rate {}/{},"
- + " schedule after a {}", name,
-
dispatchRateLimiter.get().getDispatchRateOnMsg(),
-
dispatchRateLimiter.get().getDispatchRateOnByte(),
- MESSAGE_RATE_BACKOFF_MS);
- }
- topic.getBrokerService().executor().schedule(() ->
readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
- TimeUnit.MILLISECONDS);
- return;
- } else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long availablePermitsOnMsg =
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
- if (availablePermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
- }
- }
- }
+ int messagesToRead =
calculateNumOfMessageToRead(currentTotalAvailablePermits);
- }
-
- if (havePendingReplayRead) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping replay while awaiting previous
read to complete", name);
- }
+ if (-1 == messagesToRead) {
+ // Skip read as topic/dispatcher has exceed the dispatch rate
or previous pending read hasn't complete.
return;
}
- // If messagesToRead is 0 or less, correct it to 1 to prevent
IllegalArgumentException
- messagesToRead = Math.max(messagesToRead, 1);
Set<PositionImpl> messagesToReplayNow =
getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
@@ -366,6 +296,84 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
}
+ protected int calculateNumOfMessageToRead(int
currentTotalAvailablePermits) {
+ int messagesToRead = Math.min(currentTotalAvailablePermits,
readBatchSize);
+
+ Consumer c = getRandomConsumer();
+ // if turn on precise dispatcher flow control, adjust the record to
read
+ if (c != null && c.isPreciseDispatcherFlowControl()) {
+ messagesToRead = Math.min(
+ (int) Math.ceil(currentTotalAvailablePermits * 1.0 /
c.getAvgMessagesPerEntry()),
+ readBatchSize);
+ }
+
+ if (!isConsumerWritable()) {
+ // If the connection is not currently writable, we issue the read
request anyway, but for a single
+ // message. The intent here is to keep use the request as a
notification mechanism while avoiding to
+ // read and dispatch a big batch of messages which will need to
wait before getting written to the
+ // socket.
+ messagesToRead = 1;
+ }
+
+ // throttle only if: (1) cursor is not active (or flag for
throttle-nonBacklogConsumer is enabled) bcz
+ // active-cursor reads message from cache rather from bookkeeper (2)
if topic has reached message-rate
+ // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
+ if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() ||
!cursor.isActive()) {
+ if (topic.getDispatchRateLimiter().isPresent()
+ &&
topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+ DispatchRateLimiter topicRateLimiter =
topic.getDispatchRateLimiter().get();
+ if (!topicRateLimiter.hasMessageDispatchPermit()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] message-read exceeded topic
message-rate {}/{}, schedule after a {}", name,
+ topicRateLimiter.getDispatchRateOnMsg(),
topicRateLimiter.getDispatchRateOnByte(),
+ MESSAGE_RATE_BACKOFF_MS);
+ }
+ topic.getBrokerService().executor().schedule(() ->
readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+ TimeUnit.MILLISECONDS);
+ return -1;
+ } else {
+ // if dispatch-rate is in msg then read only msg according
to available permit
+ long availablePermitsOnMsg =
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
+ if (availablePermitsOnMsg > 0) {
+ messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
+ }
+ }
+ }
+
+ if (dispatchRateLimiter.isPresent() &&
dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+ if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] message-read exceeded subscription
message-rate {}/{},"
+ + " schedule after a {}", name,
+
dispatchRateLimiter.get().getDispatchRateOnMsg(),
+
dispatchRateLimiter.get().getDispatchRateOnByte(),
+ MESSAGE_RATE_BACKOFF_MS);
+ }
+ topic.getBrokerService().executor().schedule(() ->
readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+ TimeUnit.MILLISECONDS);
+ return -1;
+ } else {
+ // if dispatch-rate is in msg then read only msg according
to available permit
+ long availablePermitsOnMsg =
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
+ if (availablePermitsOnMsg > 0) {
+ messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
+ }
+ }
+ }
+
+ }
+
+ if (havePendingReplayRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping replay while awaiting previous read
to complete", name);
+ }
+ return -1;
+ }
+
+ // If messagesToRead is 0 or less, correct it to 1 to prevent
IllegalArgumentException
+ return Math.max(messagesToRead, 1);
+ }
+
protected Set<? extends Position> asyncReplayEntries(Set<? extends
Position> positions) {
return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
}
@@ -413,14 +421,19 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
closeFuture.complete(null);
} else {
consumerList.forEach(consumer ->
consumer.disconnect(isResetCursor));
- if (havePendingRead && cursor.cancelPendingReadRequest()) {
- havePendingRead = false;
- }
+ cancelPendingRead();
}
return closeFuture;
}
@Override
+ protected void cancelPendingRead() {
+ if (havePendingRead && cursor.cancelPendingReadRequest()) {
+ havePendingRead = false;
+ }
+ }
+
+ @Override
public CompletableFuture<Void> disconnectActiveConsumers(boolean
isResetCursor) {
return disconnectAllConsumers(isResetCursor);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5501a81..4bc0728 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -58,20 +58,20 @@ import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcherSingleActiveConsumer
+public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcherSingleActiveConsumer
implements Dispatcher, ReadEntriesCallback {
- private final PersistentTopic topic;
- private final ManagedCursor cursor;
- private final String name;
+ protected final PersistentTopic topic;
+ protected final ManagedCursor cursor;
+ protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
- private volatile boolean havePendingRead = false;
+ protected volatile boolean havePendingRead = false;
- private volatile int readBatchSize;
- private final Backoff readFailureBackoff = new Backoff(15,
TimeUnit.SECONDS,
+ protected volatile int readBatchSize;
+ protected final Backoff readFailureBackoff = new Backoff(15,
TimeUnit.SECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
- private final ServiceConfiguration serviceConfig;
+ protected final ServiceConfiguration serviceConfig;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
private final RedeliveryTracker redeliveryTracker;
@@ -90,9 +90,7 @@ public final class PersistentDispatcherSingleActiveConsumer
extends AbstractDisp
}
protected void scheduleReadOnActiveConsumer() {
- if (havePendingRead && cursor.cancelPendingReadRequest()) {
- havePendingRead = false;
- }
+ cancelPendingRead();
if (havePendingRead) {
return;
@@ -166,6 +164,7 @@ public final class PersistentDispatcherSingleActiveConsumer
extends AbstractDisp
return false;
}
+ @Override
protected void cancelPendingRead() {
if (havePendingRead && cursor.cancelPendingReadRequest()) {
havePendingRead = false;
@@ -230,45 +229,48 @@ public final class
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, false);
+ dispatchEntriesToConsumer(currentConsumer, entries, batchSizes,
batchIndexesAcks, sendMessageInfo);
+ }
+ }
- int totalMessages = sendMessageInfo.getTotalMessages();
- long totalBytes = sendMessageInfo.getTotalBytes();
-
- currentConsumer
- .sendMessages(entries, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
- redeliveryTracker)
- .addListener(future -> {
- if (future.isSuccess()) {
- // acquire message-dispatch permits for already
delivered messages
- if
(serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() ||
!cursor.isActive()) {
- if
(topic.getDispatchRateLimiter().isPresent()) {
-
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessages,
totalBytes);
- }
+ protected void dispatchEntriesToConsumer(Consumer currentConsumer,
List<Entry> entries,
+ EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
+ SendMessageInfo sendMessageInfo) {
+ currentConsumer
+ .sendMessages(entries, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
+ sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
+ redeliveryTracker)
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ // acquire message-dispatch permits for already delivered
messages
+ if
(serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() ||
!cursor.isActive()) {
+ if (topic.getDispatchRateLimiter().isPresent()) {
+
topic.getDispatchRateLimiter().get().tryDispatchPermit(sendMessageInfo.getTotalMessages(),
+ sendMessageInfo.getTotalBytes());
+ }
- dispatchRateLimiter.ifPresent(rateLimiter ->
-
rateLimiter.tryDispatchPermit(totalMessages, totalBytes));
- }
+ dispatchRateLimiter.ifPresent(rateLimiter ->
+
rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(),
+ sendMessageInfo.getTotalBytes()));
+ }
- // Schedule a new read batch operation only after
the previous batch has been written to the
- // socket
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
- SafeRun.safeRun(() -> {
- synchronized
(PersistentDispatcherSingleActiveConsumer.this) {
- Consumer newConsumer =
getActiveConsumer();
- if (newConsumer != null &&
!havePendingRead) {
- readMoreEntries(newConsumer);
- } else {
- log.debug(
- "[{}-{}] Ignoring
write future complete."
- + "
consumerAvailable={} havePendingRead={}",
- name, newConsumer,
newConsumer != null, havePendingRead);
- }
- }
- }));
- }
- });
- }
+ // Schedule a new read batch operation only after the
previous batch has been written to the socket.
+
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
+ SafeRun.safeRun(() -> {
+ synchronized
(PersistentDispatcherSingleActiveConsumer.this) {
+ Consumer newConsumer = getActiveConsumer();
+ if (newConsumer != null && !havePendingRead) {
+ readMoreEntries(newConsumer);
+ } else {
+ log.debug(
+ "[{}-{}] Ignoring write future
complete."
+ + " consumerAvailable={}
havePendingRead={}",
+ name, newConsumer, newConsumer !=
null, havePendingRead);
+ }
+ }
+ }));
+ }
+ });
}
@Override
@@ -322,9 +324,7 @@ public final class PersistentDispatcherSingleActiveConsumer
extends AbstractDisp
return;
}
- if (havePendingRead && cursor.cancelPendingReadRequest()) {
- havePendingRead = false;
- }
+ cancelPendingRead();
if (!havePendingRead) {
cursor.rewind();
@@ -354,93 +354,14 @@ public final class
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
return;
}
- int availablePermits = consumer.getAvailablePermits();
-
- if (availablePermits > 0) {
- if (!consumer.isWritable()) {
- // If the connection is not currently writable, we issue the
read request anyway, but for a single
- // message. The intent here is to keep use the request as a
notification mechanism while avoiding to
- // read and dispatch a big batch of messages which will need
to wait before getting written to the
- // socket.
- availablePermits = 1;
- }
+ if (consumer.getAvailablePermits() > 0) {
+ int messagesToRead = calculateNumOfMessageToRead(consumer);
- int messagesToRead = Math.min(availablePermits, readBatchSize);
- // if turn of precise dispatcher flow control, adjust the records
to read
- if (consumer.isPreciseDispatcherFlowControl()) {
- int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
- messagesToRead = Math.min((int) Math.ceil(availablePermits *
1.0 / avgMessagesPerEntry), readBatchSize);
+ if (-1 == messagesToRead) {
+ // Skip read as topic/dispatcher has exceed the dispatch rate.
+ return;
}
- // throttle only if: (1) cursor is not active (or flag for
throttle-nonBacklogConsumer is enabled) bcz
- // active-cursor reads message from cache rather from bookkeeper
(2) if topic has reached message-rate
- // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
- if
(serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() ||
!cursor.isActive()) {
- if (topic.getDispatchRateLimiter().isPresent()
- &&
topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
- DispatchRateLimiter topicRateLimiter =
topic.getDispatchRateLimiter().get();
- if (!topicRateLimiter.hasMessageDispatchPermit()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] message-read exceeded topic
message-rate {}/{}, schedule after a {}", name,
- topicRateLimiter.getDispatchRateOnMsg(),
topicRateLimiter.getDispatchRateOnByte(),
- MESSAGE_RATE_BACKOFF_MS);
- }
- topic.getBrokerService().executor().schedule(() -> {
- Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
- if (currentConsumer != null && !havePendingRead) {
- readMoreEntries(currentConsumer);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read retry for
topic: Current Consumer {},"
- + " havePendingRead {}",
- topic.getName(), currentConsumer,
havePendingRead);
- }
- }
- }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
- return;
- } else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long availablePermitsOnMsg =
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
- if (availablePermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
- }
- }
- }
-
- if (dispatchRateLimiter.isPresent() &&
dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
- if (!dispatchRateLimiter.get().hasMessageDispatchPermit())
{
- if (log.isDebugEnabled()) {
- log.debug("[{}] message-read exceeded subscription
message-rate {}/{},"
- + " schedule after a {}",
- name,
dispatchRateLimiter.get().getDispatchRateOnMsg(),
-
dispatchRateLimiter.get().getDispatchRateOnByte(),
- MESSAGE_RATE_BACKOFF_MS);
- }
- topic.getBrokerService().executor().schedule(() -> {
- Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
- if (currentConsumer != null && !havePendingRead) {
- readMoreEntries(currentConsumer);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read retry:
Current Consumer {}, havePendingRead {}",
- topic.getName(), currentConsumer,
havePendingRead);
- }
- }
- }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
- return;
- } else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long subPermitsOnMsg =
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
- if (subPermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
subPermitsOnMsg);
- }
- }
- }
- }
-
- // If messagesToRead is 0 or less, correct it to 1 to prevent
IllegalArgumentException
- messagesToRead = Math.max(messagesToRead, 1);
-
// Schedule read
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
@@ -459,6 +380,93 @@ public final class
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
}
+ protected int calculateNumOfMessageToRead(Consumer consumer) {
+ int availablePermits = consumer.getAvailablePermits();
+ if (!consumer.isWritable()) {
+ // If the connection is not currently writable, we issue the read
request anyway, but for a single
+ // message. The intent here is to keep using the request as a
notification mechanism while avoiding
+ // reading and dispatching a big batch of messages which will need to
wait before getting written to the
+ // socket.
+ availablePermits = 1;
+ }
+
+ int messagesToRead = Math.min(availablePermits, readBatchSize);
+ // if precise dispatcher flow control is turned on, adjust the number of
records to read
+ if (consumer.isPreciseDispatcherFlowControl()) {
+ int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
+ messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 /
avgMessagesPerEntry), readBatchSize);
+ }
+
+ // throttle only if: (1) cursor is not active (or flag for
throttle-nonBacklogConsumer is enabled) because
+ // an active cursor reads messages from cache rather than from bookkeeper (2)
if topic has reached message-rate
+ // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
+ if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() ||
!cursor.isActive()) {
+ if (topic.getDispatchRateLimiter().isPresent()
+ &&
topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+ DispatchRateLimiter topicRateLimiter =
topic.getDispatchRateLimiter().get();
+ if (!topicRateLimiter.hasMessageDispatchPermit()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] message-read exceeded topic
message-rate {}/{}, schedule after a {}", name,
+ topicRateLimiter.getDispatchRateOnMsg(),
topicRateLimiter.getDispatchRateOnByte(),
+ MESSAGE_RATE_BACKOFF_MS);
+ }
+ topic.getBrokerService().executor().schedule(() -> {
+ Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
+ if (currentConsumer != null && !havePendingRead) {
+ readMoreEntries(currentConsumer);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read retry for topic:
Current Consumer {},"
+ + " havePendingRead {}",
+ topic.getName(), currentConsumer,
havePendingRead);
+ }
+ }
+ }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ return -1;
+ } else {
+ // if dispatch-rate is in msg then read only msg according
to available permit
+ long availablePermitsOnMsg =
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
+ if (availablePermitsOnMsg > 0) {
+ messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
+ }
+ }
+ }
+
+ if (dispatchRateLimiter.isPresent() &&
dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+ if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] message-read exceeded subscription
message-rate {}/{},"
+ + " schedule after a {}",
+ name,
dispatchRateLimiter.get().getDispatchRateOnMsg(),
+
dispatchRateLimiter.get().getDispatchRateOnByte(),
+ MESSAGE_RATE_BACKOFF_MS);
+ }
+ topic.getBrokerService().executor().schedule(() -> {
+ Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
+ if (currentConsumer != null && !havePendingRead) {
+ readMoreEntries(currentConsumer);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read retry: Current
Consumer {}, havePendingRead {}",
+ topic.getName(), currentConsumer,
havePendingRead);
+ }
+ }
+ }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ return -1;
+ } else {
+ // if dispatch-rate is in msg then read only msg according
to available permit
+ long subPermitsOnMsg =
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
+ if (subPermitsOnMsg > 0) {
+ messagesToRead = Math.min(messagesToRead, (int)
subPermitsOnMsg);
+ }
+ }
+ }
+ }
+
+ // If messagesToRead is 0 or less, correct it to 1 to prevent
IllegalArgumentException
+ return Math.max(messagesToRead, 1);
+ }
+
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
new file mode 100644
index 0000000..9340e17
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Lists;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
+import
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
+
+/**
+ * A {@link PersistentDispatcherMultipleConsumers} that implements {@link
StreamingDispatcher}.
+ * It uses {@link StreamingEntryReader} to read new entries instead of reading
them as micro batches from the managed ledger.
+ */
+@Slf4j
+public class PersistentStreamingDispatcherMultipleConsumers extends
PersistentDispatcherMultipleConsumers
+ implements StreamingDispatcher {
+
+ private final StreamingEntryReader streamingEntryReader = new
StreamingEntryReader((ManagedCursorImpl) cursor,
+ this, topic);
+
+ public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic
topic, ManagedCursor cursor,
+ Subscription
subscription) {
+ super(topic, cursor, subscription);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void readEntryComplete(Entry entry,
PendingReadEntryRequest ctx) {
+
+ ReadType readType = (ReadType) ctx.ctx;
+ if (ctx.isLast()) {
+ readFailureBackoff.reduceToHalf();
+ if (readType == ReadType.Normal) {
+ havePendingRead = false;
+ } else {
+ havePendingReplayRead = false;
+ }
+ }
+
+ if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+ int newReadBatchSize = Math.min(readBatchSize * 2,
serviceConfig.getDispatcherMaxReadBatchSize());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Increasing read batch size from {} to {}",
name, readBatchSize, newReadBatchSize);
+ }
+ readBatchSize = newReadBatchSize;
+ }
+
+ if (shouldRewindBeforeReadingOrReplaying && readType ==
ReadType.Normal) {
+ // All consumers got disconnected before the completion of the
read operation
+ entry.release();
+ cursor.rewind();
+ shouldRewindBeforeReadingOrReplaying = false;
+ readMoreEntries();
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Distributing a messages to {} consumers", name,
consumerList.size());
+ }
+
+ cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
+ .getNextValidPosition((PositionImpl) entry.getPosition()));
+ sendMessagesToConsumers(readType, Lists.newArrayList(entry));
+ ctx.recycle();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void canReadMoreEntries(boolean withBackoff) {
+ havePendingRead = false;
+ topic.getBrokerService().executor().schedule(() -> {
+
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(),
SafeRun.safeRun(() -> {
+ synchronized
(PersistentStreamingDispatcherMultipleConsumers.this) {
+ if (!havePendingRead) {
+ log.info("[{}] Scheduling read operation", name);
+ readMoreEntries();
+ } else {
+ log.info("[{}] Skipping read since we have
pendingRead", name);
+ }
+ }
+ }));
+ }, withBackoff
+ ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void notifyConsumersEndOfTopic() {
+ if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
+ // Topic has been terminated and there are no more entries to read
+ // Notify the consumer only if all the messages were already
acknowledged
+ consumerList.forEach(Consumer::reachedEndOfTopic);
+ }
+ }
+
+ @Override
+ protected void cancelPendingRead() {
+ if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
+ havePendingRead = false;
+ }
+ }
+
+ @Override
+ public void readMoreEntries() {
+ // totalAvailablePermits may be updated by other threads
+ int currentTotalAvailablePermits = totalAvailablePermits;
+ if (currentTotalAvailablePermits > 0 &&
isAtleastOneConsumerAvailable()) {
+ int messagesToRead =
calculateNumOfMessageToRead(currentTotalAvailablePermits);
+
+ if (-1 == messagesToRead) {
+ // Skip read as topic/dispatcher has exceeded the dispatch rate
or previous pending read hasn't completed.
+ return;
+ }
+
+ Set<PositionImpl> messagesToReplayNow =
getMessagesToReplayNow(messagesToRead);
+
+ if (!messagesToReplayNow.isEmpty()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Schedule replay of {} messages for {}
consumers", name, messagesToReplayNow.size(),
+ consumerList.size());
+ }
+
+ havePendingReplayRead = true;
+ Set<? extends Position> deletedMessages =
topic.isDelayedDeliveryEnabled()
+ ? asyncReplayEntriesInOrder(messagesToReplayNow) :
asyncReplayEntries(messagesToReplayNow);
+ // clear already acked positions from replay bucket
+
+ deletedMessages.forEach(position ->
messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
+ ((PositionImpl) position).getEntryId()));
+ // if all the entries are acked-entries and cleared up from
messagesToRedeliver, try to read
+ // next entries as readCompletedEntries-callback was never
called
+ if ((messagesToReplayNow.size() - deletedMessages.size()) ==
0) {
+ havePendingReplayRead = false;
+ readMoreEntries();
+ }
+ } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) ==
TRUE) {
+ log.warn("[{}] Dispatcher read is blocked due to unackMessages
{} reached to max {}", name,
+ totalUnackedMessages,
topic.getMaxUnackedMessagesOnSubscription());
+ } else if (!havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Schedule read of {} messages for {}
consumers", name, messagesToRead,
+ consumerList.size());
+ }
+ havePendingRead = true;
+ streamingEntryReader.asyncReadEntries(messagesToRead,
serviceConfig.getDispatcherMaxReadSizeBytes(),
+ ReadType.Normal);
+ } else {
+ log.debug("[{}] Cannot schedule next read until previous one
is done", name);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Consumer buffer is full, pause reading", name);
+ }
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
new file mode 100644
index 0000000..b4e4ed3
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.collect.Lists;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.Subscription;
+import
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
+import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+
+/**
+ * A {@link PersistentDispatcherSingleActiveConsumer} that implements {@link
StreamingDispatcher}.
+ * It uses {@link StreamingEntryReader} to read new entries instead of reading
them as micro batches from the managed ledger.
+ */
+@Slf4j
+public class PersistentStreamingDispatcherSingleActiveConsumer extends
PersistentDispatcherSingleActiveConsumer
+ implements StreamingDispatcher {
+
+ private final StreamingEntryReader streamingEntryReader = new
StreamingEntryReader((ManagedCursorImpl) cursor,
+ this, topic);
+
+ public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor
cursor, SubType subscriptionType,
+ int
partitionIndex, PersistentTopic topic,
+ Subscription
subscription) {
+ super(cursor, subscriptionType, partitionIndex, topic, subscription);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void canReadMoreEntries(boolean withBackoff) {
+ havePendingRead = false;
+ topic.getBrokerService().executor().schedule(() -> {
+
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
+ synchronized
(PersistentStreamingDispatcherSingleActiveConsumer.this) {
+ Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
+ if (currentConsumer != null && !havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Scheduling read ", name,
currentConsumer);
+ }
+ readMoreEntries(currentConsumer);
+ } else {
+ log.info("[{}-{}] Skipping read as we still
havePendingRead {}", name,
+ currentConsumer);
+ }
+ }
+ }));
+ }, withBackoff
+ ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ protected void cancelPendingRead() {
+ if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
+ havePendingRead = false;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void notifyConsumersEndOfTopic() {
+ if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
+ // Topic has been terminated and there are no more entries to read
+ // Notify the consumer only if all the messages were already
acknowledged
+ consumers.forEach(Consumer::reachedEndOfTopic);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
+
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(name,
safeRun(() -> {
+ internalReadEntryComplete(entry, ctx);
+ }));
+ }
+
+ public synchronized void internalReadEntryComplete(Entry entry,
PendingReadEntryRequest ctx) {
+ if (ctx.isLast()) {
+ readFailureBackoff.reduceToHalf();
+ havePendingRead = false;
+ }
+
+ if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+ int newReadBatchSize = Math.min(readBatchSize * 2,
serviceConfig.getDispatcherMaxReadBatchSize());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Increasing read batch size from {} to {}",
name,
+ ((Consumer) ctx.ctx).consumerName(), readBatchSize,
newReadBatchSize);
+ }
+ readBatchSize = newReadBatchSize;
+ }
+
+ Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+
+ if (isKeyHashRangeFiltered) {
+ byte[] key = peekStickyKey(entry.getDataBuffer());
+ Consumer consumer = stickyKeyConsumerSelector.select(key);
+ // Skip the entry if it's not for current active consumer.
+ if (consumer == null || currentConsumer != consumer) {
+ entry.release();
+ return;
+ }
+ }
+ Consumer consumer = (Consumer) ctx.ctx;
+ ctx.recycle();
+ if (currentConsumer == null || consumer != currentConsumer) {
+ // Active consumer has changed since the read request has been
issued. We need to rewind the cursor and
+ // re-issue the read request for the new consumer
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Rewind because no available consumer found to
dispatch message to.", name);
+ }
+
+ entry.release();
+ streamingEntryReader.cancelReadRequests();
+ havePendingRead = false;
+ if (currentConsumer != null) {
+ notifyActiveConsumerChanged(currentConsumer);
+ readMoreEntries(currentConsumer);
+ }
+ } else {
+ EntryBatchSizes batchSizes = EntryBatchSizes.get(1);
+ SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+ EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(1);
+ filterEntriesForConsumer(Lists.newArrayList(entry), batchSizes,
sendMessageInfo, batchIndexesAcks,
+ cursor, false);
+ // Update cursor's read position.
+ cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
+ .getNextValidPosition((PositionImpl) entry.getPosition()));
+ dispatchEntriesToConsumer(currentConsumer,
Lists.newArrayList(entry), batchSizes,
+ batchIndexesAcks, sendMessageInfo);
+ }
+ }
+
+ @Override
+ protected void readMoreEntries(Consumer consumer) {
+ // consumer can be null when all consumers are disconnected from
broker.
+ // so skip reading more entries if currently there is no active
consumer.
+ if (null == consumer) {
+ return;
+ }
+
+ if (!havePendingRead && consumer.getAvailablePermits() > 0) {
+ int messagesToRead = calculateNumOfMessageToRead(consumer);
+
+ if (-1 == messagesToRead) {
+ // Skip read as topic/dispatcher has exceeded the dispatch rate.
+ return;
+ }
+
+ // Schedule read
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
+ }
+ havePendingRead = true;
+
+ if (consumer.readCompacted()) {
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, this, consumer);
+ } else {
+ streamingEntryReader.asyncReadEntries(messagesToRead,
serviceConfig.getDispatcherMaxReadSizeBytes(),
+ consumer);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Consumer buffer is full, pause reading",
name, consumer);
+ }
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a95692f..068ae87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -181,19 +181,24 @@ public class PersistentSubscription implements
Subscription {
if (dispatcher == null || !dispatcher.isConsumerConnected()) {
Dispatcher previousDispatcher = null;
-
+ boolean useStreamingDispatcher =
topic.getBrokerService().getPulsar()
+
.getConfiguration().isStreamingDispatch();
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() !=
SubType.Exclusive) {
previousDispatcher = dispatcher;
- dispatcher = new
PersistentDispatcherSingleActiveConsumer(cursor,
- SubType.Exclusive, 0, topic, this);
+ dispatcher = useStreamingDispatcher ? new
PersistentStreamingDispatcherSingleActiveConsumer(cursor,
+ SubType.Exclusive, 0, topic, this) :
+ new
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0,
+ topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() !=
SubType.Shared) {
previousDispatcher = dispatcher;
- dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursor, this);
+ dispatcher = useStreamingDispatcher ? new
PersistentStreamingDispatcherMultipleConsumers(topic,
+ cursor, this) : new
PersistentDispatcherMultipleConsumers(topic,
+ cursor, this);
}
break;
case Failover:
@@ -206,8 +211,10 @@ public class PersistentSubscription implements
Subscription {
if (dispatcher == null || dispatcher.getType() !=
SubType.Failover) {
previousDispatcher = dispatcher;
- dispatcher = new
PersistentDispatcherSingleActiveConsumer(cursor,
- SubType.Failover, partitionIndex, topic, this);
+ dispatcher = useStreamingDispatcher ? new
PersistentStreamingDispatcherSingleActiveConsumer(cursor,
+ SubType.Failover, partitionIndex, topic, this) :
+ new
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
+ partitionIndex, topic, this);
}
break;
case Key_Shared:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
new file mode 100644
index 0000000..7989bbc
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import io.netty.util.Recycler;
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+/**
+ * Represents a pending request to read an entry from {@link
ManagedLedger}, carrying the necessary context.
+ */
+@Data
+public class PendingReadEntryRequest {
+
+ private static final Recycler<PendingReadEntryRequest> RECYCLER = new
Recycler<PendingReadEntryRequest>() {
+ protected PendingReadEntryRequest
newObject(Recycler.Handle<PendingReadEntryRequest> handle) {
+ return new PendingReadEntryRequest(handle);
+ }
+ };
+
+ public static PendingReadEntryRequest create(Object ctx, PositionImpl
position) {
+ PendingReadEntryRequest pendingReadEntryRequest = RECYCLER.get();
+ pendingReadEntryRequest.ctx = ctx;
+ pendingReadEntryRequest.position = position;
+ pendingReadEntryRequest.retry = 0;
+ pendingReadEntryRequest.isLast = false;
+ return pendingReadEntryRequest;
+ }
+
+ public void recycle() {
+ entry = null;
+ ctx = null;
+ position = null;
+ retry = -1;
+ recyclerHandle.recycle(this);
+ }
+
+ public boolean isLastRequest() {
+ return isLast;
+ }
+
+ private final Recycler.Handle<PendingReadEntryRequest> recyclerHandle;
+
+ // Entry read from ledger
+ public Entry entry;
+
+ // Passed-in context that will be passed to the callback
+ public Object ctx;
+
+ // Position of entry to be read
+ public PositionImpl position;
+
+ // Number of times the request has been retried.
+ int retry;
+
+ // If request is the last one of a set of requests.
+ boolean isLast;
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
new file mode 100644
index 0000000..814a381
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * A {@link Dispatcher} that'll use {@link StreamingEntryReader} to read
entries from {@link ManagedLedger}.
+ */
+@InterfaceStability.Unstable
+public interface StreamingDispatcher extends Dispatcher {
+
+ /**
+ * Notify the dispatcher that an issued read-entry request has completed.
+ * @param entry Entry read.
+ * @param ctx Context passed in when issuing read entries request.
+ */
+ void readEntryComplete(Entry entry, PendingReadEntryRequest ctx);
+
+ /**
+ * Notify the dispatcher that it can issue the next read request.
+ */
+ void canReadMoreEntries(boolean withBackoff);
+
+ /**
+ * Notify the dispatcher to inform consumers that the end of the topic has been reached.
+ */
+ void notifyConsumersEndOfTopic();
+
+ /**
+ * @return Name of the dispatcher.
+ */
+ String getName();
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
new file mode 100644
index 0000000..24f9bcc
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.client.impl.Backoff;
+
+/**
+ * Entry reader that fulfills read requests by streaming single-entry reads instead of
+ * reading entries in micro batches.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback,
WaitingEntryCallBack {
+
+ private final int maxRetry = 3;
+
+ // Queue for read requests that have been issued to the managed ledger and are waiting to complete.
+ private ConcurrentLinkedQueue<PendingReadEntryRequest> issuedReads = new
ConcurrentLinkedQueue<>();
+
+ // Queue for read requests that are waiting for new entries from the managed ledger.
+ private ConcurrentLinkedQueue<PendingReadEntryRequest> pendingReads = new
ConcurrentLinkedQueue<>();
+
+ private final ManagedCursorImpl cursor;
+
+ private final StreamingDispatcher dispatcher;
+
+ private final PersistentTopic topic;
+
+ private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
+
+ private volatile State state;
+
+ private static final AtomicReferenceFieldUpdater<StreamingEntryReader,
State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class,
State.class, "state");
+
+ private volatile int maxReadSizeByte;
+
+ private final Backoff readFailureBackoff = new Backoff(10,
TimeUnit.MILLISECONDS,
+ 1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
+
+    /**
+     * Read entries in a streaming way: instead of reading a micro batch and sending the entries to the consumer
+     * only after the whole batch has been read from the ledger, this method fires {@code numEntriesToRead}
+     * single-entry read requests at the managed ledger and hands each entry to the dispatcher as soon as it has
+     * been read and all entries before it have been handed over.
+     *
+     * <p>The call does nothing while requests from a previous call are still queued.
+     *
+     * @param numEntriesToRead number of entries to read from the ledger.
+     * @param maxReadSizeByte maximum number of bytes to read from the ledger.
+     * @param ctx context sent along with each read request.
+     */
+    public synchronized void asyncReadEntries(int numEntriesToRead, int maxReadSizeByte, Object ctx) {
+        // Carry out a cancellation that was requested but has not been executed yet.
+        if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
+            internalCancelReadRequests();
+        }
+
+        if (!issuedReads.isEmpty() || !pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] There's pending streaming read not completed yet. Not scheduling next read request.",
+                        cursor.getName());
+            }
+            return;
+        }
+
+        PositionImpl nextReadPosition = (PositionImpl) cursor.getReadPosition();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+        // Edge case, when a old ledger is full and new ledger is not yet opened, position can point to next
+        // position of the last confirmed position, but it'll be an invalid position. So try to update the position.
+        if (!managedLedger.isValidPosition(nextReadPosition)) {
+            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
+        }
+        boolean hasEntriesToRead = managedLedger.hasMoreEntries(nextReadPosition);
+        currentReadSizeByte.set(0);
+        STATE_UPDATER.set(this, State.Issued);
+        this.maxReadSizeByte = maxReadSizeByte;
+        for (int c = 0; c < numEntriesToRead; c++) {
+            PendingReadEntryRequest pendingReadEntryRequest = PendingReadEntryRequest.create(ctx, nextReadPosition);
+            // Once one request has gone to the pending queue, every following request must go there as well, even
+            // if new entries were added in the meantime; latching the flag keeps the delivery order guaranteed.
+            hasEntriesToRead = hasEntriesToRead && managedLedger.hasMoreEntries(nextReadPosition);
+            if (hasEntriesToRead) {
+                issuedReads.offer(pendingReadEntryRequest);
+            } else {
+                pendingReads.offer(pendingReadEntryRequest);
+            }
+            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
+        }
+
+        // Issue requests.
+        for (PendingReadEntryRequest request : issuedReads) {
+            managedLedger.asyncReadEntry(request.position, this, request);
+        }
+
+        if (!pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entry.",
+                        cursor.getName(), pendingReads.size());
+            }
+            // If new entries are available after we put request into pending queue, fire read.
+            // Else register callback with managed ledger to get notify when new entries are available.
+            if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                entriesAvailable();
+            } else if (managedLedger.isTerminated()) {
+                // No further entries will ever arrive: tell the consumers and drop the waiting requests.
+                dispatcher.notifyConsumersEndOfTopic();
+                cleanQueue(pendingReads);
+                if (issuedReads.isEmpty()) {
+                    dispatcher.canReadMoreEntries(true);
+                }
+            } else {
+                managedLedger.addWaitingEntryCallBack(this);
+            }
+        }
+    }
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        // Hand the completion over to the ordered executor (keyed by dispatcher name) so the calling thread
+        // is not blocked.
+        topic.getBrokerService()
+                .getTopicOrderedExecutor()
+                .executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> internalReadEntryComplete(entry, ctx)));
+    }
+
+    /**
+     * Record a completed single-entry read and forward to the dispatcher every entry at the head of the issued
+     * queue that has already been read, preserving the order in which the requests were issued.
+     *
+     * <p>If the accumulated size exceeds {@code maxReadSizeByte}, the remaining requests are cancelled and the
+     * entry that crossed the limit is released instead of being dispatched.
+     *
+     * @param entry entry returned by the managed ledger.
+     * @param ctx the {@link PendingReadEntryRequest} this entry belongs to.
+     */
+    private void internalReadEntryComplete(Entry entry, Object ctx) {
+        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
+        pendingReadEntryRequest.entry = entry;
+        readFailureBackoff.reduceToHalf();
+        // If we have entry to send to dispatcher.
+        if (!issuedReads.isEmpty() && issuedReads.peek() == pendingReadEntryRequest) {
+            while (!issuedReads.isEmpty() && issuedReads.peek().entry != null) {
+                PendingReadEntryRequest firstPendingReadEntryRequest = issuedReads.poll();
+                Entry readEntry = firstPendingReadEntryRequest.entry;
+                currentReadSizeByte.addAndGet(readEntry.getLength());
+                // Cancel remaining requests and reset cursor if maxReadSizeByte exceeded.
+                if (currentReadSizeByte.get() > maxReadSizeByte) {
+                    cancelReadRequests(readEntry.getPosition());
+                    // This entry was taken off the queue but is not handed to the dispatcher, so nobody else
+                    // will release it; free it the same way cleanQueue() frees completed requests.
+                    readEntry.release();
+                    firstPendingReadEntryRequest.recycle();
+                    dispatcher.canReadMoreEntries(false);
+                    STATE_UPDATER.set(this, State.Completed);
+                    return;
+                } else {
+                    // All request has been completed, mark returned entry as last.
+                    if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
+                        firstPendingReadEntryRequest.isLast = true;
+                        STATE_UPDATER.set(this, State.Completed);
+                    }
+                    dispatcher.readEntryComplete(readEntry, firstPendingReadEntryRequest);
+                }
+            }
+        } else if (!issuedReads.isEmpty() && issuedReads.peek().retry > maxRetry) {
+            // The head request has exhausted its retries: give up on the whole batch.
+            cancelReadRequests(issuedReads.peek().position);
+            dispatcher.canReadMoreEntries(true);
+            STATE_UPDATER.set(this, State.Completed);
+        }
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        // Hand the failure over to the ordered executor (keyed by dispatcher name) so the calling thread
+        // is not blocked.
+        topic.getBrokerService()
+                .getTopicOrderedExecutor()
+                .executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> internalReadEntryFailed(exception, ctx)));
+    }
+
+    /**
+     * Handle a failed single-entry read: count the failure against the request, log it, and either schedule a
+     * retry of that request or, when the head of the issued queue has exceeded {@code maxRetry}, cancel all
+     * outstanding requests.
+     *
+     * @param exception failure reported by the managed ledger.
+     * @param ctx the {@link PendingReadEntryRequest} whose read failed.
+     */
+    private void internalReadEntryFailed(ManagedLedgerException exception, Object ctx) {
+        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
+        PositionImpl readPosition = pendingReadEntryRequest.position;
+        pendingReadEntryRequest.retry++;
+        long waitTimeMillis = readFailureBackoff.next();
+        if (exception.getCause() instanceof TransactionNotSealedException) {
+            // Retry almost immediately for a transaction that is not sealed yet.
+            waitTimeMillis = 1;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds",
+                        cursor.getName(), exception.getMessage(), waitTimeMillis / 1000.0);
+            }
+        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
+            log.error("[{}] Error reading entries at {} : {} - Retrying to read in {} seconds", cursor.getName(),
+                    readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
+                        cursor.getName(), readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
+            }
+        }
+        if (!issuedReads.isEmpty()) {
+            // Give up on the whole batch once the head request has run out of retries.
+            if (issuedReads.peek().retry > maxRetry) {
+                cancelReadRequests(issuedReads.peek().position);
+                dispatcher.canReadMoreEntries(true);
+                STATE_UPDATER.set(this, State.Completed);
+                return;
+            }
+            if (pendingReadEntryRequest.retry <= maxRetry) {
+                retryReadRequest(pendingReadEntryRequest, waitTimeMillis);
+            }
+        }
+    }
+
+    // Drop every issued and pending request; when issued requests existed, move the cursor's read position
+    // to the given position.
+    private void cancelReadRequests(Position position) {
+        boolean hadIssuedReads = !issuedReads.isEmpty();
+        if (hadIssuedReads) {
+            cleanQueue(issuedReads);
+            cursor.seek(position);
+        }
+
+        boolean hadPendingReads = !pendingReads.isEmpty();
+        if (hadPendingReads) {
+            cleanQueue(pendingReads);
+        }
+    }
+
+    /**
+     * Cancel all outstanding requests, resetting to the position of the oldest one: the head of the issued
+     * queue if there is one, otherwise the head of the pending queue. Does nothing when both queues are empty.
+     */
+    private void internalCancelReadRequests() {
+        PendingReadEntryRequest oldest = issuedReads.peek();
+        if (oldest == null) {
+            oldest = pendingReads.peek();
+        }
+        if (oldest == null) {
+            // Both queues are already drained; peek() returned null, so there is nothing to cancel.
+            return;
+        }
+        cancelReadRequests(oldest.position);
+    }
+
+ public boolean cancelReadRequests() {
+ if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
+ // Don't block caller thread, complete cancel read with dispatcher
dedicated thread.
+
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(),
SafeRun.safeRun(() -> {
+ synchronized (StreamingEntryReader.this) {
+ if (STATE_UPDATER.compareAndSet(this, State.Canceling,
State.Canceled)) {
+ internalCancelReadRequests();
+ }
+ }
+ }));
+ return true;
+ }
+ return false;
+ }
+
+    // Empty the queue; requests that already carry an entry get the entry released and are recycled.
+    private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
+        while (!queue.isEmpty()) {
+            PendingReadEntryRequest drained = queue.poll();
+            if (drained.entry == null) {
+                continue;
+            }
+            drained.entry.release();
+            drained.recycle();
+        }
+    }
+
+    // Re-issue the read for the given request after the given delay in milliseconds.
+    private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
+        topic.getBrokerService().executor().schedule(() -> {
+            // Once the delay has elapsed, hop back onto the ordered executor keyed by dispatcher name.
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
+                    SafeRun.safeRun(() -> ((ManagedLedgerImpl) cursor.getManagedLedger())
+                            .asyncReadEntry(pendingReadEntryRequest.position, this, pendingReadEntryRequest)));
+        }, delay, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void entriesAvailable() {
+        // Process the notification on the ordered executor, keyed by dispatcher name.
+        topic.getBrokerService()
+                .getTopicOrderedExecutor()
+                .executeOrdered(dispatcher.getName(), SafeRun.safeRun(this::internalEntriesAvailable));
+    }
+
+    /**
+     * Move pending requests whose position now has an entry available into the issued queue and fire their
+     * reads. If requests are still pending afterwards, either re-run this check (entries arrived in the
+     * meantime) or register again for notification with the managed ledger.
+     */
+    private synchronized void internalEntriesAvailable() {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Streaming entry reader get notification of newly added entries from managed ledger,"
+                    + " trying to issued pending read requests.", cursor.getName());
+        }
+        if (pendingReads.isEmpty()) {
+            return;
+        }
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+        List<PendingReadEntryRequest> newlyIssuedRequests = new ArrayList<>();
+        // Edge case, when a old ledger is full and new ledger is not yet opened, position can point to next
+        // position of the last confirmed position, but it'll be an invalid position. So try to update the position.
+        if (!managedLedger.isValidPosition(pendingReads.peek().position)) {
+            pendingReads.peek().position = managedLedger.getNextValidPosition(pendingReads.peek().position);
+        }
+        while (!pendingReads.isEmpty() && managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+            PendingReadEntryRequest next = pendingReads.poll();
+            issuedReads.offer(next);
+            newlyIssuedRequests.add(next);
+            // Need to update the position because when the PendingReadEntryRequest is created, the position could
+            // be all set to managed ledger's last confirmed position.
+            if (!pendingReads.isEmpty()) {
+                pendingReads.peek().position = managedLedger.getNextValidPosition(next.position);
+            }
+        }
+
+        for (PendingReadEntryRequest request : newlyIssuedRequests) {
+            managedLedger.asyncReadEntry(request.position, this, request);
+        }
+
+        if (!pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entry.",
+                        cursor.getName(), pendingReads.size());
+            }
+            if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                entriesAvailable();
+            } else {
+                managedLedger.addWaitingEntryCallBack(this);
+            }
+        }
+    }
+
+    /**
+     * @return the reader's current state (volatile read of the field updated through STATE_UPDATER).
+     */
+    protected State getState() {
+        return state;
+    }
+
+ // Lifecycle of a batch of streaming reads:
+ //  Issued    - set by asyncReadEntries() once it starts queueing read requests.
+ //  Canceling - set by cancelReadRequests() when a cancellation is requested while Issued.
+ //  Canceled  - set right before the queued requests are actually dropped.
+ //  Completed - set when the last entry was handed to the dispatcher, or the batch was aborted
+ //              (size limit exceeded or retries exhausted).
+ enum State {
+ Issued, Canceling, Canceled, Completed;
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
new file mode 100644
index 0000000..9a205ed
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 3497b73..a631db5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -103,7 +104,7 @@ public class PersistentDispatcherFailoverConsumerTest {
private ChannelHandlerContext channelCtx;
private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
private ZooKeeper mockZk;
- private PulsarService pulsar;
+ protected PulsarService pulsar;
final String successTopicName =
"persistent://part-perf/global/perf.t1/ptopic";
final String failTopicName =
"persistent://part-perf/global/perf.t1/pfailTopic";
@@ -208,7 +209,7 @@ public class PersistentDispatcherFailoverConsumerTest {
void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
- cursorMock = mock(ManagedCursor.class);
+ cursorMock = mock(ManagedCursorImpl.class);
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
doReturn("mockCursor").when(cursorMock).getName();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 8313567..dd90017 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -270,6 +270,11 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
@Test
public void testSimpleConsumerEventsWithPartition() throws Exception {
+ // Resetting ActiveConsumerFailoverDelayTimeMillis else if
testActiveConsumerFailoverWithDelay get executed
+ // first could cause this test to fail.
+ conf.setActiveConsumerFailoverDelayTimeMillis(0);
+ restartBroker();
+
int numPartitions = 4;
final String topicName =
"persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition-" +
System.nanoTime();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 52a67cb..7b428a8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -141,7 +141,7 @@ import io.netty.buffer.Unpooled;
/**
*/
public class PersistentTopicTest extends MockedBookKeeperTestCase {
- private PulsarService pulsar;
+ protected PulsarService pulsar;
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
@@ -1199,7 +1199,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@SuppressWarnings("unchecked")
void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
- cursorMock = mock(ManagedCursor.class);
+ cursorMock = mock(ManagedCursorImpl.class);
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
new file mode 100644
index 0000000..66f49c8
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import
org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Runs the PersistentDispatcherFailoverConsumerTest suite with the {@link StreamingDispatcher}
+ * enabled through the broker configuration.
+ */
+public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest
extends
+ PersistentDispatcherFailoverConsumerTest {
+ @BeforeMethod
+ public void setup() throws Exception {
+ super.setup();
+ // Turn on streaming dispatch on the configuration of the pulsar instance prepared by the parent setup.
+ pulsar.getConfiguration().setStreamingDispatch(true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
new file mode 100644
index 0000000..ee56b5c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentFailoverE2ETest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Runs the PersistentFailoverE2ETest suite with the {@link StreamingDispatcher} enabled
+ * through the broker configuration.
+ */
+public class PersistentFailoverStreamingDispatcherE2ETest extends
PersistentFailoverE2ETest {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ // The flag is set before the parent setup runs.
+ conf.setStreamingDispatch(true);
+ super.setup();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
new file mode 100644
index 0000000..bdaf88e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.DispatcherBlockConsumerTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Runs the DispatcherBlockConsumerTest suite with the {@link StreamingDispatcher} enabled
+ * through the broker configuration.
+ */
+public class PersistentStreamingDispatcherBlockConsumerTest extends
DispatcherBlockConsumerTest {
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.setup();
+ // The flag is set after the parent setup has run.
+ conf.setStreamingDispatch(true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
new file mode 100644
index 0000000..ff170ed
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Runs the SubscriptionMessageDispatchThrottlingTest suite with the {@link StreamingDispatcher}
+ * enabled through the broker configuration.
+ */
+public class
PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest
+ extends SubscriptionMessageDispatchThrottlingTest {
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.setup();
+ // The flag is set after the parent setup has run.
+ conf.setStreamingDispatch(true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
new file mode 100644
index 0000000..b1efcf6
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentTopicE2ETest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Runs the PersistentTopicE2ETest suite with the {@link StreamingDispatcher} enabled
+ * through the broker configuration.
+ */
+public class PersistentTopicStreamingDispatcherE2ETest extends
PersistentTopicE2ETest {
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ // The flag is set first; note this calls baseSetup() rather than the parent's setup().
+ conf.setStreamingDispatch(true);
+ super.baseSetup();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
new file mode 100644
index 0000000..f0057d9
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentTopicTest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Runs the PersistentTopicTest suite with the {@link StreamingDispatcher} enabled
+ * through the broker configuration.
+ */
+public class PersistentTopicStreamingDispatcherTest extends
PersistentTopicTest {
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ super.setup();
+ // Turn on streaming dispatch on the configuration of the pulsar instance prepared by the parent setup.
+ pulsar.getConfiguration().setStreamingDispatch(true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
new file mode 100644
index 0000000..1bcdec7
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.SimpleProducerConsumerTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Runs the SimpleProducerConsumerTest suite with the {@link StreamingDispatcher} enabled
+ * through the broker configuration.
+ */
+public class SimpleProducerConsumerTestStreamingDispatcherTest extends
SimpleProducerConsumerTest {
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.setup();
+ // The flag is set after the parent setup has run.
+ conf.setStreamingDispatch(true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
new file mode 100644
index 0000000..332f431
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import com.google.common.base.Charsets;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests for {@link StreamingEntryReader}
+ */
+@PrepareForTest({ManagedLedgerImpl.class})
+public class StreamingEntryReaderTests extends MockedBookKeeperTestCase {
+
+ private static final Charset Encoding = Charsets.UTF_8;
+ private PersistentTopic mockTopic;
+ private StreamingDispatcher mockDispatcher;
+ private BrokerService mockBrokerService;
+ private ScheduledExecutorService scheduledExecutorService;
+ private OrderedExecutor orderedExecutor;
+ private ManagedLedgerConfig config;
+ private ManagedLedgerImpl ledger;
+ private ManagedCursor cursor;
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+ orderedExecutor = OrderedScheduler.newSchedulerBuilder()
+ .numThreads(1)
+ .name("StreamingEntryReaderTests").build();
+ mockTopic = mock(PersistentTopic.class);
+ mockBrokerService = mock(BrokerService.class);
+ mockDispatcher = mock(StreamingDispatcher.class);
+ config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
+ ledger = spy((ManagedLedgerImpl) factory.open("my_test_ledger",
config));
+ cursor = ledger.openCursor("test");
+ when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
+
when(mockBrokerService.executor()).thenReturn(scheduledExecutorService);
+
when(mockBrokerService.getTopicOrderedExecutor()).thenReturn(orderedExecutor);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ return null;
+ }
+ }).when(mockDispatcher).notifyConsumersEndOfTopic();
+ }
+
+ @Test
+ public void testCanReadEntryFromMLedgerHappyPath() throws Exception {
+ AtomicInteger entryCount = new AtomicInteger(0);
+ Stack<Position> positions = new Stack<>();
+
+ for (int i = 0; i < 150; i++) {
+ ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
+ }
+
+ StreamingEntryReader streamingEntryReader =new
StreamingEntryReader((ManagedCursorImpl) cursor,
+ mockDispatcher, mockTopic);
+
+ doAnswer((InvocationOnMock invocationOnMock) -> {
+ Entry entry = invocationOnMock.getArgument(0, Entry.class);
+ positions.push(entry.getPosition());
+ assertEquals(new String(entry.getData()),
String.format("message-%d", entryCount.getAndIncrement()));
+ cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
+ return null;
+ }
+ ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
+
+ streamingEntryReader.asyncReadEntries(50, 700, null);
+ await().atMost(500, TimeUnit.MILLISECONDS).until(() ->
entryCount.get() == 50);
+ // Check cursor's read position has been properly updated
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ streamingEntryReader.asyncReadEntries(50, 700, null);
+ await().atMost(500, TimeUnit.MILLISECONDS).until(() ->
entryCount.get() == 100);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ streamingEntryReader.asyncReadEntries(50, 700, null);
+ await().atMost(500, TimeUnit.MILLISECONDS).until(() ->
entryCount.get() == 150);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ }
+
+ @Test
+ public void testCanReadEntryFromMLedgerSizeExceededLimit() throws
Exception {
+ AtomicBoolean readComplete = new AtomicBoolean(false);
+ Stack<Position> positions = new Stack<>();
+ List<String> entries = new ArrayList<>();
+ int size = "mmmmmmmmmmessage-0".getBytes().length;
+ for (int i = 0; i < 15; i++) {
+ ledger.addEntry(String.format("mmmmmmmmmmessage-%d",
i).getBytes(Encoding));
+ }
+
+ StreamingEntryReader streamingEntryReader =
+ new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
+
+ doAnswer((InvocationOnMock invocationOnMock) -> {
+ Entry entry = invocationOnMock.getArgument(0, Entry.class);
+ positions.push(entry.getPosition());
+ entries.add(new String(entry.getData()));
+ cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
+ return null;
+ }
+ ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
+
+ doAnswer((InvocationOnMock invocationOnMock) -> {
+ readComplete.set(true);
+ return null;
+ }
+ ).when(mockDispatcher).canReadMoreEntries(anyBoolean());
+
+ PositionImpl position =
ledger.getPositionAfterN(ledger.getFirstPosition(), 3,
ManagedLedgerImpl.PositionBound.startExcluded);
+ // Make reading from mledger return out of order.
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+ executor.schedule(() -> {
+
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(), "mmmmmmmmmmessage-2".getBytes()),
+ invocationOnMock.getArgument(2));
+ }, 200, TimeUnit.MILLISECONDS);
+ return null;
+ }
+ }).when(ledger).asyncReadEntry(eq(position), any(), any());
+
+ // Only 2 entries should be read with this request.
+ streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+ await().atMost(300, TimeUnit.MILLISECONDS).until(() ->
readComplete.get());
+ assertEquals(entries.size(), 2);
+ // Assert cursor's read position has been properly updated to the
third entry, since we should only read
+ // 2 entries with the previous request.
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ reset(ledger);
+ readComplete.set(false);
+ streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+ await().atMost(300, TimeUnit.MILLISECONDS).until(() ->
readComplete.get());
+ readComplete.set(false);
+ streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+ await().atMost(300, TimeUnit.MILLISECONDS).until(() ->
readComplete.get());
+ readComplete.set(false);
+ streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+ await().atMost(300, TimeUnit.MILLISECONDS).until(() ->
readComplete.get());
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ assertEquals(entries.size(), 8);
+ for (int i = 0; i < entries.size(); i++) {
+ assertEquals(String.format("mmmmmmmmmmessage-%d", i),
entries.get(i));
+ }
+ }
+
+ @Test
+ public void testCanReadEntryFromMLedgerWaitingForNewEntry() throws
Exception {
+ AtomicInteger entryCount = new AtomicInteger(0);
+ AtomicBoolean entryProcessed = new AtomicBoolean(false);
+ Stack<Position> positions = new Stack<>();
+ List<String> entries = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
+ }
+
+ StreamingEntryReader streamingEntryReader =
+ new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
+
+ doAnswer((InvocationOnMock invocationOnMock) -> {
+ Entry entry = invocationOnMock.getArgument(0, Entry.class);
+ positions.push(entry.getPosition());
+ entries.add(new String(entry.getData()));
+ entryCount.getAndIncrement();
+ cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
+ entryProcessed.set(true);
+ return null;
+ }
+ ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
+
+ streamingEntryReader.asyncReadEntries(5, 100, null);
+ await().atMost(300, TimeUnit.MILLISECONDS).until(() ->
entryCount.get() == 5);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ streamingEntryReader.asyncReadEntries(5, 100, null);
+ // We only write 7 entries initially so only 7 entries can be read.
+ await().atMost(300, TimeUnit.MILLISECONDS).until(() ->
entryCount.get() == 7);
+ // Add a new entry and wait for it to be sent to the reader.
+ entryProcessed.set(false);
+ ledger.addEntry("message-7".getBytes(Encoding));
+ await().atMost(500, TimeUnit.MILLISECONDS).until(() ->
entryProcessed.get());
+ assertEquals(entries.size(), 8);
+ entryProcessed.set(false);
+ ledger.addEntry("message-8".getBytes(Encoding));
+ await().atMost(500, TimeUnit.MILLISECONDS).until(() ->
entryProcessed.get());
+ assertEquals(entries.size(), 9);
+ entryProcessed.set(false);
+ ledger.addEntry("message-9".getBytes(Encoding));
+ await().atMost(500, TimeUnit.MILLISECONDS).until(() ->
entryProcessed.get());
+ assertEquals(entries.size(), 10);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ for (int i = 0; i < entries.size(); i++) {
+ assertEquals(String.format("message-%d", i), entries.get(i));
+ }
+ }
+
+ @Test
+ public void testCanCancelReadEntryRequestAndResumeReading() throws
Exception {
+ Map<Position, String> messages = new HashMap<>();
+ AtomicInteger count = new AtomicInteger(0);
+ Stack<Position> positions = new Stack<>();
+ List<String> entries = new ArrayList<>();
+
+ for (int i = 0; i < 20; i++) {
+ String msg = String.format("message-%d", i);
+ messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+ }
+
+ StreamingEntryReader streamingEntryReader =
+ new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
+
+ doAnswer((InvocationOnMock invocationOnMock) -> {
+ Entry entry = invocationOnMock.getArgument(0, Entry.class);
+ positions.push(entry.getPosition());
+ entries.add(new String(entry.getData()));
+ cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
+ return null;
+ }
+ ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
+
+ // Only return 5 entries
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+ PositionImpl position = invocationOnMock.getArgument(0,
PositionImpl.class);
+ int c = count.getAndIncrement();
+ if (c < 5) {
+
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(),
+ messages.get(position).getBytes()),
+ invocationOnMock.getArgument(2));
+ }
+ return null;
+ }
+ }).when(ledger).asyncReadEntry(any(), any(), any());
+
+ streamingEntryReader.asyncReadEntries(20, 200, null);
+ streamingEntryReader.cancelReadRequests();
+ await().atMost(10000, TimeUnit.MILLISECONDS).until(() ->
streamingEntryReader.getState() == StreamingEntryReader.State.Canceled);
+ // Only 5 entries are expected, as the mocked ledger returns only 5 entries and
the request is canceled.
+ assertEquals(entries.size(), 5);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ // Clear mock and try to read remaining entries
+ reset(ledger);
+ streamingEntryReader.asyncReadEntries(15, 200, null);
+ streamingEntryReader.cancelReadRequests();
+ await().atMost(10000, TimeUnit.MILLISECONDS).until(() ->
streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+ // All 20 entries should now have been read: the 5 from the first request plus
the remaining 15.
+ assertEquals(entries.size(), 20);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ // Make sure message still returned in order
+ for (int i = 0; i < entries.size(); i++) {
+ assertEquals(String.format("message-%d", i), entries.get(i));
+ }
+ }
+
+ @Test
+ public void testCanHandleExceptionAndRetry() throws Exception {
+ Map<Position, String> messages = new HashMap<>();
+ AtomicBoolean entryProcessed = new AtomicBoolean(false);
+ AtomicInteger count = new AtomicInteger(0);
+ Stack<Position> positions = new Stack<>();
+ List<String> entries = new ArrayList<>();
+ for (int i = 0; i < 12; i++) {
+ String msg = String.format("message-%d", i);
+ messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+ }
+
+ StreamingEntryReader streamingEntryReader =
+ new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
+
+ doAnswer((InvocationOnMock invocationOnMock) -> {
+ Entry entry = invocationOnMock.getArgument(0, Entry.class);
+ positions.push(entry.getPosition());
+ entries.add(new String(entry.getData()));
+ cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
+
+ if (entries.size() == 6 || entries.size() == 12) {
+ entryProcessed.set(true);
+ }
+ return null;
+ }
+ ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
+
+ // Make reading from mledger fail on specific attempts (the 4th, 5th, 10th and 11th reads).
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+ PositionImpl position = invocationOnMock.getArgument(0,
PositionImpl.class);
+ int c = count.getAndIncrement();
+ if (c >= 3 && c < 5 || c >= 9 && c < 11) {
+ cb.readEntryFailed(new
ManagedLedgerException.TooManyRequestsException("Fake exception."),
+ invocationOnMock.getArgument(2));
+ } else {
+
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(),
+ messages.get(position).getBytes()),
+ invocationOnMock.getArgument(2));
+ }
+ return null;
+ }
+ }).when(ledger).asyncReadEntry(any(), any(), any());
+
+ streamingEntryReader.asyncReadEntries(6, 100, null);
+ await().atMost(3000, TimeUnit.MILLISECONDS).until(() ->
entryProcessed.get());
+ assertEquals(entries.size(), 6);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ entryProcessed.set(false);
+ streamingEntryReader.asyncReadEntries(6, 100, null);
+ await().atMost(3000, TimeUnit.MILLISECONDS).until(() ->
entryProcessed.get());
+ assertEquals(entries.size(), 12);
+ assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
+ // Make sure message still returned in order
+ for (int i = 0; i < entries.size(); i++) {
+ assertEquals(String.format("message-%d", i), entries.get(i));
+ }
+ }
+
+ @Test
+ public void testWillCancelReadAfterExhaustingRetry() throws Exception {
+ Map<Position, String> messages = new HashMap<>();
+ AtomicInteger count = new AtomicInteger(0);
+ Stack<Position> positions = new Stack<>();
+ List<String> entries = new ArrayList<>();
+ for (int i = 0; i < 12; i++) {
+ String msg = String.format("message-%d", i);
+ messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+ }
+
+ StreamingEntryReader streamingEntryReader =
+ new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
+
+ doAnswer((InvocationOnMock invocationOnMock) -> {
+ Entry entry = invocationOnMock.getArgument(0, Entry.class);
+ positions.push(entry.getPosition());
+ cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
+ entries.add(new String(entry.getData()));
+ return null;
+ }
+ ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
+
+ // Fail every read after the first 3.
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+ PositionImpl position = invocationOnMock.getArgument(0,
PositionImpl.class);
+ int c = count.getAndIncrement();
+ if (c >= 3) {
+ cb.readEntryFailed(new
ManagedLedgerException.TooManyRequestsException("Fake exception."),
+ invocationOnMock.getArgument(2));
+ } else {
+
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(),
+ messages.get(position).getBytes()),
+ invocationOnMock.getArgument(2));
+ }
+ return null;
+ }
+ }).when(ledger).asyncReadEntry(any(), any(), any());
+
+ streamingEntryReader.asyncReadEntries(5, 100, null);
+ await().atMost(10, TimeUnit.SECONDS).until(() ->
streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+ // Issued 5 reads; should only have 3 entries as the others were canceled
after exhausting retries.
+ assertEquals(entries.size(), 3);
+ for (int i = 0; i < entries.size(); i++) {
+ assertEquals(String.format("message-%d", i), entries.get(i));
+ }
+ reset(ledger);
+ streamingEntryReader.asyncReadEntries(5, 100, null);
+ await().atMost(500, TimeUnit.MILLISECONDS).until(() ->
streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+ assertEquals(entries.size(), 8);
+ for (int i = 0; i < entries.size(); i++) {
+ assertEquals(String.format("message-%d", i), entries.get(i));
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 80bf968..62a6edd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -620,6 +620,7 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
int receivedMsgCount = 0;
for (int i = 0; i < totalProducedMsgs; i++) {
Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
+ assertTrue(msg != null);
if (!unackMessages.contains(i)) {
consumer.acknowledge(msg);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 7ed88f5..2b63411 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
import com.google.common.collect.Sets;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.service.Dispatcher;
@@ -35,6 +36,8 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.awaitility.Awaitility.await;
+
public class SubscriptionMessageDispatchThrottlingTest extends
MessageDispatchThrottlingTest {
private static final Logger log =
LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
@@ -189,17 +192,13 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace),
dispatchRate);
- long start = System.currentTimeMillis();
// Asynchronously produce messages
for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}
- latch.await();
+ await().atMost(2500, TimeUnit.MILLISECONDS).until(() ->
latch.getCount() == 0);
Assert.assertEquals(totalReceived.get(), numProducedMessages);
- long end = System.currentTimeMillis();
-
- Assert.assertTrue((end - start) >= 2000);
consumer.close();
producer.close();
diff --git a/site2/docs/reference-configuration.md
b/site2/docs/reference-configuration.md
index 52f6d92..f31bd80 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -503,6 +503,7 @@ The value of 0 disables message-byte dispatch-throttling.|0|
|dispatcherMinReadBatchSize|The minimum number of entries to read from
BookKeeper. By default, it is 1 entry. When there is an error occurred on
reading entries from bookkeeper, the broker will backoff the batch size to this
minimum number.|1|
|dispatcherMaxRoundRobinBatchSize|The maximum number of entries to dispatch
for a shared subscription. By default, it is 20 entries.|20|
| preciseDispatcherFlowControl | Precise dispathcer flow control according to
history message number of each entry. | false |
+| streamingDispatch | Whether to use the streaming read dispatcher. It can be
useful when there is a huge backlog to drain: instead of reading in micro
batches, reads from BookKeeper are streamlined to make the most of consumer
capacity until the BookKeeper read limit or the consumer processing limit is
hit; consumer flow control can then be used to tune the speed. This feature is
currently in preview and may change in subsequent releases. | false |
| maxConcurrentLookupRequest | Maximum number of concurrent lookup request
that the broker allows to throttle heavy incoming lookup traffic. | 50000 |
| maxConcurrentTopicLoadRequest | Maximum number of concurrent topic loading
request that the broker allows to control the number of zk-operations. | 5000 |
| maxConcurrentNonPersistentMessagePerConnection | Maximum number of
concurrent non-persistent message that can be processed per connection. | 1000 |