This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09e646f72eb [cleanup] Remove Streaming Dispatcher Code (#20279)
09e646f72eb is described below

commit 09e646f72eb640987ca9162988b6b146c485cd31
Author: Michael Marshall <[email protected]>
AuthorDate: Wed May 17 16:57:16 2023 -0500

    [cleanup] Remove Streaming Dispatcher Code (#20279)
    
    Discussed on mailing list here: 
https://lists.apache.org/thread/ky2bkzlz93njx3ntnvkpd0l77qzzgcmv
    
    Fixes: https://github.com/apache/pulsar/issues/19088
    Fixes: https://github.com/apache/pulsar/issues/16450
    Fixes: https://github.com/apache/pulsar/issues/15422
    Fixes: https://github.com/apache/pulsar/issues/11428
    
    ### Motivation
    
    I am putting this PR together to move the mailing list conversation along.
    
    Copied from @eolivelli on the mailing list thread:
    
    > There are many flaky tests about that feature and I believe that it
    has never been used in Production by anyone, because it happened a few
    times that we did some changes in the regular Dispatcher and
    introduced bugs on the StreamingDispatcher (usually manifested as
    flaky tests)
    
    ### Modifications
    
    * Remove all `StreamingDispatcher` code, tests, and configuration.
    
    ### Verifying this change
    
    It should be sufficient to see the tests pass.
    
    ### Does this pull request potentially affect one of the following parts:
    
    This affects configuration.
    
    ### Documentation
    
    - [x] `doc-not-needed`
    
    There are no references to "streaming dispatcher" or "streamingdispatcher" 
in our github `pulsar-site` repo, which indicates to me that no docs need to be 
updated.
    
    ### Matching PR in forked repository
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/46
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 -
 ...istentStreamingDispatcherMultipleConsumers.java | 240 -----------
 ...entStreamingDispatcherSingleActiveConsumer.java | 233 -----------
 .../service/persistent/PersistentSubscription.java |  17 +-
 .../streamingdispatch/PendingReadEntryRequest.java |  76 ----
 .../streamingdispatch/StreamingDispatcher.java     |  53 ---
 .../streamingdispatch/StreamingEntryReader.java    | 342 ----------------
 .../service/streamingdispatch/package-info.java    |  19 -
 ...herFailoverConsumerStreamingDispatcherTest.java |  37 --
 ...rsistentFailoverStreamingDispatcherE2ETest.java |  38 --
 ...istentStreamingDispatcherBlockConsumerTest.java |  38 --
 ...eDispatchStreamingDispatcherThrottlingTest.java |  39 --
 .../PersistentTopicStreamingDispatcherE2ETest.java |  42 --
 .../PersistentTopicStreamingDispatcherTest.java    |  41 --
 ...roducerConsumerTestStreamingDispatcherTest.java |  36 --
 .../StreamingEntryReaderTests.java                 | 441 ---------------------
 .../client/api/MessageDispatchThrottlingTest.java  |   1 -
 17 files changed, 3 insertions(+), 1697 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8668c17c3d8..9966912bc8e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1174,13 +1174,6 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean allowOverrideEntryFilters = false;
 
-    @FieldContext(
-        category = CATEGORY_SERVER,
-        doc = "Whether to use streaming read dispatcher. Currently is in 
preview and can be changed "
-                + "in subsequent release."
-    )
-    private boolean streamingDispatch = false;
-
     @FieldContext(
         dynamic = true,
         category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
deleted file mode 100644
index 44e8a423344..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import com.google.common.collect.Lists;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.service.Subscription;
-import 
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
-
-/**
- * A {@link PersistentDispatcherMultipleConsumers} implemented {@link 
StreamingDispatcher}.
- * It'll use {@link StreamingEntryReader} to read new entries instead read as 
micro batch from managed ledger.
- */
-@Slf4j
-public class PersistentStreamingDispatcherMultipleConsumers extends 
PersistentDispatcherMultipleConsumers
-    implements StreamingDispatcher {
-
-    private int sendingTaskCounter = 0;
-    private final StreamingEntryReader streamingEntryReader = new 
StreamingEntryReader((ManagedCursorImpl) cursor,
-            this, topic);
-    private final Executor topicExecutor;
-
-    public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic 
topic, ManagedCursor cursor,
-                                                          Subscription 
subscription) {
-        super(topic, cursor, subscription);
-        this.topicExecutor = 
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public synchronized void readEntryComplete(Entry entry, 
PendingReadEntryRequest ctx) {
-
-        ReadType readType = (ReadType) ctx.ctx;
-        if (ctx.isLast()) {
-            readFailureBackoff.reduceToHalf();
-            if (readType == ReadType.Normal) {
-                havePendingRead = false;
-            } else {
-                havePendingReplayRead = false;
-            }
-        }
-
-        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
-            int newReadBatchSize = Math.min(readBatchSize * 2, 
serviceConfig.getDispatcherMaxReadBatchSize());
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Increasing read batch size from {} to {}", 
name, readBatchSize, newReadBatchSize);
-            }
-            readBatchSize = newReadBatchSize;
-        }
-
-        if (shouldRewindBeforeReadingOrReplaying && readType == 
ReadType.Normal) {
-            // All consumers got disconnected before the completion of the 
read operation
-            entry.release();
-            cursor.rewind();
-            shouldRewindBeforeReadingOrReplaying = false;
-            readMoreEntries();
-            return;
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Distributing a messages to {} consumers", name, 
consumerList.size());
-        }
-
-        cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
-                .getNextValidPosition((PositionImpl) entry.getPosition()));
-
-        long size = entry.getLength();
-        updatePendingBytesToDispatch(size);
-        // dispatch messages to a separate thread, but still in order for this 
subscription
-        // sendMessagesToConsumers is responsible for running broker-side 
filters
-        // that may be quite expensive
-        if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
-            // setting sendInProgress here, because sendMessagesToConsumers 
will be executed
-            // in a separate thread, and we want to prevent more reads
-            acquireSendInProgress();
-            dispatchMessagesThread.execute(() -> {
-                if (sendMessagesToConsumers(readType, 
Lists.newArrayList(entry), false)) {
-                    readMoreEntries();
-                } else {
-                    updatePendingBytesToDispatch(-size);
-                }
-            });
-        } else {
-            if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), 
true)) {
-                readMoreEntriesAsync();
-            } else {
-                updatePendingBytesToDispatch(-size);
-            }
-        }
-        ctx.recycle();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void canReadMoreEntries(boolean withBackoff) {
-        havePendingRead = false;
-        topic.getBrokerService().executor().schedule(() -> {
-            topicExecutor.execute(() -> {
-                synchronized 
(PersistentStreamingDispatcherMultipleConsumers.this) {
-                    if (!havePendingRead) {
-                        log.info("[{}] Scheduling read operation", name);
-                        readMoreEntries();
-                    } else {
-                        log.info("[{}] Skipping read since we have 
pendingRead", name);
-                    }
-                }
-            });
-        }, withBackoff
-                ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void notifyConsumersEndOfTopic() {
-        if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
-            // Topic has been terminated and there are no more entries to read
-            // Notify the consumer only if all the messages were already 
acknowledged
-            checkAndApplyReachedEndOfTopicOrTopicMigration(consumerList);
-        }
-    }
-
-    @Override
-    protected void cancelPendingRead() {
-        if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
-            havePendingRead = false;
-        }
-    }
-
-    @Override
-    protected synchronized void acquireSendInProgress() {
-        sendingTaskCounter++;
-    }
-
-    @Override
-    protected synchronized void releaseSendInProgress() {
-        sendingTaskCounter--;
-    }
-
-    @Override
-    protected synchronized boolean isSendInProgress() {
-        return sendingTaskCounter > 0;
-    }
-
-    @Override
-    public synchronized void readMoreEntries() {
-        if (isSendInProgress()) {
-            // we cannot read more entries while sending the previous batch
-            // otherwise we could re-read the same entries and send duplicates
-            return;
-        }
-        // totalAvailablePermits may be updated by other threads
-        int currentTotalAvailablePermits = totalAvailablePermits;
-        if (currentTotalAvailablePermits > 0 && 
isAtleastOneConsumerAvailable()) {
-            Pair<Integer, Long> calculateResult = 
calculateToRead(currentTotalAvailablePermits);
-            int messagesToRead = calculateResult.getLeft();
-            long bytesToRead = calculateResult.getRight();
-            if (-1 == messagesToRead || bytesToRead == -1) {
-                // Skip read as topic/dispatcher has exceed the dispatch rate 
or previous pending read hasn't complete.
-                return;
-            }
-
-            Set<PositionImpl> messagesToReplayNow = 
getMessagesToReplayNow(messagesToRead);
-
-            if (!messagesToReplayNow.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name, messagesToReplayNow.size(),
-                            consumerList.size());
-                }
-
-                havePendingReplayRead = true;
-                Set<? extends Position> deletedMessages = 
topic.isDelayedDeliveryEnabled()
-                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : 
asyncReplayEntries(messagesToReplayNow);
-                // clear already acked positions from replay bucket
-
-                deletedMessages.forEach(position -> 
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
-                        ((PositionImpl) position).getEntryId()));
-                // if all the entries are acked-entries and cleared up from 
redeliveryMessages, try to read
-                // next entries as readCompletedEntries-callback was never 
called
-                if ((messagesToReplayNow.size() - deletedMessages.size()) == 
0) {
-                    havePendingReplayRead = false;
-                    // We should not call readMoreEntries() recursively in the 
same thread
-                    // as there is a risk of StackOverflowError
-                    
topic.getBrokerService().executor().execute(this::readMoreEntries);
-                }
-            } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 
TRUE) {
-                log.debug("[{}] Dispatcher read is blocked due to 
unackMessages {} reached to max {}", name,
-                        totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
-            } else if (!havePendingRead) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule read of {} messages for {} 
consumers", name, messagesToRead,
-                            consumerList.size());
-                }
-                havePendingRead = true;
-                streamingEntryReader.asyncReadEntries(messagesToRead, 
bytesToRead,
-                        ReadType.Normal);
-            } else {
-                log.debug("[{}] Cannot schedule next read until previous one 
is done", name);
-            }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Consumer buffer is full, pause reading", name);
-            }
-        }
-    }
-
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
deleted file mode 100644
index efe9de778a3..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
-import com.google.common.collect.Lists;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
-import org.apache.pulsar.broker.service.EntryBatchSizes;
-import org.apache.pulsar.broker.service.SendMessageInfo;
-import org.apache.pulsar.broker.service.Subscription;
-import 
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
-import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-
-/**
- * A {@link PersistentDispatcherSingleActiveConsumer} implemented {@link 
StreamingDispatcher}.
- * It'll use {@link StreamingEntryReader} to read new entries instead read as 
micro batch from managed ledger.
- */
-@Slf4j
-public class PersistentStreamingDispatcherSingleActiveConsumer extends 
PersistentDispatcherSingleActiveConsumer
-        implements StreamingDispatcher {
-
-    private final StreamingEntryReader streamingEntryReader = new 
StreamingEntryReader((ManagedCursorImpl) cursor,
-            this, topic);
-
-    private final Executor dispatcherExecutor;
-
-    public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor 
cursor, SubType subscriptionType,
-                                                             int 
partitionIndex, PersistentTopic topic,
-                                                             Subscription 
subscription) {
-        super(cursor, subscriptionType, partitionIndex, topic, subscription);
-        this.dispatcherExecutor = 
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(name);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void canReadMoreEntries(boolean withBackoff) {
-        havePendingRead = false;
-        topic.getBrokerService().executor().schedule(() -> {
-             topicExecutor.execute(() -> {
-                synchronized 
(PersistentStreamingDispatcherSingleActiveConsumer.this) {
-                    Consumer currentConsumer = 
ACTIVE_CONSUMER_UPDATER.get(this);
-                    if (currentConsumer != null && !havePendingRead) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}-{}] Scheduling read ", name, 
currentConsumer);
-                        }
-                        readMoreEntries(currentConsumer);
-                    } else {
-                        log.info("[{}-{}] Skipping read as we still 
havePendingRead {}", name,
-                                currentConsumer, havePendingRead);
-                    }
-                }
-            });
-        }, withBackoff
-                ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    protected void cancelPendingRead() {
-        if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
-            havePendingRead = false;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public synchronized void notifyConsumersEndOfTopic() {
-        if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
-            // Topic has been terminated and there are no more entries to read
-            // Notify the consumer only if all the messages were already 
acknowledged
-            checkAndApplyReachedEndOfTopicOrTopicMigration(consumers);
-        }
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
-        dispatcherExecutor.execute(() -> internalReadEntryComplete(entry, 
ctx));
-    }
-
-    public synchronized void internalReadEntryComplete(Entry entry, 
PendingReadEntryRequest ctx) {
-        if (ctx.isLast()) {
-            readFailureBackoff.reduceToHalf();
-            havePendingRead = false;
-        }
-
-        isFirstRead = false;
-
-        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
-            int newReadBatchSize = Math.min(readBatchSize * 2, 
serviceConfig.getDispatcherMaxReadBatchSize());
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Increasing read batch size from {} to {}", 
name,
-                        ((Consumer) ctx.ctx).consumerName(), readBatchSize, 
newReadBatchSize);
-            }
-            readBatchSize = newReadBatchSize;
-        }
-
-        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-
-        if (isKeyHashRangeFiltered) {
-            byte[] key = peekStickyKey(entry.getDataBuffer());
-            Consumer consumer = stickyKeyConsumerSelector.select(key);
-            // Skip the entry if it's not for current active consumer.
-            if (consumer == null || currentConsumer != consumer) {
-                entry.release();
-                return;
-            }
-        }
-        Consumer consumer = (Consumer) ctx.ctx;
-        ctx.recycle();
-        if (currentConsumer == null || consumer != currentConsumer) {
-            // Active consumer has changed since the read request has been 
issued. We need to rewind the cursor and
-            // re-issue the read request for the new consumer
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Rewind because no available consumer found to 
dispatch message to.", name);
-            }
-
-            entry.release();
-            streamingEntryReader.cancelReadRequests();
-            havePendingRead = false;
-            if (currentConsumer != null) {
-                notifyActiveConsumerChanged(currentConsumer);
-                readMoreEntries(currentConsumer);
-            }
-        } else {
-            EntryBatchSizes batchSizes = EntryBatchSizes.get(1);
-            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-            EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(1);
-            filterEntriesForConsumer(Lists.newArrayList(entry), batchSizes, 
sendMessageInfo, batchIndexesAcks,
-                    cursor, false, consumer);
-            // Update cursor's read position.
-            cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
-                    .getNextValidPosition((PositionImpl) entry.getPosition()));
-            dispatchEntriesToConsumer(currentConsumer, 
Lists.newArrayList(entry), batchSizes,
-                    batchIndexesAcks, sendMessageInfo, DEFAULT_CONSUMER_EPOCH);
-        }
-    }
-
-    @Override
-    protected void readMoreEntries(Consumer consumer) {
-        // consumer can be null when all consumers are disconnected from 
broker.
-        // so skip reading more entries if currently there is no active 
consumer.
-        if (null == consumer) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Skipping read for the topic, Due to the 
current consumer is null", topic.getName());
-            }
-            return;
-        }
-        if (havePendingRead) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Skipping read for the topic, Due to we have 
pending read.", topic.getName());
-            }
-            return;
-        }
-
-        if (consumer.getAvailablePermits() > 0) {
-            synchronized (this) {
-                if (havePendingRead) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Skipping read for the topic, Due to we 
have pending read.", topic.getName());
-                    }
-                    return;
-                }
-
-                Pair<Integer, Long> calculateResult = 
calculateToRead(consumer);
-                int messagesToRead = calculateResult.getLeft();
-                long bytesToRead = calculateResult.getRight();
-
-
-                if (-1 == messagesToRead || bytesToRead == -1) {
-                    // Skip read as topic/dispatcher has exceed the dispatch 
rate.
-                    return;
-                }
-
-                // Schedule read
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}-{}] Schedule read of {} messages", name, 
consumer, messagesToRead);
-                }
-                havePendingRead = true;
-
-                if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
-                            this, consumer);
-                } else {
-                    streamingEntryReader.asyncReadEntries(messagesToRead, 
bytesToRead, consumer);
-                }
-            }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Consumer buffer is full, pause reading", 
name, consumer);
-            }
-        }
-    }
-
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index d283cac77c7..09dabcd4bfc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -223,26 +223,18 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 
                 if (dispatcher == null || !dispatcher.isConsumerConnected()) {
                     Dispatcher previousDispatcher = null;
-                    boolean useStreamingDispatcher = 
topic.getBrokerService().getPulsar()
-                            .getConfiguration().isStreamingDispatch();
                     switch (consumer.subType()) {
                         case Exclusive:
                             if (dispatcher == null || dispatcher.getType() != 
SubType.Exclusive) {
                                 previousDispatcher = dispatcher;
-                                dispatcher = useStreamingDispatcher
-                                        ? new 
PersistentStreamingDispatcherSingleActiveConsumer(
-                                        cursor, SubType.Exclusive, 0, topic, 
this)
-                                        : new 
PersistentDispatcherSingleActiveConsumer(
+                                dispatcher = new 
PersistentDispatcherSingleActiveConsumer(
                                         cursor, SubType.Exclusive, 0, topic, 
this);
                             }
                             break;
                         case Shared:
                             if (dispatcher == null || dispatcher.getType() != 
SubType.Shared) {
                                 previousDispatcher = dispatcher;
-                                dispatcher = useStreamingDispatcher
-                                        ? new 
PersistentStreamingDispatcherMultipleConsumers(
-                                        topic, cursor, this)
-                                        : new 
PersistentDispatcherMultipleConsumers(topic, cursor, this);
+                                dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursor, this);
                             }
                             break;
                         case Failover:
@@ -256,10 +248,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 
                             if (dispatcher == null || dispatcher.getType() != 
SubType.Failover) {
                                 previousDispatcher = dispatcher;
-                                dispatcher = useStreamingDispatcher
-                                        ? new 
PersistentStreamingDispatcherSingleActiveConsumer(
-                                        cursor, SubType.Failover, 
partitionIndex, topic, this) :
-                                        new 
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
+                                dispatcher = new 
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
                                                 partitionIndex, topic, this);
                             }
                             break;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
deleted file mode 100644
index e98e5de07c1..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import io.netty.util.Recycler;
-import lombok.Data;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-
-/**
- * Representing a pending read request to read an entry from {@link 
ManagedLedger} carrying necessary context.
- */
-@Data
-public class PendingReadEntryRequest {
-
-    private static final Recycler<PendingReadEntryRequest> RECYCLER = new 
Recycler<PendingReadEntryRequest>() {
-        protected PendingReadEntryRequest 
newObject(Recycler.Handle<PendingReadEntryRequest> handle) {
-            return new PendingReadEntryRequest(handle);
-        }
-    };
-
-    public static PendingReadEntryRequest create(Object ctx, PositionImpl 
position) {
-        PendingReadEntryRequest pendingReadEntryRequest = RECYCLER.get();
-        pendingReadEntryRequest.ctx = ctx;
-        pendingReadEntryRequest.position = position;
-        pendingReadEntryRequest.retry = 0;
-        pendingReadEntryRequest.isLast = false;
-        return pendingReadEntryRequest;
-    }
-
-    public void recycle() {
-        entry = null;
-        ctx = null;
-        position = null;
-        retry = -1;
-        recyclerHandle.recycle(this);
-    }
-
-    public boolean isLastRequest() {
-        return isLast;
-    }
-
-    private final Recycler.Handle<PendingReadEntryRequest> recyclerHandle;
-
-    // Entry read from ledger
-    public Entry entry;
-
-    // Passed in context that'll be pass to callback
-    public Object ctx;
-
-    // Position of entry to be read
-    public PositionImpl position;
-
-    // Number of time request has been retried.
-    int retry;
-
-    // If request is the last one of a set of requests.
-    boolean isLast;
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
deleted file mode 100644
index 4e9e8befe0f..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.common.classification.InterfaceStability;
-
-/**
- * A {@link Dispatcher} that'll use {@link StreamingEntryReader} to read 
entries from {@link ManagedLedger}.
- */
[email protected]
-public interface StreamingDispatcher extends Dispatcher {
-
-    /**
-     * Notify dispatcher issued read entry request has complete.
-     * @param entry Entry read.
-     * @param ctx   Context passed in when issuing read entries request.
-     */
-    void readEntryComplete(Entry entry, PendingReadEntryRequest ctx);
-
-    /**
-     * Notify dispatcher can issue next read request.
-     */
-    void canReadMoreEntries(boolean withBackoff);
-
-    /**
-     * Notify dispatcher to inform consumers reached end of topic.
-     */
-    void notifyConsumersEndOfTopic();
-
-    /**
-     * @return Name of the dispatcher.
-     */
-    String getName();
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
deleted file mode 100644
index 6ffc5ba0f62..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import 
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
-import org.apache.pulsar.client.impl.Backoff;
-
-/**
- * Entry reader that fulfill read request by streamline the read instead of 
reading with micro batch.
- */
-@Slf4j
-public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, 
WaitingEntryCallBack {
-
-    private final int maxRetry = 3;
-
-    // Queue for read request issued yet waiting for complete from managed 
ledger.
-    private ConcurrentLinkedQueue<PendingReadEntryRequest> issuedReads = new 
ConcurrentLinkedQueue<>();
-
-    // Queue for read request that's wait for new entries from managed ledger.
-    private ConcurrentLinkedQueue<PendingReadEntryRequest> pendingReads = new 
ConcurrentLinkedQueue<>();
-
-    private final ManagedCursorImpl cursor;
-
-    private final StreamingDispatcher dispatcher;
-
-    private final PersistentTopic topic;
-
-    private final Executor topicExecutor;
-
-    private final Executor dispatcherExecutor;
-
-    private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
-
-    private volatile State state;
-
-    private static final AtomicReferenceFieldUpdater<StreamingEntryReader, 
State> STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, 
State.class, "state");
-
-    private volatile long maxReadSizeByte;
-
-    private final Backoff readFailureBackoff = new Backoff(10, 
TimeUnit.MILLISECONDS,
-            1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
-
-    public StreamingEntryReader(ManagedCursorImpl cursor, StreamingDispatcher 
dispatcher, PersistentTopic topic) {
-        this.cursor = cursor;
-        this.dispatcher = dispatcher;
-        this.topic = topic;
-        this.topicExecutor = 
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
-        this.dispatcherExecutor = 
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(dispatcher.getName());
-    }
-
-    /**
-     * Read entries in streaming way, that said instead of reading with micro 
batch and send entries to consumer after
-     * all entries in the batch are read from ledger, this method will fire 
numEntriesToRead requests to managedLedger
-     * and send entry to consumer whenever it is read and all entries before 
it have been sent to consumer.
-     * @param numEntriesToRead number of entry to read from ledger.
-     * @param maxReadSizeByte maximum byte will be read from ledger.
-     * @param ctx Context send along with read request.
-     */
-    public synchronized void asyncReadEntries(int numEntriesToRead, long 
maxReadSizeByte, Object ctx) {
-        if (STATE_UPDATER.compareAndSet(this, State.Canceling, 
State.Canceled)) {
-            internalCancelReadRequests();
-        }
-
-        if (!issuedReads.isEmpty() || !pendingReads.isEmpty()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] There's pending streaming read not completed 
yet. Not scheduling next read request.",
-                        cursor.getName());
-            }
-            return;
-        }
-
-        PositionImpl nextReadPosition = (PositionImpl) 
cursor.getReadPosition();
-        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
cursor.getManagedLedger();
-        // Edge case, when a old ledger is full and new ledger is not yet 
opened, position can point to next
-        // position of the last confirmed position, but it'll be an invalid 
position. So try to update the position.
-        if (!managedLedger.isValidPosition(nextReadPosition)) {
-            nextReadPosition = 
managedLedger.getNextValidPosition(nextReadPosition);
-        }
-        boolean hasEntriesToRead = 
managedLedger.hasMoreEntries(nextReadPosition);
-        currentReadSizeByte.set(0);
-        STATE_UPDATER.set(this, State.Issued);
-        this.maxReadSizeByte = maxReadSizeByte;
-        for (int c = 0; c < numEntriesToRead; c++) {
-            PendingReadEntryRequest pendingReadEntryRequest = 
PendingReadEntryRequest.create(ctx, nextReadPosition);
-            // Make sure once we start putting request into pending requests 
queue, we won't put any following request
-            // to issued requests queue in order to guarantee the order.
-            if (hasEntriesToRead && 
managedLedger.hasMoreEntries(nextReadPosition)) {
-                issuedReads.offer(pendingReadEntryRequest);
-            } else {
-                pendingReads.offer(pendingReadEntryRequest);
-            }
-            nextReadPosition = 
managedLedger.getNextValidPosition(nextReadPosition);
-        }
-
-        // Issue requests.
-        for (PendingReadEntryRequest request : issuedReads) {
-            managedLedger.asyncReadEntry(request.position, this, request);
-        }
-
-        if (!pendingReads.isEmpty()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}} Streaming entry reader has {} pending read 
requests waiting on new entry."
-                        , cursor.getName(), pendingReads.size());
-            }
-            // If new entries are available after we put request into pending 
queue, fire read.
-            // Else register callback with managed ledger to get notify when 
new entries are available.
-            if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
-                entriesAvailable();
-            } else if (managedLedger.isTerminated()) {
-                dispatcher.notifyConsumersEndOfTopic();
-                cleanQueue(pendingReads);
-                if (issuedReads.size() == 0) {
-                    dispatcher.canReadMoreEntries(true);
-                }
-            } else {
-                managedLedger.addWaitingEntryCallBack(this);
-            }
-        }
-    }
-
-    @Override
-    public void readEntryComplete(Entry entry, Object ctx) {
-        // Don't block caller thread, complete read entry with dispatcher 
dedicated thread.
-        dispatcherExecutor.execute(() -> internalReadEntryComplete(entry, 
ctx));
-    }
-
-    private void internalReadEntryComplete(Entry entry, Object ctx) {
-        PendingReadEntryRequest pendingReadEntryRequest = 
(PendingReadEntryRequest) ctx;
-        pendingReadEntryRequest.entry = entry;
-        readFailureBackoff.reduceToHalf();
-        Entry readEntry;
-        // If we have entry to send to dispatcher.
-        if (!issuedReads.isEmpty() && issuedReads.peek() == 
pendingReadEntryRequest) {
-            while (!issuedReads.isEmpty() && issuedReads.peek().entry != null) 
{
-                PendingReadEntryRequest firstPendingReadEntryRequest = 
issuedReads.poll();
-                readEntry = firstPendingReadEntryRequest.entry;
-                currentReadSizeByte.addAndGet(readEntry.getLength());
-                //Cancel remaining requests and reset cursor if 
maxReadSizeByte exceeded.
-                if (currentReadSizeByte.get() > maxReadSizeByte) {
-                    cancelReadRequests(readEntry.getPosition());
-                    dispatcher.canReadMoreEntries(false);
-                    STATE_UPDATER.set(this, State.Completed);
-                    return;
-                } else {
-                    // All request has been completed, mark returned entry as 
last.
-                    if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
-                        firstPendingReadEntryRequest.isLast = true;
-                        STATE_UPDATER.set(this, State.Completed);
-                    }
-                    dispatcher.readEntryComplete(readEntry, 
firstPendingReadEntryRequest);
-                }
-            }
-        } else if (!issuedReads.isEmpty() && issuedReads.peek().retry > 
maxRetry) {
-            cancelReadRequests(issuedReads.peek().position);
-            dispatcher.canReadMoreEntries(true);
-            STATE_UPDATER.set(this, State.Completed);
-        }
-    }
-
-    @Override
-    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-        // Don't block caller thread, complete read entry fail with dispatcher 
dedicated thread.
-        dispatcherExecutor.execute(() -> internalReadEntryFailed(exception, 
ctx));
-    }
-
-    private void internalReadEntryFailed(ManagedLedgerException exception, 
Object ctx) {
-        PendingReadEntryRequest pendingReadEntryRequest = 
(PendingReadEntryRequest) ctx;
-        PositionImpl readPosition = pendingReadEntryRequest.position;
-        pendingReadEntryRequest.retry++;
-        long waitTimeMillis = readFailureBackoff.next();
-        if (exception.getCause() instanceof 
TransactionBufferException.TransactionNotSealedException
-                || exception.getCause() instanceof 
ManagedLedgerException.OffloadReadHandleClosedException) {
-            waitTimeMillis = 1;
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Error reading transaction entries : {}, - 
Retrying to read in {} seconds",
-                        cursor.getName(), exception.getMessage(), 
waitTimeMillis / 1000.0);
-            }
-        } else if (!(exception instanceof 
ManagedLedgerException.TooManyRequestsException)) {
-            log.error("[{} Error reading entries at {} : {} - Retrying to read 
in {} seconds", cursor.getName(),
-                    readPosition, exception.getMessage(), waitTimeMillis / 
1000.0);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got throttled by bookies while reading at {} : 
{} - Retrying to read in {} seconds",
-                        cursor.getName(), readPosition, 
exception.getMessage(), waitTimeMillis / 1000.0);
-            }
-        }
-        if (!issuedReads.isEmpty()) {
-            if (issuedReads.peek().retry > maxRetry) {
-                cancelReadRequests(issuedReads.peek().position);
-                dispatcher.canReadMoreEntries(true);
-                STATE_UPDATER.set(this, State.Completed);
-                return;
-            }
-            if (pendingReadEntryRequest.retry <= maxRetry) {
-                retryReadRequest(pendingReadEntryRequest, waitTimeMillis);
-            }
-        }
-    }
-
-    // Cancel all issued and pending request and update cursor's read position.
-    private void cancelReadRequests(Position position) {
-        if (!issuedReads.isEmpty()) {
-            cleanQueue(issuedReads);
-            cursor.seek(position);
-        }
-
-        if (!pendingReads.isEmpty()) {
-            cleanQueue(pendingReads);
-        }
-    }
-
-    private void internalCancelReadRequests() {
-        Position readPosition = !issuedReads.isEmpty() ? 
issuedReads.peek().position : pendingReads.peek().position;
-        cancelReadRequests(readPosition);
-    }
-
-    public boolean cancelReadRequests() {
-        if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
-            // Don't block caller thread, complete cancel read with dispatcher 
dedicated thread.
-             topicExecutor.execute(() -> {
-                synchronized (StreamingEntryReader.this) {
-                    if (STATE_UPDATER.compareAndSet(this, State.Canceling, 
State.Canceled)) {
-                        internalCancelReadRequests();
-                    }
-                }
-            });
-            return true;
-        }
-        return false;
-    }
-
-    private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
-        while (!queue.isEmpty()) {
-            PendingReadEntryRequest pendingReadEntryRequest = queue.poll();
-            if (pendingReadEntryRequest.entry != null) {
-                pendingReadEntryRequest.entry.release();
-                pendingReadEntryRequest.recycle();
-            }
-        }
-    }
-
-    private void retryReadRequest(PendingReadEntryRequest 
pendingReadEntryRequest, long delay) {
-        topic.getBrokerService().executor().schedule(() -> {
-            // Jump again into dispatcher dedicated thread
-            dispatcherExecutor.execute(() -> {
-                ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
cursor.getManagedLedger();
-                managedLedger.asyncReadEntry(pendingReadEntryRequest.position, 
this, pendingReadEntryRequest);
-            });
-        }, delay, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void entriesAvailable() {
-        dispatcherExecutor.execute(this::internalEntriesAvailable);
-    }
-
-    private synchronized void internalEntriesAvailable() {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}} Streaming entry reader get notification of newly 
added entries from managed ledger,"
-                    + " trying to issued pending read requests.", 
cursor.getName());
-        }
-        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
cursor.getManagedLedger();
-        List<PendingReadEntryRequest> newlyIssuedRequests = new ArrayList<>();
-        if (!pendingReads.isEmpty()) {
-            // Edge case, when a old ledger is full and new ledger is not yet 
opened, position can point to next
-            // position of the last confirmed position, but it'll be an 
invalid position. So try to update the position.
-            if (!managedLedger.isValidPosition(pendingReads.peek().position)) {
-                pendingReads.peek().position = 
managedLedger.getNextValidPosition(pendingReads.peek().position);
-            }
-            while (!pendingReads.isEmpty() && 
managedLedger.hasMoreEntries(pendingReads.peek().position)) {
-                PendingReadEntryRequest next = pendingReads.poll();
-                issuedReads.offer(next);
-                newlyIssuedRequests.add(next);
-                // Need to update the position because when the 
PendingReadEntryRequest is created, the position could
-                // be all set to managed ledger's last confirmed position.
-                if (!pendingReads.isEmpty()) {
-                    pendingReads.peek().position = 
managedLedger.getNextValidPosition(next.position);
-                }
-            }
-
-            for (PendingReadEntryRequest request : newlyIssuedRequests) {
-                managedLedger.asyncReadEntry(request.position, this, request);
-            }
-
-            if (!pendingReads.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}} Streaming entry reader has {} pending read 
requests waiting on new entry."
-                            , cursor.getName(), pendingReads.size());
-                }
-                if 
(managedLedger.hasMoreEntries(pendingReads.peek().position)) {
-                    entriesAvailable();
-                } else {
-                    managedLedger.addWaitingEntryCallBack(this);
-                }
-            }
-        }
-    }
-
-    protected State getState() {
-        return STATE_UPDATER.get(this);
-    }
-
-    enum State {
-        Issued, Canceling, Canceled, Completed;
-    }
-
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
deleted file mode 100644
index 4d576d9f437..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
deleted file mode 100644
index f86fe0701dc..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import 
org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * PersistentDispatcherFailoverConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "quarantine")
-public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest 
extends PersistentDispatcherFailoverConsumerTest {
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        super.setup();
-        pulsarTestContext.getConfig().setStreamingDispatch(true);
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
deleted file mode 100644
index 92352cde47f..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.PersistentFailoverE2ETest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * PersistentFailoverE2ETest with {@link StreamingDispatcher}
- */
-@Test(groups = "broker")
-public class PersistentFailoverStreamingDispatcherE2ETest extends 
PersistentFailoverE2ETest {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        conf.setStreamingDispatch(true);
-        super.setup();
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
deleted file mode 100644
index ef515cd85bf..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.client.api.DispatcherBlockConsumerTest;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * DispatcherBlockConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class PersistentStreamingDispatcherBlockConsumerTest extends 
DispatcherBlockConsumerTest {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.setup();
-        conf.setStreamingDispatch(true);
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
deleted file mode 100644
index fc62d84b586..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * SubscriptionMessageDispatchThrottlingTest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class 
PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest
-    extends SubscriptionMessageDispatchThrottlingTest {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.setup();
-        conf.setStreamingDispatch(true);
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
deleted file mode 100644
index b4aa4793ba6..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.PersistentTopicE2ETest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.Test;
-
-/**
- * PersistentTopicE2ETest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class PersistentTopicStreamingDispatcherE2ETest extends 
PersistentTopicE2ETest {
-
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
-        conf.setStreamingDispatch(true);
-    }
-
-    @Override
-    @Test
-    public void testMessageRedelivery() throws Exception {
-        super.testMessageRedelivery();
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
deleted file mode 100644
index 440cbbe290c..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.PersistentTopicTest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * PersistentTopicTest with {@link StreamingDispatcher}
- */
-@Test(groups = "broker")
-public class PersistentTopicStreamingDispatcherTest extends 
PersistentTopicTest {
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        super.setup();
-        ServiceConfiguration config = pulsarTestContext.getConfig();
-        config.setTopicLevelPoliciesEnabled(false);
-        config.setSystemTopicEnabled(false);
-        config.setStreamingDispatch(true);
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
deleted file mode 100644
index 706571eeb71..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.client.api.SimpleProducerConsumerTest;
-import org.testng.annotations.Test;
-
-/**
- * SimpleProducerConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class SimpleProducerConsumerTestStreamingDispatcherTest extends 
SimpleProducerConsumerTest {
-
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
-        conf.setStreamingDispatch(true);
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
deleted file mode 100644
index 5eb29b8ef28..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import static org.awaitility.Awaitility.await;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.EntryImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.testng.annotations.Test;
-
-/**
- * Tests for {@link StreamingEntryReader}
- */
-@Test(groups = "flaky")
-public class StreamingEntryReaderTests extends MockedBookKeeperTestCase {
-
-    private static final Charset Encoding = StandardCharsets.UTF_8;
-    private PersistentTopic mockTopic;
-    private StreamingDispatcher mockDispatcher;
-    private BrokerService mockBrokerService;
-    private EventLoopGroup eventLoopGroup;
-    private OrderedExecutor orderedExecutor;
-    private ManagedLedgerConfig config;
-    private ManagedLedgerImpl ledger;
-    private ManagedCursor cursor;
-
-    @Override
-    protected void setUpTestCase() throws Exception {
-        eventLoopGroup = new NioEventLoopGroup(1);
-        orderedExecutor = OrderedScheduler.newSchedulerBuilder()
-                .numThreads(1)
-                .name("StreamingEntryReaderTests").build();
-        mockTopic = mock(PersistentTopic.class);
-        mockBrokerService = mock(BrokerService.class);
-        mockDispatcher = mock(StreamingDispatcher.class);
-        config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
-        ledger = spy((ManagedLedgerImpl) factory.open("my_test_ledger", 
config));
-        cursor = ledger.openCursor("test");
-        when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
-        when(mockBrokerService.executor()).thenReturn(eventLoopGroup);
-        
when(mockBrokerService.getTopicOrderedExecutor()).thenReturn(orderedExecutor);
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) {
-                return null;
-            }
-        }).when(mockDispatcher).notifyConsumersEndOfTopic();
-    }
-
-    @Override
-    protected void cleanUpTestCase() {
-        if (eventLoopGroup != null) {
-            eventLoopGroup.shutdownNow();
-            eventLoopGroup = null;
-        }
-        if (orderedExecutor != null) {
-            orderedExecutor.shutdownNow();
-            orderedExecutor = null;
-        }
-    }
-
-    @Test
-    public void testCanReadEntryFromMLedgerHappyPath() throws Exception {
-        AtomicInteger entryCount = new AtomicInteger(0);
-        Stack<Position> positions = new Stack<>();
-
-        for (int i = 0; i < 150; i++) {
-            ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
-        }
-
-        StreamingEntryReader streamingEntryReader =new 
StreamingEntryReader((ManagedCursorImpl) cursor,
-                mockDispatcher, mockTopic);
-
-        doAnswer((InvocationOnMock invocationOnMock) -> {
-                Entry entry = invocationOnMock.getArgument(0, Entry.class);
-                positions.push(entry.getPosition());
-                assertEquals(new String(entry.getData()), 
String.format("message-%d", entryCount.getAndIncrement()));
-                cursor.seek(ledger.getNextValidPosition((PositionImpl) 
entry.getPosition()));
-                return null;
-            }
-        ).when(mockDispatcher).readEntryComplete(any(Entry.class), 
any(PendingReadEntryRequest.class));
-
-        streamingEntryReader.asyncReadEntries(50, 700, null);
-        await().until(() -> entryCount.get() == 50);
-        // Check cursor's read position has been properly updated
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        streamingEntryReader.asyncReadEntries(50, 700, null);
-        await().until(() -> entryCount.get() == 100);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        streamingEntryReader.asyncReadEntries(50, 700, null);
-        await().until(() -> entryCount.get() == 150);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-    }
-
-    @Test
-    public void testCanReadEntryFromMLedgerSizeExceededLimit() throws 
Exception {
-        AtomicBoolean readComplete = new AtomicBoolean(false);
-        Stack<Position> positions = new Stack<>();
-        List<String> entries = new ArrayList<>();
-        int size = "mmmmmmmmmmessage-0".getBytes().length;
-        for (int i = 0; i < 15; i++) {
-            ledger.addEntry(String.format("mmmmmmmmmmessage-%d", 
i).getBytes(Encoding));
-        }
-
-        StreamingEntryReader streamingEntryReader =
-                new StreamingEntryReader((ManagedCursorImpl) cursor, 
mockDispatcher, mockTopic);
-
-        doAnswer((InvocationOnMock invocationOnMock) -> {
-                Entry entry = invocationOnMock.getArgument(0, Entry.class);
-                positions.push(entry.getPosition());
-                entries.add(new String(entry.getData()));
-                cursor.seek(ledger.getNextValidPosition((PositionImpl) 
entry.getPosition()));
-                return null;
-            }
-        ).when(mockDispatcher).readEntryComplete(any(Entry.class), 
any(PendingReadEntryRequest.class));
-
-        doAnswer((InvocationOnMock invocationOnMock) -> {
-                readComplete.set(true);
-                return null;
-            }
-        ).when(mockDispatcher).canReadMoreEntries(anyBoolean());
-
-        PositionImpl position = 
ledger.getPositionAfterN(ledger.getFirstPosition(), 3, 
ManagedLedgerImpl.PositionBound.startExcluded);
-        // Make reading from mledger return out of order.
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                AsyncCallbacks.ReadEntryCallback cb = 
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
-                executor.schedule(() -> {
-                    
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), 
position.getEntryId(), "mmmmmmmmmmessage-2".getBytes()),
-                            invocationOnMock.getArgument(2));
-                }, 200, TimeUnit.MILLISECONDS);
-                return null;
-            }
-        }).when(ledger).asyncReadEntry(eq(position), any(), any());
-
-        // Only 2 entries should be read with this request.
-        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
-        await().until(() -> readComplete.get());
-        assertEquals(entries.size(), 2);
-        // Assert cursor's read position has been properly updated to the third entry,
-        // since we should only have read 2 entries with the previous request
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        reset(ledger);
-        readComplete.set(false);
-        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
-        await().until(() -> readComplete.get());
-        readComplete.set(false);
-        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
-        await().until(() -> readComplete.get());
-        readComplete.set(false);
-        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
-        await().until(() -> readComplete.get());
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        assertEquals(entries.size(), 8);
-        for (int i = 0; i < entries.size(); i++) {
-            assertEquals(String.format("mmmmmmmmmmessage-%d", i), 
entries.get(i));
-        }
-    }
-
-    @Test
-    public void testCanReadEntryFromMLedgerWaitingForNewEntry() throws 
Exception {
-        AtomicInteger entryCount = new AtomicInteger(0);
-        AtomicBoolean entryProcessed = new AtomicBoolean(false);
-        Stack<Position> positions = new Stack<>();
-        List<String> entries = new ArrayList<>();
-        for (int i = 0; i < 7; i++) {
-            ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
-        }
-
-        StreamingEntryReader streamingEntryReader =
-                new StreamingEntryReader((ManagedCursorImpl) cursor, 
mockDispatcher, mockTopic);
-
-        doAnswer((InvocationOnMock invocationOnMock) -> {
-                Entry entry = invocationOnMock.getArgument(0, Entry.class);
-                positions.push(entry.getPosition());
-                entries.add(new String(entry.getData()));
-                entryCount.getAndIncrement();
-                cursor.seek(ledger.getNextValidPosition((PositionImpl) 
entry.getPosition()));
-                entryProcessed.set(true);
-                return null;
-            }
-        ).when(mockDispatcher).readEntryComplete(any(Entry.class), 
any(PendingReadEntryRequest.class));
-
-        streamingEntryReader.asyncReadEntries(5,  100, null);
-        await().until(() -> entryCount.get() == 5);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        streamingEntryReader.asyncReadEntries(5, 100, null);
-        // We only write 7 entries initially so only 7 entries can be read.
-        await().until(() -> entryCount.get() == 7);
-        // Add new entry and await for it to be send to reader.
-        entryProcessed.set(false);
-        ledger.addEntry("message-7".getBytes(Encoding));
-        await().until(() -> entryProcessed.get());
-        assertEquals(entries.size(), 8);
-        entryProcessed.set(false);
-        ledger.addEntry("message-8".getBytes(Encoding));
-        await().until(() -> entryProcessed.get());
-        assertEquals(entries.size(), 9);
-        entryProcessed.set(false);
-        ledger.addEntry("message-9".getBytes(Encoding));
-        await().until(() -> entryProcessed.get());
-        assertEquals(entries.size(), 10);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        for (int i = 0; i < entries.size(); i++) {
-            assertEquals(String.format("message-%d", i), entries.get(i));
-        }
-    }
-
-    @Test
-    public void testCanCancelReadEntryRequestAndResumeReading() throws 
Exception {
-        Map<Position, String> messages = new HashMap<>();
-        AtomicInteger count = new AtomicInteger(0);
-        Stack<Position> positions = new Stack<>();
-        List<String> entries = new ArrayList<>();
-
-        for (int i = 0; i < 20; i++) {
-            String msg = String.format("message-%d", i);
-            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
-        }
-
-        StreamingEntryReader streamingEntryReader =
-                new StreamingEntryReader((ManagedCursorImpl) cursor, 
mockDispatcher, mockTopic);
-
-        doAnswer((InvocationOnMock invocationOnMock) -> {
-                Entry entry = invocationOnMock.getArgument(0, Entry.class);
-                positions.push(entry.getPosition());
-                entries.add(new String(entry.getData()));
-                cursor.seek(ledger.getNextValidPosition((PositionImpl) 
entry.getPosition()));
-                return null;
-            }
-        ).when(mockDispatcher).readEntryComplete(any(Entry.class), 
any(PendingReadEntryRequest.class));
-
-        // Only return 5 entries
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) {
-                AsyncCallbacks.ReadEntryCallback cb = 
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
-                PositionImpl position = invocationOnMock.getArgument(0, 
PositionImpl.class);
-                int c = count.getAndIncrement();
-                if (c < 5) {
-                    
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), 
position.getEntryId(),
-                            messages.get(position).getBytes()),
-                            invocationOnMock.getArgument(2));
-                }
-                return null;
-            }
-        }).when(ledger).asyncReadEntry(any(), any(), any());
-
-        streamingEntryReader.asyncReadEntries(20,  200, null);
-        streamingEntryReader.cancelReadRequests();
-        await().until(() -> streamingEntryReader.getState() == 
StreamingEntryReader.State.Canceled);
-        // Only have 5 entries, as we made the ledger return only 5 entries and 
canceled the request.
-        assertEquals(entries.size(), 5);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        // Clear mock and try to read remaining entries
-        reset(ledger);
-        streamingEntryReader.asyncReadEntries(15,  200, null);
-        streamingEntryReader.cancelReadRequests();
-        await().until(() -> streamingEntryReader.getState() == 
StreamingEntryReader.State.Completed);
-        // After clearing the mock, the remaining 15 entries are read, so all 
20 entries should now be present.
        assertEquals(entries.size(), 20);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        // Make sure message still returned in order
