This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cfaf48  Add streaming dispatcher. (#9056)
8cfaf48 is described below

commit 8cfaf48e1dc97e30050cb29c106b614d6b9bc69c
Author: Marvin Cai <[email protected]>
AuthorDate: Mon Feb 1 20:02:50 2021 -0800

    Add streaming dispatcher. (#9056)
    
    Related to  #3804
    
    ### Motivation
    
    Stream the dispatcher's read requests to the managed ledger continuously instead of issuing micro-batch reads.
    
    ### Modifications
    
    Created a StreamingEntryReader that can stream read requests to the managed ledger.
    Created a StreamingDispatcher interface with the methods necessary to interact with the StreamingEntryReader.
    Created PersistentStreamingDispatcherSingleActiveConsumer and PersistentStreamingDispatcherMultipleConsumers, which use the StreamingEntryReader to read entries from the managed ledger.
    Added a config flag to enable the streaming dispatcher.
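    
    As a minimal sketch (hypothetical, not part of this diff): with the new flag on, the broker would pick the streaming variant when creating a dispatcher. The actual wiring lives in PersistentSubscription; `config` stands for the broker's ServiceConfiguration and isStreamingDispatch() is assumed to be its Lombok-generated getter:
    
        // Hypothetical illustration; constructor signatures mirror this diff.
        Dispatcher dispatcher = config.isStreamingDispatch()
                ? new PersistentStreamingDispatcherMultipleConsumers(topic, cursor, subscription)
                : new PersistentDispatcherMultipleConsumers(topic, cursor, subscription);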
---
 build/run_unit_group.sh                            |   5 +
 .../bookkeeper/mledger/WaitingEntryCallBack.java   |  30 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  26 +-
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |   2 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +
 .../AbstractDispatcherMultipleConsumers.java       |   2 +
 .../PersistentDispatcherMultipleConsumers.java     | 185 +++++----
 .../PersistentDispatcherSingleActiveConsumer.java  | 276 ++++++-------
 ...istentStreamingDispatcherMultipleConsumers.java | 191 +++++++++
 ...entStreamingDispatcherSingleActiveConsumer.java | 208 ++++++++++
 .../service/persistent/PersistentSubscription.java |  19 +-
 .../streamingdispatch/PendingReadEntryRequest.java |  76 ++++
 .../streamingdispatch/StreamingDispatcher.java     |  53 +++
 .../streamingdispatch/StreamingEntryReader.java    | 338 ++++++++++++++++
 .../service/streamingdispatch/package-info.java    |  19 +
 .../PersistentDispatcherFailoverConsumerTest.java  |   5 +-
 .../broker/service/PersistentFailoverE2ETest.java  |   5 +
 .../pulsar/broker/service/PersistentTopicTest.java |   4 +-
 ...herFailoverConsumerStreamingDispatcherTest.java |  35 ++
 ...rsistentFailoverStreamingDispatcherE2ETest.java |  36 ++
 ...istentStreamingDispatcherBlockConsumerTest.java |  35 ++
 ...eDispatchStreamingDispatcherThrottlingTest.java |  36 ++
 .../PersistentTopicStreamingDispatcherE2ETest.java |  35 ++
 .../PersistentTopicStreamingDispatcherTest.java    |  35 ++
 ...roducerConsumerTestStreamingDispatcherTest.java |  35 ++
 .../StreamingEntryReaderTests.java                 | 433 +++++++++++++++++++++
 .../client/api/DispatcherBlockConsumerTest.java    |   1 +
 .../SubscriptionMessageDispatchThrottlingTest.java |   9 +-
 site2/docs/reference-configuration.md              |   1 +
 29 files changed, 1905 insertions(+), 237 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index aed0ca4..b4e5530 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -56,12 +56,17 @@ function broker_group_2() {
                                       -DtestForkCount=1 \
                                       -DtestReuseFork=true
 
+  $MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="**/*StreamingDispatcher*Test.java" \
+                                      -DtestForkCount=1 \
+                                      -DtestReuseFork=true
+
   $MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="org/apache/pulsar/broker/zookeeper/**/*.java,
                                                  org/apache/pulsar/broker/loadbalance/**/*.java,
                                                  org/apache/pulsar/broker/service/**/*.java" \
                                       -Dexclude="**/ReplicatorTest.java,
                                                  **/MessagePublishBufferThrottleTest.java,
                                                  **/TopicOwnerTest.java,
+                                                 **/*StreamingDispatcher*Test.java,
                                                  **/AntiAffinityNamespaceGroupTest.java"
 }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java
new file mode 100644
index 0000000..fc1f3b4
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+/**
+ * Contains callback that can be registered with {@link ManagedLedger} to wait for new entries to be available.
+ */
+public interface WaitingEntryCallBack {
+
+    /**
+     * The callback {@link ManagedLedger} will trigger when new entries are available.
+     */
+    void entriesAvailable();
+}
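
A minimal usage sketch for the interface above (hypothetical, not from this commit; addWaitingEntryCallBack() is added to ManagedLedgerImpl later in this diff, and resumeReading() stands in for whatever the caller does once woken):

    // Register with the managed ledger; entriesAvailable() fires once new
    // entries are persisted. The cast to the impl type mirrors what the
    // streaming dispatchers in this commit do.
    ManagedLedgerImpl ml = (ManagedLedgerImpl) cursor.getManagedLedger();
    ml.addWaitingEntryCallBack(() -> resumeReading());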
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a36b1c4..232bf5b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -110,6 +110,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedger
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
@@ -170,6 +171,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     // Cursors that are waiting to be notified when new entries are persisted
     final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;
 
+    // Objects that are waiting to be notified when new entries are persisted
+    final ConcurrentLinkedQueue<WaitingEntryCallBack> waitingEntryCallBacks;
+
     // This map is used for concurrent open cursor requests, where the 2nd request will attach a listener to the
     // uninitialized cursor future from the 1st request
     final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
@@ -290,6 +294,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.mbean = new ManagedLedgerMBeanImpl(this);
         this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
         this.waitingCursors = Queues.newConcurrentLinkedQueue();
+        this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
         this.uninitializedCursors = Maps.newHashMap();
         this.clock = config.getClock();
 
@@ -2109,6 +2114,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    void notifyWaitingEntryCallBacks() {
+        while (true) {
+            final WaitingEntryCallBack cb = waitingEntryCallBacks.poll();
+            if (cb == null) {
+                break;
+            }
+
+            executor.execute(safeRun(cb::entriesAvailable));
+        }
+    }
+
+    public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
+        this.waitingEntryCallBacks.add(cb);
+    }
+
     private void trimConsumedLedgersInBackground() {
         trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
     }
@@ -3086,7 +3106,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
      *            the position to validate
      * @return true if the position is valid, false otherwise
      */
-    boolean isValidPosition(PositionImpl position) {
+    public boolean isValidPosition(PositionImpl position) {
         PositionImpl last = lastConfirmedEntry;
         if (log.isDebugEnabled()) {
             log.debug("IsValid position: {} -- last: {}", position, last);
@@ -3130,7 +3150,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             next = getNextValidPositionInternal(position);
         } catch (NullPointerException e) {
             next = lastConfirmedEntry.getNext();
-            log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Can't find next valid position, fall back to the next position of the last position.", name, e);
+            }
         }
         return next;
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 742ed1e..fc3054e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -207,6 +207,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
                 cb.addComplete(lastEntry, data.asReadOnly(), ctx);
                 ReferenceCountUtil.release(data);
                 ml.notifyCursors();
+                ml.notifyWaitingEntryCallBacks();
                 this.recycle();
             } else {
                 ReferenceCountUtil.release(data);
@@ -232,6 +233,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         if (cb != null) {
             cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
             ml.notifyCursors();
+            ml.notifyWaitingEntryCallBacks();
             this.recycle();
         }
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0d38279..44381bd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -686,6 +686,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean preciseDispatcherFlowControl = false;
 
     @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Whether to use streaming read dispatcher. Currently is in 
preview and can be changed " +
+                "in subsequent release."
+    )
+    private boolean streamingDispatch = false;
+
+    @FieldContext(
         dynamic = true,
         category = CATEGORY_SERVER,
         doc = "Max number of concurrent lookup request broker allows to 
throttle heavy incoming lookup traffic")
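
Presumably the flag is toggled like any other ServiceConfiguration boolean, e.g. programmatically in tests (setStreamingDispatch() is assumed to be the Lombok-generated setter for the field above):

    // Hypothetical illustration: enable the preview streaming dispatcher.
    ServiceConfiguration conf = new ServiceConfiguration();
    conf.setStreamingDispatch(true);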
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 7cd9e03..2f6b9a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -72,6 +72,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
 
     public abstract boolean isConsumerAvailable(Consumer consumer);
 
+    protected void cancelPendingRead() {}
+
     /**
      * <pre>
      * Broker gives more priority while dispatching messages. Here, broker follows descending priorities. (eg:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 26f328d..2816f9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -80,14 +80,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
 
     private CompletableFuture<Void> closeFuture = null;
-    LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+    protected LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
     protected final RedeliveryTracker redeliveryTracker;
 
     private Optional<DelayedDeliveryTracker> delayedDeliveryTracker = Optional.empty();
 
-    private volatile boolean havePendingRead = false;
-    private volatile boolean havePendingReplayRead = false;
-    private boolean shouldRewindBeforeReadingOrReplaying = false;
+    protected volatile boolean havePendingRead = false;
+    protected volatile boolean havePendingReplayRead = false;
+    protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
 
     protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
@@ -95,23 +95,23 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalAvailablePermits");
     protected volatile int totalAvailablePermits = 0;
-    private volatile int readBatchSize;
-    private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
+    protected volatile int readBatchSize;
+    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
             1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
     private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_UNACKED_MESSAGES_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
-    private volatile int totalUnackedMessages = 0;
+    protected volatile int totalUnackedMessages = 0;
     private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
-    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
+    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "blockedDispatcherOnUnackedMsgs");
     protected final ServiceConfiguration serviceConfig;
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
-    enum ReadType {
+    protected enum ReadType {
         Normal, Replay
     }
 
@@ -199,9 +199,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             consumerList.remove(consumer);
             log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size());
             if (consumerList.isEmpty()) {
-                if (havePendingRead && cursor.cancelPendingReadRequest()) {
-                    havePendingRead = false;
-                }
+                cancelPendingRead();
 
                 messagesToRedeliver.clear();
                 redeliveryTracker.clear();
@@ -248,81 +246,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         // totalAvailablePermits may be updated by other threads
         int currentTotalAvailablePermits = totalAvailablePermits;
         if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
-            int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
-
-            Consumer c = getRandomConsumer();
-            // if turn on precise dispatcher flow control, adjust the record to read
-            if (c != null && c.isPreciseDispatcherFlowControl()) {
-                messagesToRead = Math.min(
-                        (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()),
-                        readBatchSize);
-            }
-
-            if (!isConsumerWritable()) {
-                // If the connection is not currently writable, we issue the read request anyway, but for a single
-                // message. The intent here is to keep use the request as a notification mechanism while avoiding to
-                // read and dispatch a big batch of messages which will need to wait before getting written to the
-                // socket.
-                messagesToRead = 1;
-            }
-
-            // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
-            // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
-            // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
-            if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                if (topic.getDispatchRateLimiter().isPresent()
-                        && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
-                    DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
-                    if (!topicRateLimiter.hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
-                                    topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
-                                    MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                                TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                        }
-                    }
-                }
-
-                if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
-                    if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
-                                            + " schedule after a {}", name,
-                                    dispatchRateLimiter.get().getDispatchRateOnMsg(),
-                                    dispatchRateLimiter.get().getDispatchRateOnByte(),
-                                    MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                            TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                        }
-                    }
-                }
+            int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
 
-            }
-
-            if (havePendingReplayRead) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
-                }
+            if (-1 == messagesToRead) {
+                // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
                 return;
             }
 
-            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-            messagesToRead = Math.max(messagesToRead, 1);
             Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
 
             if (!messagesToReplayNow.isEmpty()) {
@@ -366,6 +296,84 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
+    protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
+        int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
+
+        Consumer c = getRandomConsumer();
+        // if turn on precise dispatcher flow control, adjust the record to read
+        if (c != null && c.isPreciseDispatcherFlowControl()) {
+            messagesToRead = Math.min(
+                    (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()),
+                    readBatchSize);
+        }
+
+        if (!isConsumerWritable()) {
+            // If the connection is not currently writable, we issue the read request anyway, but for a single
+            // message. The intent here is to keep use the request as a notification mechanism while avoiding to
+            // read and dispatch a big batch of messages which will need to wait before getting written to the
+            // socket.
+            messagesToRead = 1;
+        }
+
+        // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
+        // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
+        // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
+        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+            if (topic.getDispatchRateLimiter().isPresent()
+                    && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+                DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
+                if (!topicRateLimiter.hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
+                                topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+                            TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
+                    if (availablePermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
+                    }
+                }
+            }
+
+            if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+                if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
+                                        + " schedule after a {}", name,
+                                dispatchRateLimiter.get().getDispatchRateOnMsg(),
+                                dispatchRateLimiter.get().getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+                            TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
+                    if (availablePermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
+                    }
+                }
+            }
+
+        }
+
+        if (havePendingReplayRead) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
+            }
+            return -1;
+        }
+
+        // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+        return Math.max(messagesToRead, 1);
+    }
+
     protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
     }
@@ -413,14 +421,19 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             closeFuture.complete(null);
         } else {
             consumerList.forEach(consumer -> consumer.disconnect(isResetCursor));
-            if (havePendingRead && cursor.cancelPendingReadRequest()) {
-                havePendingRead = false;
-            }
+            cancelPendingRead();
         }
         return closeFuture;
     }
 
     @Override
+    protected void cancelPendingRead() {
+        if (havePendingRead && cursor.cancelPendingReadRequest()) {
+            havePendingRead = false;
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
         return disconnectAllConsumers(isResetCursor);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5501a81..4bc0728 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -58,20 +58,20 @@ import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
+public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
         implements Dispatcher, ReadEntriesCallback {
 
-    private final PersistentTopic topic;
-    private final ManagedCursor cursor;
-    private final String name;
+    protected final PersistentTopic topic;
+    protected final ManagedCursor cursor;
+    protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
-    private volatile boolean havePendingRead = false;
+    protected volatile boolean havePendingRead = false;
 
-    private volatile int readBatchSize;
-    private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
+    protected volatile int readBatchSize;
+    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
             1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
-    private final ServiceConfiguration serviceConfig;
+    protected final ServiceConfiguration serviceConfig;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     private final RedeliveryTracker redeliveryTracker;
@@ -90,9 +90,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     protected void scheduleReadOnActiveConsumer() {
-        if (havePendingRead && cursor.cancelPendingReadRequest()) {
-            havePendingRead = false;
-        }
+        cancelPendingRead();
 
         if (havePendingRead) {
             return;
@@ -166,6 +164,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         return false;
     }
 
+    @Override
     protected void cancelPendingRead() {
         if (havePendingRead && cursor.cancelPendingReadRequest()) {
             havePendingRead = false;
@@ -230,45 +229,48 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, false);
+            dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo);
+        }
+    }
 
-            int totalMessages = sendMessageInfo.getTotalMessages();
-            long totalBytes = sendMessageInfo.getTotalBytes();
-
-            currentConsumer
-                    .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
-                            sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
-                            redeliveryTracker)
-                    .addListener(future -> {
-                        if (future.isSuccess()) {
-                            // acquire message-dispatch permits for already delivered messages
-                            if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                                if (topic.getDispatchRateLimiter().isPresent()) {
-                                    topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessages, totalBytes);
-                                }
+    protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries,
+                                             EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+                                             SendMessageInfo sendMessageInfo) {
+        currentConsumer
+            .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
+                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+                    redeliveryTracker)
+            .addListener(future -> {
+                if (future.isSuccess()) {
+                    // acquire message-dispatch permits for already delivered messages
+                    if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+                        if (topic.getDispatchRateLimiter().isPresent()) {
+                            topic.getDispatchRateLimiter().get().tryDispatchPermit(sendMessageInfo.getTotalMessages(),
+                                    sendMessageInfo.getTotalBytes());
+                        }
 
-                                dispatchRateLimiter.ifPresent(rateLimiter ->
-                                        rateLimiter.tryDispatchPermit(totalMessages, totalBytes));
-                            }
+                        dispatchRateLimiter.ifPresent(rateLimiter ->
+                                rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(),
+                                        sendMessageInfo.getTotalBytes()));
+                    }
 
-                            // Schedule a new read batch operation only after the previous batch has been written to the
-                            // socket
-                            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
-                                    SafeRun.safeRun(() -> {
-                                        synchronized (PersistentDispatcherSingleActiveConsumer.this) {
-                                            Consumer newConsumer = getActiveConsumer();
-                                            if (newConsumer != null && !havePendingRead) {
-                                                readMoreEntries(newConsumer);
-                                            } else {
-                                                log.debug(
-                                                        "[{}-{}] Ignoring write future complete."
-                                                                + " consumerAvailable={} havePendingRead={}",
-                                                        name, newConsumer, newConsumer != null, havePendingRead);
-                                            }
-                                        }
-                                    }));
-                        }
-                    });
-        }
+                    // Schedule a new read batch operation only after the previous batch has been written to the socket.
+                    topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
+                        SafeRun.safeRun(() -> {
+                            synchronized (PersistentDispatcherSingleActiveConsumer.this) {
+                                Consumer newConsumer = getActiveConsumer();
+                                if (newConsumer != null && !havePendingRead) {
+                                    readMoreEntries(newConsumer);
+                                } else {
+                                    log.debug(
+                                            "[{}-{}] Ignoring write future complete."
+                                                    + " consumerAvailable={} havePendingRead={}",
+                                            name, newConsumer, newConsumer != null, havePendingRead);
+                                }
+                            }
+                        }));
+                }
+            });
     }
 
     @Override
@@ -322,9 +324,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             return;
         }
 
-        if (havePendingRead && cursor.cancelPendingReadRequest()) {
-            havePendingRead = false;
-        }
+        cancelPendingRead();
 
         if (!havePendingRead) {
             cursor.rewind();
@@ -354,93 +354,14 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             return;
         }
 
-        int availablePermits = consumer.getAvailablePermits();
-
-        if (availablePermits > 0) {
-            if (!consumer.isWritable()) {
-                // If the connection is not currently writable, we issue the read request anyway, but for a single
-                // message. The intent here is to keep use the request as a notification mechanism while avoiding to
-                // read and dispatch a big batch of messages which will need to wait before getting written to the
-                // socket.
-                availablePermits = 1;
-            }
+        if (consumer.getAvailablePermits() > 0) {
+            int messagesToRead = calculateNumOfMessageToRead(consumer);
 
-            int messagesToRead = Math.min(availablePermits, readBatchSize);
-            // if turn of precise dispatcher flow control, adjust the records to read
-            if (consumer.isPreciseDispatcherFlowControl()) {
-                int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
-                messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
+            if (-1 == messagesToRead) {
+                // Skip read as topic/dispatcher has exceed the dispatch rate.
+                return;
             }
 
-            // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
-            // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
-            // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
-            if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                if (topic.getDispatchRateLimiter().isPresent()
-                        && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
-                    DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
-                    if (!topicRateLimiter.hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
-                                topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
-                                MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> {
-                            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                            if (currentConsumer != null && !havePendingRead) {
-                                readMoreEntries(currentConsumer);
-                            } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
-                                                    + " havePendingRead {}",
-                                            topic.getName(), currentConsumer, havePendingRead);
-                                }
-                            }
-                        }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                        }
-                    }
-                }
-
-                if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
-                    if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
-                                            + " schedule after a {}",
-                                    name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
-                                    dispatchRateLimiter.get().getDispatchRateOnByte(),
-                                    MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> {
-                            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                            if (currentConsumer != null && !havePendingRead) {
-                                readMoreEntries(currentConsumer);
-                            } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}",
-                                        topic.getName(), currentConsumer, havePendingRead);
-                                }
-                            }
-                        }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                        if (subPermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
-                        }
-                    }
-                }
-            }
-
-            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-            messagesToRead = Math.max(messagesToRead, 1);
-
             // Schedule read
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, 
consumer, messagesToRead);
@@ -459,6 +380,93 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         }
     }
 
+    protected int calculateNumOfMessageToRead(Consumer consumer) {
+        int availablePermits = consumer.getAvailablePermits();
+        if (!consumer.isWritable()) {
+            // If the connection is not currently writable, we issue the read request anyway, but for a single
+            // message. The intent here is to keep use the request as a notification mechanism while avoiding to
+            // read and dispatch a big batch of messages which will need to wait before getting written to the
+            // socket.
+            availablePermits = 1;
+        }
+
+        int messagesToRead = Math.min(availablePermits, readBatchSize);
+        // if turn of precise dispatcher flow control, adjust the records to read
+        if (consumer.isPreciseDispatcherFlowControl()) {
+            int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
+            messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
+        }
+
+        // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
+        // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
+        // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
+        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+            if (topic.getDispatchRateLimiter().isPresent()
+                    && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+                DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
+                if (!topicRateLimiter.hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
+                                topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> {
+                        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                        if (currentConsumer != null && !havePendingRead) {
+                            readMoreEntries(currentConsumer);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
+                                                + " havePendingRead {}",
+                                        topic.getName(), currentConsumer, havePendingRead);
+                            }
+                        }
+                    }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
+                    if (availablePermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
+                    }
+                }
+            }
+
+            if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+                if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
+                                        + " schedule after a {}",
+                                name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
+                                dispatchRateLimiter.get().getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> {
+                        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                        if (currentConsumer != null && !havePendingRead) {
+                            readMoreEntries(currentConsumer);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}",
+                                        topic.getName(), currentConsumer, havePendingRead);
+                            }
+                        }
+                    }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
+                    if (subPermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
+                    }
+                }
+            }
+        }
+
+        // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+        return Math.max(messagesToRead, 1);
+    }
+
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
new file mode 100644
index 0000000..9340e17
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Lists;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
+
+/**
+ * A {@link PersistentDispatcherMultipleConsumers} implemented {@link StreamingDispatcher}.
+ * It'll use {@link StreamingEntryReader} to read new entries instead read as micro batch from managed ledger.
+ */
+@Slf4j
+public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers
+    implements StreamingDispatcher {
+
+    private final StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
+            this, topic);
+
+    public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
+                                                          Subscription subscription) {
+        super(topic, cursor, subscription);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
+
+        ReadType readType = (ReadType) ctx.ctx;
+        if (ctx.isLast()) {
+            readFailureBackoff.reduceToHalf();
+            if (readType == ReadType.Normal) {
+                havePendingRead = false;
+            } else {
+                havePendingReplayRead = false;
+            }
+        }
+
+        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+            int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Increasing read batch size from {} to {}", 
name, readBatchSize, newReadBatchSize);
+            }
+            readBatchSize = newReadBatchSize;
+        }
+
+        if (shouldRewindBeforeReadingOrReplaying && readType == ReadType.Normal) {
+            // All consumers got disconnected before the completion of the read operation
+            entry.release();
+            cursor.rewind();
+            shouldRewindBeforeReadingOrReplaying = false;
+            readMoreEntries();
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Distributing a messages to {} consumers", name, 
consumerList.size());
+        }
+
+        cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
+                .getNextValidPosition((PositionImpl) entry.getPosition()));
+        sendMessagesToConsumers(readType, Lists.newArrayList(entry));
+        ctx.recycle();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void canReadMoreEntries(boolean withBackoff) {
+        havePendingRead = false;
+        topic.getBrokerService().executor().schedule(() -> {
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
+                synchronized (PersistentStreamingDispatcherMultipleConsumers.this) {
+                    if (!havePendingRead) {
+                        log.info("[{}] Scheduling read operation", name);
+                        readMoreEntries();
+                    } else {
+                        log.info("[{}] Skipping read since we have 
pendingRead", name);
+                    }
+                }
+            }));
+        }, withBackoff
+                ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void notifyConsumersEndOfTopic() {
+        if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
+            // Topic has been terminated and there are no more entries to read
+            // Notify the consumer only if all the messages were already acknowledged
+            consumerList.forEach(Consumer::reachedEndOfTopic);
+        }
+    }
+
+    @Override
+    protected void cancelPendingRead() {
+        if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
+            havePendingRead = false;
+        }
+    }
+
+    @Override
+    public void readMoreEntries() {
+        // totalAvailablePermits may be updated by other threads
+        int currentTotalAvailablePermits = totalAvailablePermits;
+        if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+            int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
+
+            if (-1 == messagesToRead) {
+                // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
+                return;
+            }
+
+            Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
+
+            if (!messagesToReplayNow.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name, messagesToReplayNow.size(),
+                            consumerList.size());
+                }
+
+                havePendingReplayRead = true;
+                Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
+                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
+                // clear already acked positions from replay bucket
+
+                deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
+                        ((PositionImpl) position).getEntryId()));
+                // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read
+                // next entries as readCompletedEntries-callback was never called
+                if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
+                    havePendingReplayRead = false;
+                    readMoreEntries();
+                }
+            } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
+                log.warn("[{}] Dispatcher read is blocked due to unackMessages 
{} reached to max {}", name,
+                        totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
+            } else if (!havePendingRead) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Schedule read of {} messages for {} 
consumers", name, messagesToRead,
+                            consumerList.size());
+                }
+                havePendingRead = true;
+                streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
+                        ReadType.Normal);
+            } else {
+                log.debug("[{}] Cannot schedule next read until previous one 
is done", name);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Consumer buffer is full, pause reading", name);
+            }
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
new file mode 100644
index 0000000..b4e4ed3
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.collect.Lists;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
+import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+
+/**
+ * A {@link PersistentDispatcherSingleActiveConsumer} that implements {@link StreamingDispatcher}.
+ * It uses a {@link StreamingEntryReader} to read new entries as a stream instead of reading micro batches from
+ * the managed ledger.
+ */
+@Slf4j
+public class PersistentStreamingDispatcherSingleActiveConsumer extends PersistentDispatcherSingleActiveConsumer
+        implements StreamingDispatcher {
+
+    private final StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
+            this, topic);
+
+    public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType,
+                                                             int partitionIndex, PersistentTopic topic,
+                                                             Subscription subscription) {
+        super(cursor, subscriptionType, partitionIndex, topic, subscription);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void canReadMoreEntries(boolean withBackoff) {
+        havePendingRead = false;
+        topic.getBrokerService().executor().schedule(() -> {
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+                synchronized (PersistentStreamingDispatcherSingleActiveConsumer.this) {
+                    Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                    if (currentConsumer != null && !havePendingRead) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}-{}] Scheduling read", name, currentConsumer);
+                        }
+                        readMoreEntries(currentConsumer);
+                    } else {
+                        log.info("[{}-{}] Skipping read as we still havePendingRead {}", name,
+                                currentConsumer, havePendingRead);
+                    }
+                }
+            }));
+        }, withBackoff
+                ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected void cancelPendingRead() {
+        if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
+            havePendingRead = false;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void notifyConsumersEndOfTopic() {
+        if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
+            // Topic has been terminated and there are no more entries to read.
+            // Notify the consumers only if all the messages were already acknowledged.
+            consumers.forEach(Consumer::reachedEndOfTopic);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(name, safeRun(() -> {
+            internalReadEntryComplete(entry, ctx);
+        }));
+    }
+
+    public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
+        if (ctx.isLast()) {
+            readFailureBackoff.reduceToHalf();
+            havePendingRead = false;
+        }
+
+        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+            int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Increasing read batch size from {} to {}", name,
+                        ((Consumer) ctx.ctx).consumerName(), readBatchSize, newReadBatchSize);
+            }
+            readBatchSize = newReadBatchSize;
+        }
+
+        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+
+        if (isKeyHashRangeFiltered) {
+            byte[] key = peekStickyKey(entry.getDataBuffer());
+            Consumer consumer = stickyKeyConsumerSelector.select(key);
+            // Skip the entry if it's not for the current active consumer.
+            if (consumer == null || currentConsumer != consumer) {
+                entry.release();
+                return;
+            }
+        }
+        Consumer consumer = (Consumer) ctx.ctx;
+        ctx.recycle();
+        if (currentConsumer == null || consumer != currentConsumer) {
+            // The active consumer has changed since the read request was issued. We need to rewind the cursor
+            // and re-issue the read request for the new consumer.
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Rewind because no available consumer was found to dispatch the message to.", name);
+            }
+
+            entry.release();
+            streamingEntryReader.cancelReadRequests();
+            havePendingRead = false;
+            if (currentConsumer != null) {
+                notifyActiveConsumerChanged(currentConsumer);
+                readMoreEntries(currentConsumer);
+            }
+        } else {
+            EntryBatchSizes batchSizes = EntryBatchSizes.get(1);
+            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(1);
+            filterEntriesForConsumer(Lists.newArrayList(entry), batchSizes, sendMessageInfo, batchIndexesAcks,
+                    cursor, false);
+            // Update the cursor's read position.
+            cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
+                    .getNextValidPosition((PositionImpl) entry.getPosition()));
+            dispatchEntriesToConsumer(currentConsumer, Lists.newArrayList(entry), batchSizes,
+                    batchIndexesAcks, sendMessageInfo);
+        }
+    }
+
+    @Override
+    protected void readMoreEntries(Consumer consumer) {
+        // The consumer can be null when all consumers are disconnected from the broker,
+        // so skip reading more entries if there is currently no active consumer.
+        if (null == consumer) {
+            return;
+        }
+
+        if (!havePendingRead && consumer.getAvailablePermits() > 0) {
+            int messagesToRead = calculateNumOfMessageToRead(consumer);
+
+            if (-1 == messagesToRead) {
+                // Skip the read: the topic/dispatcher has exceeded the dispatch rate.
+                return;
+            }
+
+            // Schedule read
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+            }
+            havePendingRead = true;
+
+            if (consumer.readCompacted()) {
+                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
+            } else {
+                streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
+                        consumer);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
+            }
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a95692f..068ae87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -181,19 +181,24 @@ public class PersistentSubscription implements Subscription {
 
         if (dispatcher == null || !dispatcher.isConsumerConnected()) {
             Dispatcher previousDispatcher = null;
-
+            boolean useStreamingDispatcher = topic.getBrokerService().getPulsar()
+                                                    .getConfiguration().isStreamingDispatch();
             switch (consumer.subType()) {
             case Exclusive:
                if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
                    previousDispatcher = dispatcher;
-                    dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor,
-                            SubType.Exclusive, 0, topic, this);
+                    dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(cursor,
+                            SubType.Exclusive, 0, topic, this) :
+                            new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0,
+                                    topic, this);
                 }
                 break;
             case Shared:
                if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
                    previousDispatcher = dispatcher;
-                    dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
+                    dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherMultipleConsumers(topic,
+                            cursor, this) : new PersistentDispatcherMultipleConsumers(topic,
+                            cursor, this);
                 }
                 break;
             case Failover:
@@ -206,8 +211,10 @@ public class PersistentSubscription implements Subscription {
 
                if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
                    previousDispatcher = dispatcher;
-                    dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor,
-                            SubType.Failover, partitionIndex, topic, this);
+                    dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(cursor,
+                            SubType.Failover, partitionIndex, topic, this) :
+                            new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
+                                    partitionIndex, topic, this);
                 }
                 break;
             case Key_Shared:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
new file mode 100644
index 0000000..7989bbc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import io.netty.util.Recycler;
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+/**
+ * Represents a pending request to read an entry from a {@link ManagedLedger}, carrying the necessary context.
+ */
+@Data
+public class PendingReadEntryRequest {
+
+    private static final Recycler<PendingReadEntryRequest> RECYCLER = new Recycler<PendingReadEntryRequest>() {
+        protected PendingReadEntryRequest newObject(Recycler.Handle<PendingReadEntryRequest> handle) {
+            return new PendingReadEntryRequest(handle);
+        }
+    };
+
+    public static PendingReadEntryRequest create(Object ctx, PositionImpl position) {
+        PendingReadEntryRequest pendingReadEntryRequest = RECYCLER.get();
+        pendingReadEntryRequest.ctx = ctx;
+        pendingReadEntryRequest.position = position;
+        pendingReadEntryRequest.retry = 0;
+        pendingReadEntryRequest.isLast = false;
+        return pendingReadEntryRequest;
+    }
+
+    public void recycle() {
+        entry = null;
+        ctx = null;
+        position = null;
+        retry = -1;
+        recyclerHandle.recycle(this);
+    }
+
+    public boolean isLastRequest() {
+        return isLast;
+    }
+
+    private final Recycler.Handle<PendingReadEntryRequest> recyclerHandle;
+
+    // Entry read from the ledger.
+    public Entry entry;
+
+    // Caller-supplied context that will be passed to the callback.
+    public Object ctx;
+
+    // Position of the entry to be read.
+    public PositionImpl position;
+
+    // Number of times the request has been retried.
+    int retry;
+
+    // Whether this request is the last one of a set of requests.
+    boolean isLast;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
new file mode 100644
index 0000000..814a381
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * A {@link Dispatcher} that uses a {@link StreamingEntryReader} to read entries from a {@link ManagedLedger}.
+ */
+@InterfaceStability.Unstable
+public interface StreamingDispatcher extends Dispatcher {
+
+    /**
+     * Notifies the dispatcher that an issued read entry request has completed.
+     * @param entry Entry that was read.
+     * @param ctx   Context passed in when the read request was issued.
+     */
+    void readEntryComplete(Entry entry, PendingReadEntryRequest ctx);
+
+    /**
+     * Notifies the dispatcher that it can issue the next read request.
+     * @param withBackoff Whether to delay the next read with the read-failure backoff.
+     */
+    void canReadMoreEntries(boolean withBackoff);
+
+    /**
+     * Notifies the dispatcher to inform consumers that they have reached the end of the topic.
+     */
+    void notifyConsumersEndOfTopic();
+
+    /**
+     * @return Name of the dispatcher.
+     */
+    String getName();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
new file mode 100644
index 0000000..24f9bcc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.client.impl.Backoff;
+
+/**
+ * Entry reader that fulfills read requests by streaming the reads instead of reading in micro batches.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, WaitingEntryCallBack {
+
+    private final int maxRetry = 3;
+
+    // Queue for read requests that have been issued but are awaiting completion from the managed ledger.
+    private ConcurrentLinkedQueue<PendingReadEntryRequest> issuedReads = new ConcurrentLinkedQueue<>();
+
+    // Queue for read requests that are waiting for new entries from the managed ledger.
+    private ConcurrentLinkedQueue<PendingReadEntryRequest> pendingReads = new ConcurrentLinkedQueue<>();
+
+    private final ManagedCursorImpl cursor;
+
+    private final StreamingDispatcher dispatcher;
+
+    private final PersistentTopic topic;
+
+    private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
+
+    private volatile State state;
+
+    private static final AtomicReferenceFieldUpdater<StreamingEntryReader, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, State.class, "state");
+
+    private volatile int maxReadSizeByte;
+
+    private final Backoff readFailureBackoff = new Backoff(10, TimeUnit.MILLISECONDS,
+            1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
+
+    /**
+     * Read entries in a streaming fashion: instead of reading a micro batch and sending entries to the consumer
+     * only after all entries in the batch have been read from the ledger, this method fires numEntriesToRead
+     * per-entry read requests to the managed ledger and sends each entry to the consumer as soon as it is read
+     * and all entries before it have been sent.
+     * @param numEntriesToRead number of entries to read from the ledger.
+     * @param maxReadSizeByte maximum number of bytes to read from the ledger.
+     * @param ctx Context sent along with the read request.
+     */
+    public synchronized void asyncReadEntries(int numEntriesToRead, int maxReadSizeByte, Object ctx) {
+        if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
+            internalCancelReadRequests();
+        }
+
+        if (!issuedReads.isEmpty() || !pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] There's a pending streaming read that hasn't completed yet."
+                        + " Not scheduling the next read request.", cursor.getName());
+            }
+            return;
+        }
+
+        PositionImpl nextReadPosition = (PositionImpl) cursor.getReadPosition();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+        // Edge case: when an old ledger is full and the new ledger is not yet opened, the position can point to
+        // the position after the last confirmed position, which is invalid. So try to update the position.
+        if (!managedLedger.isValidPosition(nextReadPosition)) {
+            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
+        }
+        boolean hasEntriesToRead = managedLedger.hasMoreEntries(nextReadPosition);
+        currentReadSizeByte.set(0);
+        STATE_UPDATER.set(this, State.Issued);
+        this.maxReadSizeByte = maxReadSizeByte;
+        for (int c = 0; c < numEntriesToRead; c++) {
+            PendingReadEntryRequest pendingReadEntryRequest = PendingReadEntryRequest.create(ctx, nextReadPosition);
+            // Make sure that once we start putting requests into the pending queue, we won't put any following
+            // request into the issued queue, in order to guarantee the ordering.
+            if (hasEntriesToRead && managedLedger.hasMoreEntries(nextReadPosition)) {
+                issuedReads.offer(pendingReadEntryRequest);
+            } else {
+                pendingReads.offer(pendingReadEntryRequest);
+            }
+            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
+        }
+
+        // Issue requests.
+        for (PendingReadEntryRequest request : issuedReads) {
+            managedLedger.asyncReadEntry(request.position, this, request);
+        }
+
+        if (!pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entries.",
+                        cursor.getName(), pendingReads.size());
+            }
+            // If new entries became available after we put requests into the pending queue, fire the read.
+            // Otherwise, register a callback with the managed ledger to get notified when new entries are available.
+            if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                entriesAvailable();
+            } else if (managedLedger.isTerminated()) {
+                dispatcher.notifyConsumersEndOfTopic();
+                cleanQueue(pendingReads);
+                if (issuedReads.size() == 0) {
+                    dispatcher.canReadMoreEntries(true);
+                }
+            } else {
+                managedLedger.addWaitingEntryCallBack(this);
+            }
+        }
+    }
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        // Don't block the caller thread; complete the read on the dispatcher's dedicated thread.
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+            internalReadEntryComplete(entry, ctx);
+        }));
+    }
+
+    private void internalReadEntryComplete(Entry entry, Object ctx) {
+        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
+        pendingReadEntryRequest.entry = entry;
+        readFailureBackoff.reduceToHalf();
+        Entry readEntry;
+        // If we have an entry to send to the dispatcher.
+        if (!issuedReads.isEmpty() && issuedReads.peek() == pendingReadEntryRequest) {
+            while (!issuedReads.isEmpty() && issuedReads.peek().entry != null) {
+                PendingReadEntryRequest firstPendingReadEntryRequest = issuedReads.poll();
+                readEntry = firstPendingReadEntryRequest.entry;
+                currentReadSizeByte.addAndGet(readEntry.getLength());
+                // Cancel the remaining requests and reset the cursor if maxReadSizeByte is exceeded.
+                if (currentReadSizeByte.get() > maxReadSizeByte) {
+                    cancelReadRequests(readEntry.getPosition());
+                    dispatcher.canReadMoreEntries(false);
+                    STATE_UPDATER.set(this, State.Completed);
+                    return;
+                } else {
+                    // All requests have been completed; mark the returned entry as the last one.
+                    if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
+                        firstPendingReadEntryRequest.isLast = true;
+                        STATE_UPDATER.set(this, State.Completed);
+                    }
+                    dispatcher.readEntryComplete(readEntry, firstPendingReadEntryRequest);
+                }
+            }
+        } else if (!issuedReads.isEmpty() && issuedReads.peek().retry > maxRetry) {
+            cancelReadRequests(issuedReads.peek().position);
+            dispatcher.canReadMoreEntries(true);
+            STATE_UPDATER.set(this, State.Completed);
+        }
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        // Don't block the caller thread; complete the read failure on the dispatcher's dedicated thread.
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+            internalReadEntryFailed(exception, ctx);
+        }));
+    }
+
+    private void internalReadEntryFailed(ManagedLedgerException exception, Object ctx) {
+        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
+        PositionImpl readPosition = pendingReadEntryRequest.position;
+        pendingReadEntryRequest.retry++;
+        long waitTimeMillis = readFailureBackoff.next();
+        if (exception.getCause() instanceof TransactionNotSealedException) {
+            waitTimeMillis = 1;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Error reading transaction entries : {} - Retrying to read in {} seconds",
+                        cursor.getName(), exception.getMessage(), waitTimeMillis / 1000.0);
+            }
+        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
+            log.error("[{}] Error reading entries at {} : {} - Retrying to read in {} seconds", cursor.getName(),
+                    readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
+                        cursor.getName(), readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
+            }
+        }
+        if (!issuedReads.isEmpty()) {
+            if (issuedReads.peek().retry > maxRetry) {
+                cancelReadRequests(issuedReads.peek().position);
+                dispatcher.canReadMoreEntries(true);
+                STATE_UPDATER.set(this, State.Completed);
+                return;
+            }
+            if (pendingReadEntryRequest.retry <= maxRetry) {
+                retryReadRequest(pendingReadEntryRequest, waitTimeMillis);
+            }
+        }
+    }
+
+    // Cancel all issued and pending requests and update the cursor's read position.
+    private void cancelReadRequests(Position position) {
+        if (!issuedReads.isEmpty()) {
+            cleanQueue(issuedReads);
+            cursor.seek(position);
+        }
+
+        if (!pendingReads.isEmpty()) {
+            cleanQueue(pendingReads);
+        }
+    }
+
+    private void internalCancelReadRequests() {
+        Position readPosition = !issuedReads.isEmpty() ? issuedReads.peek().position : pendingReads.peek().position;
+        cancelReadRequests(readPosition);
+    }
+
+    public boolean cancelReadRequests() {
+        if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
+            // Don't block the caller thread; complete the cancellation on the dispatcher's dedicated thread.
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
+                synchronized (StreamingEntryReader.this) {
+                    if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
+                        internalCancelReadRequests();
+                    }
+                }
+            }));
+            return true;
+        }
+        return false;
+    }
+
+    private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
+        while (!queue.isEmpty()) {
+            PendingReadEntryRequest pendingReadEntryRequest = queue.poll();
+            if (pendingReadEntryRequest.entry != null) {
+                pendingReadEntryRequest.entry.release();
+                pendingReadEntryRequest.recycle();
+            }
+        }
+    }
+
+    private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
+        topic.getBrokerService().executor().schedule(() -> {
+            // Jump back onto the dispatcher's dedicated thread.
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
+                    SafeRun.safeRun(() -> {
+                ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+                managedLedger.asyncReadEntry(pendingReadEntryRequest.position, this, pendingReadEntryRequest);
+            }));
+        }, delay, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void entriesAvailable() {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+            internalEntriesAvailable();
+        }));
+    }
+
+    private synchronized void internalEntriesAvailable() {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Streaming entry reader got notified of newly added entries from the managed ledger;"
+                    + " trying to issue pending read requests.", cursor.getName());
+        }
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+        List<PendingReadEntryRequest> newlyIssuedRequests = new ArrayList<>();
+        if (!pendingReads.isEmpty()) {
+            // Edge case: when an old ledger is full and the new ledger is not yet opened, the position can point
+            // to the position after the last confirmed position, which is invalid. So try to update the position.
+            if (!managedLedger.isValidPosition(pendingReads.peek().position)) {
+                pendingReads.peek().position = managedLedger.getNextValidPosition(pendingReads.peek().position);
+            }
+            while (!pendingReads.isEmpty() && managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                PendingReadEntryRequest next = pendingReads.poll();
+                issuedReads.offer(next);
+                newlyIssuedRequests.add(next);
+                // Need to update the position, because when the PendingReadEntryRequest was created the position
+                // could have been set to the managed ledger's last confirmed position.
+                if (!pendingReads.isEmpty()) {
+                    pendingReads.peek().position = managedLedger.getNextValidPosition(next.position);
+                }
+            }
+
+            for (PendingReadEntryRequest request : newlyIssuedRequests) {
+                managedLedger.asyncReadEntry(request.position, this, request);
+            }
+
+            if (!pendingReads.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entries.",
+                            cursor.getName(), pendingReads.size());
+                }
+                if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                    entriesAvailable();
+                } else {
+                    managedLedger.addWaitingEntryCallBack(this);
+                }
+            }
+        }
+    }
+
+    protected State getState() {
+        return STATE_UPDATER.get(this);
+    }
+
+    enum State {
+        Issued, Canceling, Canceled, Completed;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
new file mode 100644
index 0000000..9a205ed
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 3497b73..a631db5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -103,7 +104,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     private ChannelHandlerContext channelCtx;
     private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
     private ZooKeeper mockZk;
-    private PulsarService pulsar;
+    protected PulsarService pulsar;
     final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic";
     final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic";
 
@@ -208,7 +209,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
-        cursorMock = mock(ManagedCursor.class);
+        cursorMock = mock(ManagedCursorImpl.class);
 
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
         doReturn("mockCursor").when(cursorMock).getName();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 8313567..dd90017 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -270,6 +270,11 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
 
     @Test
     public void testSimpleConsumerEventsWithPartition() throws Exception {
+        // Reset ActiveConsumerFailoverDelayTimeMillis; otherwise, if testActiveConsumerFailoverWithDelay
+        // is executed first, this test could fail.
+        conf.setActiveConsumerFailoverDelayTimeMillis(0);
+        restartBroker();
+
         int numPartitions = 4;
 
         final String topicName = "persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition-" + System.nanoTime();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 52a67cb..7b428a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -141,7 +141,7 @@ import io.netty.buffer.Unpooled;
 /**
  */
 public class PersistentTopicTest extends MockedBookKeeperTestCase {
-    private PulsarService pulsar;
+    protected PulsarService pulsar;
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
@@ -1199,7 +1199,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     @SuppressWarnings("unchecked")
     void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
-        cursorMock = mock(ManagedCursor.class);
+        cursorMock = mock(ManagedCursorImpl.class);
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
new file mode 100644
index 0000000..66f49c8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * PersistentDispatcherFailoverConsumerTest with {@link StreamingDispatcher}
+ */
+public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest extends
+        PersistentDispatcherFailoverConsumerTest {
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.setup();
+        pulsar.getConfiguration().setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
new file mode 100644
index 0000000..ee56b5c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentFailoverE2ETest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * PersistentFailoverE2ETest with {@link StreamingDispatcher}
+ */
+public class PersistentFailoverStreamingDispatcherE2ETest extends PersistentFailoverE2ETest {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setStreamingDispatch(true);
+        super.setup();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
new file mode 100644
index 0000000..bdaf88e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.DispatcherBlockConsumerTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * DispatcherBlockConsumerTest with {@link StreamingDispatcher}
+ */
+public class PersistentStreamingDispatcherBlockConsumerTest extends DispatcherBlockConsumerTest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.setup();
+        conf.setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
new file mode 100644
index 0000000..ff170ed
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * SubscriptionMessageDispatchThrottlingTest with {@link StreamingDispatcher}
+ */
+public class PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest
+    extends SubscriptionMessageDispatchThrottlingTest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.setup();
+        conf.setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
new file mode 100644
index 0000000..b1efcf6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentTopicE2ETest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * PersistentTopicE2ETest with {@link StreamingDispatcher}
+ */
+public class PersistentTopicStreamingDispatcherE2ETest extends PersistentTopicE2ETest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setStreamingDispatch(true);
+        super.baseSetup();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
new file mode 100644
index 0000000..f0057d9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentTopicTest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * PersistentTopicTest with {@link StreamingDispatcher}
+ */
+public class PersistentTopicStreamingDispatcherTest extends PersistentTopicTest {
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.setup();
+        pulsar.getConfiguration().setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
new file mode 100644
index 0000000..1bcdec7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.SimpleProducerConsumerTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * SimpleProducerConsumerTest with {@link StreamingDispatcher}
+ */
+public class SimpleProducerConsumerTestStreamingDispatcherTest extends SimpleProducerConsumerTest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.setup();
+        conf.setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
new file mode 100644
index 0000000..332f431
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import com.google.common.base.Charsets;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests for {@link StreamingEntryReader}.
+ */
+@PrepareForTest({ManagedLedgerImpl.class})
+public class StreamingEntryReaderTests extends MockedBookKeeperTestCase {
+
+    private static final Charset Encoding = Charsets.UTF_8;
+    private PersistentTopic mockTopic;
+    private StreamingDispatcher mockDispatcher;
+    private BrokerService mockBrokerService;
+    private ScheduledExecutorService scheduledExecutorService;
+    private OrderedExecutor orderedExecutor;
+    private ManagedLedgerConfig config;
+    private ManagedLedgerImpl ledger;
+    private ManagedCursor cursor;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+        orderedExecutor = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(1)
+                .name("StreamingEntryReaderTests").build();
+        mockTopic = mock(PersistentTopic.class);
+        mockBrokerService = mock(BrokerService.class);
+        mockDispatcher = mock(StreamingDispatcher.class);
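+        // Cap each ledger at 10 entries so the tests exercise reads that cross ledger boundaries.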
+        config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
+        ledger = spy((ManagedLedgerImpl) factory.open("my_test_ledger", config));
+        cursor = ledger.openCursor("test");
+        when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
+        when(mockBrokerService.executor()).thenReturn(scheduledExecutorService);
+        when(mockBrokerService.getTopicOrderedExecutor()).thenReturn(orderedExecutor);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return null;
+            }
+        }).when(mockDispatcher).notifyConsumersEndOfTopic();
+    }
+
+    @Test
+    public void testCanReadEntryFromMLedgerHappyPath() throws Exception {
+        AtomicInteger entryCount = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+
+        for (int i = 0; i < 150; i++) {
+            ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
+        }
+
+        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
+                mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                assertEquals(new String(entry.getData()), String.format("message-%d", entryCount.getAndIncrement()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
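+        // Ask for up to 50 entries, bounded by a 700-byte read budget (the size-limit test below
+        // exercises the byte bound).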
+        streamingEntryReader.asyncReadEntries(50, 700, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 50);
+        // Check cursor's read position has been properly updated
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        streamingEntryReader.asyncReadEntries(50, 700, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 100);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        streamingEntryReader.asyncReadEntries(50, 700, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 150);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+    }
+
+    @Test
+    public void testCanReadEntryFromMLedgerSizeExceededLimit() throws Exception {
+        AtomicBoolean readComplete = new AtomicBoolean(false);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
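+        // Byte length of one padded test message; reads below budget size * 2 + 1 bytes,
+        // i.e. roughly two entries per request.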
+        int size = "mmmmmmmmmmessage-0".getBytes().length;
+        for (int i = 0; i < 15; i++) {
+            ledger.addEntry(String.format("mmmmmmmmmmessage-%d", 
i).getBytes(Encoding));
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                readComplete.set(true);
+                return null;
+            }
+        ).when(mockDispatcher).canReadMoreEntries(anyBoolean());
+
+        PositionImpl position = ledger.getPositionAfterN(ledger.getFirstPosition(), 3,
+                ManagedLedgerImpl.PositionBound.startExcluded);
+        // Delay one read so reads from the managed ledger complete out of order.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                executor.schedule(() -> {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(),
+                            "mmmmmmmmmmessage-2".getBytes()), invocationOnMock.getArgument(2));
+                }, 200, TimeUnit.MILLISECONDS);
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(eq(position), any(), any());
+
+        // Only 2 entries should be read with this request.
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        assertEquals(entries.size(), 2);
+        // Assert the cursor's read position has been properly updated to the third entry, since the
+        // previous request should only have read 2 entries.
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        reset(ledger);
+        readComplete.set(false);
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        readComplete.set(false);
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        readComplete.set(false);
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        assertEquals(entries.size(), 8);
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(String.format("mmmmmmmmmmessage-%d", i), 
entries.get(i));
+        }
+    }
+
+    @Test
+    public void testCanReadEntryFromMLedgerWaitingForNewEntry() throws Exception {
+        AtomicInteger entryCount = new AtomicInteger(0);
+        AtomicBoolean entryProcessed = new AtomicBoolean(false);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+        for (int i = 0; i < 7; i++) {
+            ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                entryCount.getAndIncrement();
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                entryProcessed.set(true);
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        streamingEntryReader.asyncReadEntries(5, 100, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 5);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        streamingEntryReader.asyncReadEntries(5, 100, null);
+        // We only wrote 7 entries initially, so only 7 entries can be read.
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 7);
+        // Add a new entry and wait for it to be sent to the reader.
+        entryProcessed.set(false);
+        ledger.addEntry("message-7".getBytes(Encoding));
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 8);
+        entryProcessed.set(false);
+        ledger.addEntry("message-8".getBytes(Encoding));
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 9);
+        entryProcessed.set(false);
+        ledger.addEntry("message-9".getBytes(Encoding));
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 10);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(String.format("message-%d", i), entries.get(i));
+        }
+    }
+
+    @Test
+    public void testCanCancelReadEntryRequestAndResumeReading() throws Exception {
+        Map<Position, String> messages = new HashMap<>();
+        AtomicInteger count = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+
+        for (int i = 0; i < 20; i++) {
+            String msg = String.format("message-%d", i);
+            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        // Only complete the first 5 reads; later reads never invoke the callback, so the request can be canceled.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                PositionImpl position = invocationOnMock.getArgument(0, PositionImpl.class);
+                int c = count.getAndIncrement();
+                if (c < 5) {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(),
+                            messages.get(position).getBytes()),
+                            invocationOnMock.getArgument(2));
+                }
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(any(), any(), any());
+
+        streamingEntryReader.asyncReadEntries(20, 200, null);
+        streamingEntryReader.cancelReadRequests();
+        await().atMost(10000, TimeUnit.MILLISECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Canceled);
+        // Only 5 entries are received, since the ledger returned just 5 entries before the request was canceled.
+        assertEquals(entries.size(), 5);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        // Clear the mock and try to read the remaining entries.
+        reset(ledger);
+        streamingEntryReader.asyncReadEntries(15, 200, null);
+        streamingEntryReader.cancelReadRequests();
+        await().atMost(10000, TimeUnit.MILLISECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+        // With the mock cleared, the remaining 15 entries are read, for 20 in total.
+        assertEquals(entries.size(), 20);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        // Make sure messages are still returned in order.
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(String.format("message-%d", i), entries.get(i));
+        }
+    }
+
+    @Test
+    public void testCanHandleExceptionAndRetry() throws Exception {
+        Map<Position, String> messages = new HashMap<>();
+        AtomicBoolean entryProcessed = new AtomicBoolean(false);
+        AtomicInteger count = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+        for (int i = 0; i < 12; i++) {
+            String msg = String.format("message-%d", i);
+            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+
+                if (entries.size() == 6 || entries.size() == 12) {
+                    entryProcessed.set(true);
+                }
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        // Make some reads from the managed ledger fail intermittently, so the reader has to retry.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                PositionImpl position = invocationOnMock.getArgument(0, PositionImpl.class);
+                int c = count.getAndIncrement();
+                if (c >= 3 && c < 5 || c >= 9 && c < 11) {
+                    cb.readEntryFailed(new ManagedLedgerException.TooManyRequestsException("Fake exception."),
+                            invocationOnMock.getArgument(2));
+                } else {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(),
+                            messages.get(position).getBytes()),
+                            invocationOnMock.getArgument(2));
+                }
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(any(), any(), any());
+
+        streamingEntryReader.asyncReadEntries(6, 100, null);
+        await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 6);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        entryProcessed.set(false);
+        streamingEntryReader.asyncReadEntries(6, 100, null);
+        await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 12);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        // Make sure messages are still returned in order.
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(String.format("message-%d", i), entries.get(i));
+        }
+    }
+
+    @Test
+    public void testWillCancelReadAfterExhaustingRetry() throws Exception {
+        Map<Position, String> messages = new HashMap<>();
+        AtomicInteger count = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+        for (int i = 0; i < 12; i++) {
+            String msg = String.format("message-%d", i);
+            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                    Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                    positions.push(entry.getPosition());
+                    cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                    entries.add(new String(entry.getData()));
+                    return null;
+                }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        // Fail every read after the first 3.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                PositionImpl position = invocationOnMock.getArgument(0, PositionImpl.class);
+                int c = count.getAndIncrement();
+                if (c >= 3) {
+                    cb.readEntryFailed(new ManagedLedgerException.TooManyRequestsException("Fake exception."),
+                            invocationOnMock.getArgument(2));
+                } else {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(),
+                            messages.get(position).getBytes()),
+                            invocationOnMock.getArgument(2));
+                }
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(any(), any(), any());
+
+        streamingEntryReader.asyncReadEntries(5, 100, null);
+        await().atMost(10, TimeUnit.SECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+        // Issued 5 reads, but only 3 entries should be received, as the rest were canceled after exhausting retries.
+        assertEquals(entries.size(), 3);
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(String.format("message-%d", i), entries.get(i));
+        }
+        reset(ledger);
+        streamingEntryReader.asyncReadEntries(5, 100, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+        assertEquals(entries.size(), 8);
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(String.format("message-%d", i), entries.get(i));
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 80bf968..62a6edd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -620,6 +620,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
         int receivedMsgCount = 0;
         for (int i = 0; i < totalProducedMsgs; i++) {
             Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
+            assertTrue(msg != null);
             if (!unackMessages.contains(i)) {
                 consumer.acknowledge(msg);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 7ed88f5..2b63411 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
 import com.google.common.collect.Sets;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -35,6 +36,8 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.awaitility.Awaitility.await;
+
 public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest {
     private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
 
@@ -189,17 +192,13 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         Assert.assertTrue(isMessageRateUpdate);
         Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);
 
-        long start = System.currentTimeMillis();
         // Asynchronously produce messages
         for (int i = 0; i < numProducedMessages; i++) {
             final String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
-        latch.await();
+        await().atMost(2500, TimeUnit.MILLISECONDS).until(() -> latch.getCount() == 0);
         Assert.assertEquals(totalReceived.get(), numProducedMessages);
-        long end = System.currentTimeMillis();
-
-        Assert.assertTrue((end - start) >= 2000);
 
         consumer.close();
         producer.close();
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 52f6d92..f31bd80 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -503,6 +503,7 @@ The value of 0 disables message-byte dispatch-throttling.|0|
 |dispatcherMinReadBatchSize|The minimum number of entries to read from BookKeeper. By default, it is 1 entry. When an error occurs while reading entries from BookKeeper, the broker backs off the batch size to this minimum number.|1|
 |dispatcherMaxRoundRobinBatchSize|The maximum number of entries to dispatch for a shared subscription. By default, it is 20 entries.|20|
 | preciseDispatcherFlowControl | Precise dispatcher flow control according to the history message number of each entry. | false |
+| streamingDispatch | Whether to use the streaming read dispatcher. This can be useful when there is a huge backlog to drain: instead of reading in micro batches, the broker streamlines reads from BookKeeper to make the most of consumer capacity until it hits the BookKeeper read limit or the consumer's processing limit, after which consumer flow control can be used to tune the speed. This feature is currently in preview and may change in a subsequent release. | false |
 | maxConcurrentLookupRequest | Maximum number of concurrent lookup requests that the broker allows, to throttle heavy incoming lookup traffic. | 50000 |
 | maxConcurrentTopicLoadRequest | Maximum number of concurrent topic loading requests that the broker allows, to control the number of zk-operations. | 5000 |
 | maxConcurrentNonPersistentMessagePerConnection | Maximum number of concurrent non-persistent messages that can be processed per connection. | 1000 |
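
For anyone who wants to try the new dispatcher: the setting documented above is a broker.conf entry (streamingDispatch=true). A minimal sketch in Java, mirroring the test overrides added by this patch; note the flag must be set before the broker starts for it to take effect:

    ServiceConfiguration conf = new ServiceConfiguration();
    // Opt in to the preview streaming read dispatcher. This must happen before broker
    // startup, which is why the new test classes set it ahead of super.setup().
    conf.setStreamingDispatch(true);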