-        for (int i = 0; i < entries.size(); i++) {
-            assertEquals(String.format("message-%d", i), entries.get(i));
-        }
-    }
-
-    @Test
-    public void testCanHandleExceptionAndRetry() throws Exception {
-        Map<Position, String> messages = new HashMap<>();
-        AtomicBoolean entryProcessed = new AtomicBoolean(false);
-        AtomicInteger count = new AtomicInteger(0);
-        Stack<Position> positions = new Stack<>();
-        List<String> entries = new ArrayList<>();
-        for (int i = 0; i < 12; i++) {
-            String msg = String.format("message-%d", i);
-            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
-        }
-
-        StreamingEntryReader streamingEntryReader =
-                new StreamingEntryReader((ManagedCursorImpl) cursor, 
mockDispatcher, mockTopic);
-
-        doAnswer((InvocationOnMock invocationOnMock) -> {
-                Entry entry = invocationOnMock.getArgument(0, Entry.class);
-                positions.push(entry.getPosition());
-                entries.add(new String(entry.getData()));
-                cursor.seek(ledger.getNextValidPosition((PositionImpl) 
entry.getPosition()));
-
-                if (entries.size() == 6 || entries.size() == 12) {
-                    entryProcessed.set(true);
-                }
-                return null;
-            }
-        ).when(mockDispatcher).readEntryComplete(any(Entry.class), 
any(PendingReadEntryRequest.class));
-
-        // Make reading from mledger throw exception randomly.
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                AsyncCallbacks.ReadEntryCallback cb = 
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
-                PositionImpl position = invocationOnMock.getArgument(0, 
PositionImpl.class);
-                int c = count.getAndIncrement();
-                if (c >= 3 && c < 5 || c >= 9 && c < 11) {
-                    cb.readEntryFailed(new 
ManagedLedgerException.TooManyRequestsException("Fake exception."),
-                            invocationOnMock.getArgument(2));
-                } else {
-                    
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), 
position.getEntryId(),
-                            messages.get(position).getBytes()),
-                            invocationOnMock.getArgument(2));
-                }
-                return null;
-            }
-        }).when(ledger).asyncReadEntry(any(), any(), any());
-
-        streamingEntryReader.asyncReadEntries(6,  100, null);
-        await().until(() -> entryProcessed.get());
-        assertEquals(entries.size(), 6);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        entryProcessed.set(false);
-        streamingEntryReader.asyncReadEntries(6, 100, null);
-        await().until(() -> entryProcessed.get());
-        assertEquals(entries.size(), 12);
-        assertEquals(cursor.getReadPosition(), 
ledger.getNextValidPosition((PositionImpl) positions.peek()));
-        // Make sure message still returned in order
-        for (int i = 0; i < entries.size(); i++) {
-            assertEquals(String.format("message-%d", i), entries.get(i));
-        }
-    }
-
-    @Test
-    public void testWillCancelReadAfterExhaustingRetry() throws Exception {
-        Map<Position, String> messages = new HashMap<>();
-        AtomicInteger count = new AtomicInteger(0);
-        Stack<Position> positions = new Stack<>();
-        List<String> entries = new ArrayList<>();
-        for (int i = 0; i < 12; i++) {
-            String msg = String.format("message-%d", i);
-            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
-        }
-
-        StreamingEntryReader streamingEntryReader =
-                new StreamingEntryReader((ManagedCursorImpl) cursor, 
mockDispatcher, mockTopic);
-
-        doAnswer((InvocationOnMock invocationOnMock) -> {
-                    Entry entry = invocationOnMock.getArgument(0, Entry.class);
-                    positions.push(entry.getPosition());
-                    cursor.seek(ledger.getNextValidPosition((PositionImpl) 
entry.getPosition()));
-                    entries.add(new String(entry.getData()));
-                    return null;
-                }
-        ).when(mockDispatcher).readEntryComplete(any(Entry.class), 
any(PendingReadEntryRequest.class));
-
-        // Fail after first 3 read.
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                AsyncCallbacks.ReadEntryCallback cb = 
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
-                PositionImpl position = invocationOnMock.getArgument(0, 
PositionImpl.class);
-                int c = count.getAndIncrement();
-                if (c >= 3) {
-                    cb.readEntryFailed(new 
ManagedLedgerException.TooManyRequestsException("Fake exception."),
-                            invocationOnMock.getArgument(2));
-                } else {
-                    
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), 
position.getEntryId(),
-                            messages.get(position).getBytes()),
-                            invocationOnMock.getArgument(2));
-                }
-                return null;
-            }
-        }).when(ledger).asyncReadEntry(any(), any(), any());
-
-        streamingEntryReader.asyncReadEntries(5,  100, null);
-        await().until(() -> streamingEntryReader.getState() == 
StreamingEntryReader.State.Completed);
-        // Issued 5 read, should only have 3 entries as others were canceled 
after exhausting retries.
-        assertEquals(entries.size(), 3);
-        for (int i = 0; i < entries.size(); i++) {
-            assertEquals(String.format("message-%d", i), entries.get(i));
-        }
-        reset(ledger);
-        streamingEntryReader.asyncReadEntries(5,  100, null);
-        await().until(() -> streamingEntryReader.getState() == 
StreamingEntryReader.State.Completed);
-        assertEquals(entries.size(), 8);
-        for (int i = 0; i < entries.size(); i++) {
-            assertEquals(String.format("message-%d", i), entries.get(i));
-        }
-    }
-
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index ed85ffd600e..4f4affc39d3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -1228,7 +1228,6 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         conf.setManagedLedgerMinimumBacklogCursorsForCaching(2);
         conf.setManagedLedgerMinimumBacklogEntriesForCaching(10);
         conf.setManagedLedgerCacheEvictionTimeThresholdMillis(60 * 1000);
-        conf.setStreamingDispatch(false);
         restartBroker();
         final long totalMessages = 200;
         final int receiverSize = 10;


Reply via email to