This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 09e646f72eb [cleanup] Remove Streaming Dispatcher Code (#20279)
09e646f72eb is described below
commit 09e646f72eb640987ca9162988b6b146c485cd31
Author: Michael Marshall <[email protected]>
AuthorDate: Wed May 17 16:57:16 2023 -0500
[cleanup] Remove Streaming Dispatcher Code (#20279)
Discussed on mailing list here:
https://lists.apache.org/thread/ky2bkzlz93njx3ntnvkpd0l77qzzgcmv
Fixes: https://github.com/apache/pulsar/issues/19088
Fixes: https://github.com/apache/pulsar/issues/16450
Fixes: https://github.com/apache/pulsar/issues/15422
Fixes: https://github.com/apache/pulsar/issues/11428
### Motivation
I am putting this PR together to move the mailing list conversation along.
Copied from @eolivelli on the mailing list thread:
> There are many flaky tests about that feature and I believe that it
has never been used in Production by anyone, because it happened a few
times that we did some changes in the regular Dispatcher and
introduced bugs on the StreamingDispatcher (usually manifested as
flaky tests)
### Modifications
* Remove all `StreamingDispatcher` code, tests, and configuration.
### Verifying this change
It should be sufficient to see the tests pass.
### Does this pull request potentially affect one of the following parts:
This affects configuration: the `streamingDispatch` broker setting is removed from `ServiceConfiguration`.
### Documentation
- [x] `doc-not-needed`
There are no references to "streaming dispatcher" or "streamingdispatcher"
in our github `pulsar-site` repo, which indicates to me that no docs need to be
updated.
### Matching PR in forked repository
PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/46
---
.../apache/pulsar/broker/ServiceConfiguration.java | 7 -
...istentStreamingDispatcherMultipleConsumers.java | 240 -----------
...entStreamingDispatcherSingleActiveConsumer.java | 233 -----------
.../service/persistent/PersistentSubscription.java | 17 +-
.../streamingdispatch/PendingReadEntryRequest.java | 76 ----
.../streamingdispatch/StreamingDispatcher.java | 53 ---
.../streamingdispatch/StreamingEntryReader.java | 342 ----------------
.../service/streamingdispatch/package-info.java | 19 -
...herFailoverConsumerStreamingDispatcherTest.java | 37 --
...rsistentFailoverStreamingDispatcherE2ETest.java | 38 --
...istentStreamingDispatcherBlockConsumerTest.java | 38 --
...eDispatchStreamingDispatcherThrottlingTest.java | 39 --
.../PersistentTopicStreamingDispatcherE2ETest.java | 42 --
.../PersistentTopicStreamingDispatcherTest.java | 41 --
...roducerConsumerTestStreamingDispatcherTest.java | 36 --
.../StreamingEntryReaderTests.java | 441 ---------------------
.../client/api/MessageDispatchThrottlingTest.java | 1 -
17 files changed, 3 insertions(+), 1697 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8668c17c3d8..9966912bc8e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1174,13 +1174,6 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean allowOverrideEntryFilters = false;
- @FieldContext(
- category = CATEGORY_SERVER,
- doc = "Whether to use streaming read dispatcher. Currently is in
preview and can be changed "
- + "in subsequent release."
- )
- private boolean streamingDispatch = false;
-
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
deleted file mode 100644
index 44e8a423344..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import com.google.common.collect.Lists;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.service.Subscription;
-import
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
-
-/**
- * A {@link PersistentDispatcherMultipleConsumers} implemented {@link
StreamingDispatcher}.
- * It'll use {@link StreamingEntryReader} to read new entries instead read as
micro batch from managed ledger.
- */
-@Slf4j
-public class PersistentStreamingDispatcherMultipleConsumers extends
PersistentDispatcherMultipleConsumers
- implements StreamingDispatcher {
-
- private int sendingTaskCounter = 0;
- private final StreamingEntryReader streamingEntryReader = new
StreamingEntryReader((ManagedCursorImpl) cursor,
- this, topic);
- private final Executor topicExecutor;
-
- public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic
topic, ManagedCursor cursor,
- Subscription
subscription) {
- super(topic, cursor, subscription);
- this.topicExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public synchronized void readEntryComplete(Entry entry,
PendingReadEntryRequest ctx) {
-
- ReadType readType = (ReadType) ctx.ctx;
- if (ctx.isLast()) {
- readFailureBackoff.reduceToHalf();
- if (readType == ReadType.Normal) {
- havePendingRead = false;
- } else {
- havePendingReplayRead = false;
- }
- }
-
- if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
- int newReadBatchSize = Math.min(readBatchSize * 2,
serviceConfig.getDispatcherMaxReadBatchSize());
- if (log.isDebugEnabled()) {
- log.debug("[{}] Increasing read batch size from {} to {}",
name, readBatchSize, newReadBatchSize);
- }
- readBatchSize = newReadBatchSize;
- }
-
- if (shouldRewindBeforeReadingOrReplaying && readType ==
ReadType.Normal) {
- // All consumers got disconnected before the completion of the
read operation
- entry.release();
- cursor.rewind();
- shouldRewindBeforeReadingOrReplaying = false;
- readMoreEntries();
- return;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Distributing a messages to {} consumers", name,
consumerList.size());
- }
-
- cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
- .getNextValidPosition((PositionImpl) entry.getPosition()));
-
- long size = entry.getLength();
- updatePendingBytesToDispatch(size);
- // dispatch messages to a separate thread, but still in order for this
subscription
- // sendMessagesToConsumers is responsible for running broker-side
filters
- // that may be quite expensive
- if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
- // setting sendInProgress here, because sendMessagesToConsumers
will be executed
- // in a separate thread, and we want to prevent more reads
- acquireSendInProgress();
- dispatchMessagesThread.execute(() -> {
- if (sendMessagesToConsumers(readType,
Lists.newArrayList(entry), false)) {
- readMoreEntries();
- } else {
- updatePendingBytesToDispatch(-size);
- }
- });
- } else {
- if (sendMessagesToConsumers(readType, Lists.newArrayList(entry),
true)) {
- readMoreEntriesAsync();
- } else {
- updatePendingBytesToDispatch(-size);
- }
- }
- ctx.recycle();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void canReadMoreEntries(boolean withBackoff) {
- havePendingRead = false;
- topic.getBrokerService().executor().schedule(() -> {
- topicExecutor.execute(() -> {
- synchronized
(PersistentStreamingDispatcherMultipleConsumers.this) {
- if (!havePendingRead) {
- log.info("[{}] Scheduling read operation", name);
- readMoreEntries();
- } else {
- log.info("[{}] Skipping read since we have
pendingRead", name);
- }
- }
- });
- }, withBackoff
- ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void notifyConsumersEndOfTopic() {
- if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
- // Topic has been terminated and there are no more entries to read
- // Notify the consumer only if all the messages were already
acknowledged
- checkAndApplyReachedEndOfTopicOrTopicMigration(consumerList);
- }
- }
-
- @Override
- protected void cancelPendingRead() {
- if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
- havePendingRead = false;
- }
- }
-
- @Override
- protected synchronized void acquireSendInProgress() {
- sendingTaskCounter++;
- }
-
- @Override
- protected synchronized void releaseSendInProgress() {
- sendingTaskCounter--;
- }
-
- @Override
- protected synchronized boolean isSendInProgress() {
- return sendingTaskCounter > 0;
- }
-
- @Override
- public synchronized void readMoreEntries() {
- if (isSendInProgress()) {
- // we cannot read more entries while sending the previous batch
- // otherwise we could re-read the same entries and send duplicates
- return;
- }
- // totalAvailablePermits may be updated by other threads
- int currentTotalAvailablePermits = totalAvailablePermits;
- if (currentTotalAvailablePermits > 0 &&
isAtleastOneConsumerAvailable()) {
- Pair<Integer, Long> calculateResult =
calculateToRead(currentTotalAvailablePermits);
- int messagesToRead = calculateResult.getLeft();
- long bytesToRead = calculateResult.getRight();
- if (-1 == messagesToRead || bytesToRead == -1) {
- // Skip read as topic/dispatcher has exceed the dispatch rate
or previous pending read hasn't complete.
- return;
- }
-
- Set<PositionImpl> messagesToReplayNow =
getMessagesToReplayNow(messagesToRead);
-
- if (!messagesToReplayNow.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Schedule replay of {} messages for {}
consumers", name, messagesToReplayNow.size(),
- consumerList.size());
- }
-
- havePendingReplayRead = true;
- Set<? extends Position> deletedMessages =
topic.isDelayedDeliveryEnabled()
- ? asyncReplayEntriesInOrder(messagesToReplayNow) :
asyncReplayEntries(messagesToReplayNow);
- // clear already acked positions from replay bucket
-
- deletedMessages.forEach(position ->
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
- ((PositionImpl) position).getEntryId()));
- // if all the entries are acked-entries and cleared up from
redeliveryMessages, try to read
- // next entries as readCompletedEntries-callback was never
called
- if ((messagesToReplayNow.size() - deletedMessages.size()) ==
0) {
- havePendingReplayRead = false;
- // We should not call readMoreEntries() recursively in the
same thread
- // as there is a risk of StackOverflowError
-
topic.getBrokerService().executor().execute(this::readMoreEntries);
- }
- } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) ==
TRUE) {
- log.debug("[{}] Dispatcher read is blocked due to
unackMessages {} reached to max {}", name,
- totalUnackedMessages,
topic.getMaxUnackedMessagesOnSubscription());
- } else if (!havePendingRead) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Schedule read of {} messages for {}
consumers", name, messagesToRead,
- consumerList.size());
- }
- havePendingRead = true;
- streamingEntryReader.asyncReadEntries(messagesToRead,
bytesToRead,
- ReadType.Normal);
- } else {
- log.debug("[{}] Cannot schedule next read until previous one
is done", name);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Consumer buffer is full, pause reading", name);
- }
- }
- }
-
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
deleted file mode 100644
index efe9de778a3..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
-import com.google.common.collect.Lists;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
-import org.apache.pulsar.broker.service.EntryBatchSizes;
-import org.apache.pulsar.broker.service.SendMessageInfo;
-import org.apache.pulsar.broker.service.Subscription;
-import
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
-import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-
-/**
- * A {@link PersistentDispatcherSingleActiveConsumer} implemented {@link
StreamingDispatcher}.
- * It'll use {@link StreamingEntryReader} to read new entries instead read as
micro batch from managed ledger.
- */
-@Slf4j
-public class PersistentStreamingDispatcherSingleActiveConsumer extends
PersistentDispatcherSingleActiveConsumer
- implements StreamingDispatcher {
-
- private final StreamingEntryReader streamingEntryReader = new
StreamingEntryReader((ManagedCursorImpl) cursor,
- this, topic);
-
- private final Executor dispatcherExecutor;
-
- public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor
cursor, SubType subscriptionType,
- int
partitionIndex, PersistentTopic topic,
- Subscription
subscription) {
- super(cursor, subscriptionType, partitionIndex, topic, subscription);
- this.dispatcherExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(name);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void canReadMoreEntries(boolean withBackoff) {
- havePendingRead = false;
- topic.getBrokerService().executor().schedule(() -> {
- topicExecutor.execute(() -> {
- synchronized
(PersistentStreamingDispatcherSingleActiveConsumer.this) {
- Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
- if (currentConsumer != null && !havePendingRead) {
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Scheduling read ", name,
currentConsumer);
- }
- readMoreEntries(currentConsumer);
- } else {
- log.info("[{}-{}] Skipping read as we still
havePendingRead {}", name,
- currentConsumer, havePendingRead);
- }
- }
- });
- }, withBackoff
- ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
- }
-
- @Override
- protected void cancelPendingRead() {
- if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
- havePendingRead = false;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public synchronized void notifyConsumersEndOfTopic() {
- if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
- // Topic has been terminated and there are no more entries to read
- // Notify the consumer only if all the messages were already
acknowledged
- checkAndApplyReachedEndOfTopicOrTopicMigration(consumers);
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
- dispatcherExecutor.execute(() -> internalReadEntryComplete(entry,
ctx));
- }
-
- public synchronized void internalReadEntryComplete(Entry entry,
PendingReadEntryRequest ctx) {
- if (ctx.isLast()) {
- readFailureBackoff.reduceToHalf();
- havePendingRead = false;
- }
-
- isFirstRead = false;
-
- if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
- int newReadBatchSize = Math.min(readBatchSize * 2,
serviceConfig.getDispatcherMaxReadBatchSize());
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Increasing read batch size from {} to {}",
name,
- ((Consumer) ctx.ctx).consumerName(), readBatchSize,
newReadBatchSize);
- }
- readBatchSize = newReadBatchSize;
- }
-
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-
- if (isKeyHashRangeFiltered) {
- byte[] key = peekStickyKey(entry.getDataBuffer());
- Consumer consumer = stickyKeyConsumerSelector.select(key);
- // Skip the entry if it's not for current active consumer.
- if (consumer == null || currentConsumer != consumer) {
- entry.release();
- return;
- }
- }
- Consumer consumer = (Consumer) ctx.ctx;
- ctx.recycle();
- if (currentConsumer == null || consumer != currentConsumer) {
- // Active consumer has changed since the read request has been
issued. We need to rewind the cursor and
- // re-issue the read request for the new consumer
- if (log.isDebugEnabled()) {
- log.debug("[{}] Rewind because no available consumer found to
dispatch message to.", name);
- }
-
- entry.release();
- streamingEntryReader.cancelReadRequests();
- havePendingRead = false;
- if (currentConsumer != null) {
- notifyActiveConsumerChanged(currentConsumer);
- readMoreEntries(currentConsumer);
- }
- } else {
- EntryBatchSizes batchSizes = EntryBatchSizes.get(1);
- SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
- EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(1);
- filterEntriesForConsumer(Lists.newArrayList(entry), batchSizes,
sendMessageInfo, batchIndexesAcks,
- cursor, false, consumer);
- // Update cursor's read position.
- cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
- .getNextValidPosition((PositionImpl) entry.getPosition()));
- dispatchEntriesToConsumer(currentConsumer,
Lists.newArrayList(entry), batchSizes,
- batchIndexesAcks, sendMessageInfo, DEFAULT_CONSUMER_EPOCH);
- }
- }
-
- @Override
- protected void readMoreEntries(Consumer consumer) {
- // consumer can be null when all consumers are disconnected from
broker.
- // so skip reading more entries if currently there is no active
consumer.
- if (null == consumer) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read for the topic, Due to the
current consumer is null", topic.getName());
- }
- return;
- }
- if (havePendingRead) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read for the topic, Due to we have
pending read.", topic.getName());
- }
- return;
- }
-
- if (consumer.getAvailablePermits() > 0) {
- synchronized (this) {
- if (havePendingRead) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read for the topic, Due to we
have pending read.", topic.getName());
- }
- return;
- }
-
- Pair<Integer, Long> calculateResult =
calculateToRead(consumer);
- int messagesToRead = calculateResult.getLeft();
- long bytesToRead = calculateResult.getRight();
-
-
- if (-1 == messagesToRead || bytesToRead == -1) {
- // Skip read as topic/dispatcher has exceed the dispatch
rate.
- return;
- }
-
- // Schedule read
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
- }
- havePendingRead = true;
-
- if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, isFirstRead,
- this, consumer);
- } else {
- streamingEntryReader.asyncReadEntries(messagesToRead,
bytesToRead, consumer);
- }
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Consumer buffer is full, pause reading",
name, consumer);
- }
- }
- }
-
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index d283cac77c7..09dabcd4bfc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -223,26 +223,18 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
if (dispatcher == null || !dispatcher.isConsumerConnected()) {
Dispatcher previousDispatcher = null;
- boolean useStreamingDispatcher =
topic.getBrokerService().getPulsar()
- .getConfiguration().isStreamingDispatch();
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() !=
SubType.Exclusive) {
previousDispatcher = dispatcher;
- dispatcher = useStreamingDispatcher
- ? new
PersistentStreamingDispatcherSingleActiveConsumer(
- cursor, SubType.Exclusive, 0, topic,
this)
- : new
PersistentDispatcherSingleActiveConsumer(
+ dispatcher = new
PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic,
this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() !=
SubType.Shared) {
previousDispatcher = dispatcher;
- dispatcher = useStreamingDispatcher
- ? new
PersistentStreamingDispatcherMultipleConsumers(
- topic, cursor, this)
- : new
PersistentDispatcherMultipleConsumers(topic, cursor, this);
+ dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
break;
case Failover:
@@ -256,10 +248,7 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
if (dispatcher == null || dispatcher.getType() !=
SubType.Failover) {
previousDispatcher = dispatcher;
- dispatcher = useStreamingDispatcher
- ? new
PersistentStreamingDispatcherSingleActiveConsumer(
- cursor, SubType.Failover,
partitionIndex, topic, this) :
- new
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
+ dispatcher = new
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
partitionIndex, topic, this);
}
break;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
deleted file mode 100644
index e98e5de07c1..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import io.netty.util.Recycler;
-import lombok.Data;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-
-/**
- * Representing a pending read request to read an entry from {@link
ManagedLedger} carrying necessary context.
- */
-@Data
-public class PendingReadEntryRequest {
-
- private static final Recycler<PendingReadEntryRequest> RECYCLER = new
Recycler<PendingReadEntryRequest>() {
- protected PendingReadEntryRequest
newObject(Recycler.Handle<PendingReadEntryRequest> handle) {
- return new PendingReadEntryRequest(handle);
- }
- };
-
- public static PendingReadEntryRequest create(Object ctx, PositionImpl
position) {
- PendingReadEntryRequest pendingReadEntryRequest = RECYCLER.get();
- pendingReadEntryRequest.ctx = ctx;
- pendingReadEntryRequest.position = position;
- pendingReadEntryRequest.retry = 0;
- pendingReadEntryRequest.isLast = false;
- return pendingReadEntryRequest;
- }
-
- public void recycle() {
- entry = null;
- ctx = null;
- position = null;
- retry = -1;
- recyclerHandle.recycle(this);
- }
-
- public boolean isLastRequest() {
- return isLast;
- }
-
- private final Recycler.Handle<PendingReadEntryRequest> recyclerHandle;
-
- // Entry read from ledger
- public Entry entry;
-
- // Passed in context that'll be pass to callback
- public Object ctx;
-
- // Position of entry to be read
- public PositionImpl position;
-
- // Number of time request has been retried.
- int retry;
-
- // If request is the last one of a set of requests.
- boolean isLast;
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
deleted file mode 100644
index 4e9e8befe0f..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.common.classification.InterfaceStability;
-
-/**
- * A {@link Dispatcher} that'll use {@link StreamingEntryReader} to read
entries from {@link ManagedLedger}.
- */
[email protected]
-public interface StreamingDispatcher extends Dispatcher {
-
- /**
- * Notify dispatcher issued read entry request has complete.
- * @param entry Entry read.
- * @param ctx Context passed in when issuing read entries request.
- */
- void readEntryComplete(Entry entry, PendingReadEntryRequest ctx);
-
- /**
- * Notify dispatcher can issue next read request.
- */
- void canReadMoreEntries(boolean withBackoff);
-
- /**
- * Notify dispatcher to inform consumers reached end of topic.
- */
- void notifyConsumersEndOfTopic();
-
- /**
- * @return Name of the dispatcher.
- */
- String getName();
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
deleted file mode 100644
index 6ffc5ba0f62..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
-import org.apache.pulsar.client.impl.Backoff;
-
-/**
- * Entry reader that fulfill read request by streamline the read instead of
reading with micro batch.
- */
-@Slf4j
-public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback,
WaitingEntryCallBack {
-
- private final int maxRetry = 3;
-
- // Queue for read request issued yet waiting for complete from managed
ledger.
- private ConcurrentLinkedQueue<PendingReadEntryRequest> issuedReads = new
ConcurrentLinkedQueue<>();
-
- // Queue for read request that's wait for new entries from managed ledger.
- private ConcurrentLinkedQueue<PendingReadEntryRequest> pendingReads = new
ConcurrentLinkedQueue<>();
-
- private final ManagedCursorImpl cursor;
-
- private final StreamingDispatcher dispatcher;
-
- private final PersistentTopic topic;
-
- private final Executor topicExecutor;
-
- private final Executor dispatcherExecutor;
-
- private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
-
- private volatile State state;
-
- private static final AtomicReferenceFieldUpdater<StreamingEntryReader,
State> STATE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class,
State.class, "state");
-
- private volatile long maxReadSizeByte;
-
- private final Backoff readFailureBackoff = new Backoff(10,
TimeUnit.MILLISECONDS,
- 1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
-
- public StreamingEntryReader(ManagedCursorImpl cursor, StreamingDispatcher
dispatcher, PersistentTopic topic) {
- this.cursor = cursor;
- this.dispatcher = dispatcher;
- this.topic = topic;
- this.topicExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
- this.dispatcherExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(dispatcher.getName());
- }
-
- /**
- * Read entries in streaming way, that said instead of reading with micro
batch and send entries to consumer after
- * all entries in the batch are read from ledger, this method will fire
numEntriesToRead requests to managedLedger
- * and send entry to consumer whenever it is read and all entries before
it have been sent to consumer.
- * @param numEntriesToRead number of entry to read from ledger.
- * @param maxReadSizeByte maximum byte will be read from ledger.
- * @param ctx Context send along with read request.
- */
- public synchronized void asyncReadEntries(int numEntriesToRead, long
maxReadSizeByte, Object ctx) {
- if (STATE_UPDATER.compareAndSet(this, State.Canceling,
State.Canceled)) {
- internalCancelReadRequests();
- }
-
- if (!issuedReads.isEmpty() || !pendingReads.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] There's pending streaming read not completed
yet. Not scheduling next read request.",
- cursor.getName());
- }
- return;
- }
-
- PositionImpl nextReadPosition = (PositionImpl)
cursor.getReadPosition();
- ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
cursor.getManagedLedger();
- // Edge case, when a old ledger is full and new ledger is not yet
opened, position can point to next
- // position of the last confirmed position, but it'll be an invalid
position. So try to update the position.
- if (!managedLedger.isValidPosition(nextReadPosition)) {
- nextReadPosition =
managedLedger.getNextValidPosition(nextReadPosition);
- }
- boolean hasEntriesToRead =
managedLedger.hasMoreEntries(nextReadPosition);
- currentReadSizeByte.set(0);
- STATE_UPDATER.set(this, State.Issued);
- this.maxReadSizeByte = maxReadSizeByte;
- for (int c = 0; c < numEntriesToRead; c++) {
- PendingReadEntryRequest pendingReadEntryRequest =
PendingReadEntryRequest.create(ctx, nextReadPosition);
- // Make sure once we start putting request into pending requests
queue, we won't put any following request
- // to issued requests queue in order to guarantee the order.
- if (hasEntriesToRead &&
managedLedger.hasMoreEntries(nextReadPosition)) {
- issuedReads.offer(pendingReadEntryRequest);
- } else {
- pendingReads.offer(pendingReadEntryRequest);
- }
- nextReadPosition =
managedLedger.getNextValidPosition(nextReadPosition);
- }
-
- // Issue requests.
- for (PendingReadEntryRequest request : issuedReads) {
- managedLedger.asyncReadEntry(request.position, this, request);
- }
-
- if (!pendingReads.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}} Streaming entry reader has {} pending read
requests waiting on new entry."
- , cursor.getName(), pendingReads.size());
- }
- // If new entries are available after we put request into pending
queue, fire read.
- // Else register callback with managed ledger to get notify when
new entries are available.
- if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
- entriesAvailable();
- } else if (managedLedger.isTerminated()) {
- dispatcher.notifyConsumersEndOfTopic();
- cleanQueue(pendingReads);
- if (issuedReads.size() == 0) {
- dispatcher.canReadMoreEntries(true);
- }
- } else {
- managedLedger.addWaitingEntryCallBack(this);
- }
- }
- }
-
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- // Don't block caller thread, complete read entry with dispatcher
dedicated thread.
- dispatcherExecutor.execute(() -> internalReadEntryComplete(entry,
ctx));
- }
-
- private void internalReadEntryComplete(Entry entry, Object ctx) {
- PendingReadEntryRequest pendingReadEntryRequest =
(PendingReadEntryRequest) ctx;
- pendingReadEntryRequest.entry = entry;
- readFailureBackoff.reduceToHalf();
- Entry readEntry;
- // If we have entry to send to dispatcher.
- if (!issuedReads.isEmpty() && issuedReads.peek() ==
pendingReadEntryRequest) {
- while (!issuedReads.isEmpty() && issuedReads.peek().entry != null)
{
- PendingReadEntryRequest firstPendingReadEntryRequest =
issuedReads.poll();
- readEntry = firstPendingReadEntryRequest.entry;
- currentReadSizeByte.addAndGet(readEntry.getLength());
- //Cancel remaining requests and reset cursor if
maxReadSizeByte exceeded.
- if (currentReadSizeByte.get() > maxReadSizeByte) {
- cancelReadRequests(readEntry.getPosition());
- dispatcher.canReadMoreEntries(false);
- STATE_UPDATER.set(this, State.Completed);
- return;
- } else {
- // All request has been completed, mark returned entry as
last.
- if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
- firstPendingReadEntryRequest.isLast = true;
- STATE_UPDATER.set(this, State.Completed);
- }
- dispatcher.readEntryComplete(readEntry,
firstPendingReadEntryRequest);
- }
- }
- } else if (!issuedReads.isEmpty() && issuedReads.peek().retry >
maxRetry) {
- cancelReadRequests(issuedReads.peek().position);
- dispatcher.canReadMoreEntries(true);
- STATE_UPDATER.set(this, State.Completed);
- }
- }
-
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- // Don't block caller thread, complete read entry fail with dispatcher
dedicated thread.
- dispatcherExecutor.execute(() -> internalReadEntryFailed(exception,
ctx));
- }
-
- private void internalReadEntryFailed(ManagedLedgerException exception,
Object ctx) {
- PendingReadEntryRequest pendingReadEntryRequest =
(PendingReadEntryRequest) ctx;
- PositionImpl readPosition = pendingReadEntryRequest.position;
- pendingReadEntryRequest.retry++;
- long waitTimeMillis = readFailureBackoff.next();
- if (exception.getCause() instanceof
TransactionBufferException.TransactionNotSealedException
- || exception.getCause() instanceof
ManagedLedgerException.OffloadReadHandleClosedException) {
- waitTimeMillis = 1;
- if (log.isDebugEnabled()) {
- log.debug("[{}] Error reading transaction entries : {}, -
Retrying to read in {} seconds",
- cursor.getName(), exception.getMessage(),
waitTimeMillis / 1000.0);
- }
- } else if (!(exception instanceof
ManagedLedgerException.TooManyRequestsException)) {
- log.error("[{} Error reading entries at {} : {} - Retrying to read
in {} seconds", cursor.getName(),
- readPosition, exception.getMessage(), waitTimeMillis /
1000.0);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Got throttled by bookies while reading at {} :
{} - Retrying to read in {} seconds",
- cursor.getName(), readPosition,
exception.getMessage(), waitTimeMillis / 1000.0);
- }
- }
- if (!issuedReads.isEmpty()) {
- if (issuedReads.peek().retry > maxRetry) {
- cancelReadRequests(issuedReads.peek().position);
- dispatcher.canReadMoreEntries(true);
- STATE_UPDATER.set(this, State.Completed);
- return;
- }
- if (pendingReadEntryRequest.retry <= maxRetry) {
- retryReadRequest(pendingReadEntryRequest, waitTimeMillis);
- }
- }
- }
-
- // Cancel all issued and pending request and update cursor's read position.
- private void cancelReadRequests(Position position) {
- if (!issuedReads.isEmpty()) {
- cleanQueue(issuedReads);
- cursor.seek(position);
- }
-
- if (!pendingReads.isEmpty()) {
- cleanQueue(pendingReads);
- }
- }
-
- private void internalCancelReadRequests() {
- Position readPosition = !issuedReads.isEmpty() ?
issuedReads.peek().position : pendingReads.peek().position;
- cancelReadRequests(readPosition);
- }
-
- public boolean cancelReadRequests() {
- if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
- // Don't block caller thread, complete cancel read with dispatcher
dedicated thread.
- topicExecutor.execute(() -> {
- synchronized (StreamingEntryReader.this) {
- if (STATE_UPDATER.compareAndSet(this, State.Canceling,
State.Canceled)) {
- internalCancelReadRequests();
- }
- }
- });
- return true;
- }
- return false;
- }
-
- private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
- while (!queue.isEmpty()) {
- PendingReadEntryRequest pendingReadEntryRequest = queue.poll();
- if (pendingReadEntryRequest.entry != null) {
- pendingReadEntryRequest.entry.release();
- pendingReadEntryRequest.recycle();
- }
- }
- }
-
- private void retryReadRequest(PendingReadEntryRequest
pendingReadEntryRequest, long delay) {
- topic.getBrokerService().executor().schedule(() -> {
- // Jump again into dispatcher dedicated thread
- dispatcherExecutor.execute(() -> {
- ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
cursor.getManagedLedger();
- managedLedger.asyncReadEntry(pendingReadEntryRequest.position,
this, pendingReadEntryRequest);
- });
- }, delay, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void entriesAvailable() {
- dispatcherExecutor.execute(this::internalEntriesAvailable);
- }
-
- private synchronized void internalEntriesAvailable() {
- if (log.isDebugEnabled()) {
- log.debug("[{}} Streaming entry reader get notification of newly
added entries from managed ledger,"
- + " trying to issued pending read requests.",
cursor.getName());
- }
- ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
cursor.getManagedLedger();
- List<PendingReadEntryRequest> newlyIssuedRequests = new ArrayList<>();
- if (!pendingReads.isEmpty()) {
- // Edge case, when a old ledger is full and new ledger is not yet
opened, position can point to next
- // position of the last confirmed position, but it'll be an
invalid position. So try to update the position.
- if (!managedLedger.isValidPosition(pendingReads.peek().position)) {
- pendingReads.peek().position =
managedLedger.getNextValidPosition(pendingReads.peek().position);
- }
- while (!pendingReads.isEmpty() &&
managedLedger.hasMoreEntries(pendingReads.peek().position)) {
- PendingReadEntryRequest next = pendingReads.poll();
- issuedReads.offer(next);
- newlyIssuedRequests.add(next);
- // Need to update the position because when the
PendingReadEntryRequest is created, the position could
- // be all set to managed ledger's last confirmed position.
- if (!pendingReads.isEmpty()) {
- pendingReads.peek().position =
managedLedger.getNextValidPosition(next.position);
- }
- }
-
- for (PendingReadEntryRequest request : newlyIssuedRequests) {
- managedLedger.asyncReadEntry(request.position, this, request);
- }
-
- if (!pendingReads.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}} Streaming entry reader has {} pending read
requests waiting on new entry."
- , cursor.getName(), pendingReads.size());
- }
- if
(managedLedger.hasMoreEntries(pendingReads.peek().position)) {
- entriesAvailable();
- } else {
- managedLedger.addWaitingEntryCallBack(this);
- }
- }
- }
- }
-
- protected State getState() {
- return STATE_UPDATER.get(this);
- }
-
- enum State {
- Issued, Canceling, Canceled, Completed;
- }
-
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
deleted file mode 100644
index 4d576d9f437..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
deleted file mode 100644
index f86fe0701dc..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import
org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * PersistentDispatcherFailoverConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "quarantine")
-public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest
extends PersistentDispatcherFailoverConsumerTest {
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- super.setup();
- pulsarTestContext.getConfig().setStreamingDispatch(true);
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
deleted file mode 100644
index 92352cde47f..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.PersistentFailoverE2ETest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * PersistentFailoverE2ETest with {@link StreamingDispatcher}
- */
-@Test(groups = "broker")
-public class PersistentFailoverStreamingDispatcherE2ETest extends
PersistentFailoverE2ETest {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- conf.setStreamingDispatch(true);
- super.setup();
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
deleted file mode 100644
index ef515cd85bf..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.client.api.DispatcherBlockConsumerTest;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * DispatcherBlockConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class PersistentStreamingDispatcherBlockConsumerTest extends
DispatcherBlockConsumerTest {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.setup();
- conf.setStreamingDispatch(true);
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
deleted file mode 100644
index fc62d84b586..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * SubscriptionMessageDispatchThrottlingTest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class
PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest
- extends SubscriptionMessageDispatchThrottlingTest {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.setup();
- conf.setStreamingDispatch(true);
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
deleted file mode 100644
index b4aa4793ba6..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.PersistentTopicE2ETest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.Test;
-
-/**
- * PersistentTopicE2ETest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class PersistentTopicStreamingDispatcherE2ETest extends
PersistentTopicE2ETest {
-
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- conf.setStreamingDispatch(true);
- }
-
- @Override
- @Test
- public void testMessageRedelivery() throws Exception {
- super.testMessageRedelivery();
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
deleted file mode 100644
index 440cbbe290c..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.PersistentTopicTest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * PersistentTopicTest with {@link StreamingDispatcher}
- */
-@Test(groups = "broker")
-public class PersistentTopicStreamingDispatcherTest extends
PersistentTopicTest {
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- super.setup();
- ServiceConfiguration config = pulsarTestContext.getConfig();
- config.setTopicLevelPoliciesEnabled(false);
- config.setSystemTopicEnabled(false);
- config.setStreamingDispatch(true);
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
deleted file mode 100644
index 706571eeb71..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.persistent;
-
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.apache.pulsar.client.api.SimpleProducerConsumerTest;
-import org.testng.annotations.Test;
-
-/**
- * SimpleProducerConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "flaky")
-public class SimpleProducerConsumerTestStreamingDispatcherTest extends
SimpleProducerConsumerTest {
-
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- conf.setStreamingDispatch(true);
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
deleted file mode 100644
index 5eb29b8ef28..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.streamingdispatch;
-
-import static org.awaitility.Awaitility.await;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.EntryImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.testng.annotations.Test;
-
-/**
- * Tests for {@link StreamingEntryReader}
- */
-@Test(groups = "flaky")
-public class StreamingEntryReaderTests extends MockedBookKeeperTestCase {
-
- private static final Charset Encoding = StandardCharsets.UTF_8;
- private PersistentTopic mockTopic;
- private StreamingDispatcher mockDispatcher;
- private BrokerService mockBrokerService;
- private EventLoopGroup eventLoopGroup;
- private OrderedExecutor orderedExecutor;
- private ManagedLedgerConfig config;
- private ManagedLedgerImpl ledger;
- private ManagedCursor cursor;
-
- @Override
- protected void setUpTestCase() throws Exception {
- eventLoopGroup = new NioEventLoopGroup(1);
- orderedExecutor = OrderedScheduler.newSchedulerBuilder()
- .numThreads(1)
- .name("StreamingEntryReaderTests").build();
- mockTopic = mock(PersistentTopic.class);
- mockBrokerService = mock(BrokerService.class);
- mockDispatcher = mock(StreamingDispatcher.class);
- config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
- ledger = spy((ManagedLedgerImpl) factory.open("my_test_ledger",
config));
- cursor = ledger.openCursor("test");
- when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
- when(mockBrokerService.executor()).thenReturn(eventLoopGroup);
-
when(mockBrokerService.getTopicOrderedExecutor()).thenReturn(orderedExecutor);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) {
- return null;
- }
- }).when(mockDispatcher).notifyConsumersEndOfTopic();
- }
-
- @Override
- protected void cleanUpTestCase() {
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownNow();
- eventLoopGroup = null;
- }
- if (orderedExecutor != null) {
- orderedExecutor.shutdownNow();
- orderedExecutor = null;
- }
- }
-
- @Test
- public void testCanReadEntryFromMLedgerHappyPath() throws Exception {
- AtomicInteger entryCount = new AtomicInteger(0);
- Stack<Position> positions = new Stack<>();
-
- for (int i = 0; i < 150; i++) {
- ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
- }
-
- StreamingEntryReader streamingEntryReader =new
StreamingEntryReader((ManagedCursorImpl) cursor,
- mockDispatcher, mockTopic);
-
- doAnswer((InvocationOnMock invocationOnMock) -> {
- Entry entry = invocationOnMock.getArgument(0, Entry.class);
- positions.push(entry.getPosition());
- assertEquals(new String(entry.getData()),
String.format("message-%d", entryCount.getAndIncrement()));
- cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
- return null;
- }
- ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
-
- streamingEntryReader.asyncReadEntries(50, 700, null);
- await().until(() -> entryCount.get() == 50);
- // Check cursor's read position has been properly updated
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- streamingEntryReader.asyncReadEntries(50, 700, null);
- await().until(() -> entryCount.get() == 100);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- streamingEntryReader.asyncReadEntries(50, 700, null);
- await().until(() -> entryCount.get() == 150);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- }
-
- @Test
- public void testCanReadEntryFromMLedgerSizeExceededLimit() throws
Exception {
- AtomicBoolean readComplete = new AtomicBoolean(false);
- Stack<Position> positions = new Stack<>();
- List<String> entries = new ArrayList<>();
- int size = "mmmmmmmmmmessage-0".getBytes().length;
- for (int i = 0; i < 15; i++) {
- ledger.addEntry(String.format("mmmmmmmmmmessage-%d",
i).getBytes(Encoding));
- }
-
- StreamingEntryReader streamingEntryReader =
- new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
-
- doAnswer((InvocationOnMock invocationOnMock) -> {
- Entry entry = invocationOnMock.getArgument(0, Entry.class);
- positions.push(entry.getPosition());
- entries.add(new String(entry.getData()));
- cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
- return null;
- }
- ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
-
- doAnswer((InvocationOnMock invocationOnMock) -> {
- readComplete.set(true);
- return null;
- }
- ).when(mockDispatcher).canReadMoreEntries(anyBoolean());
-
- PositionImpl position =
ledger.getPositionAfterN(ledger.getFirstPosition(), 3,
ManagedLedgerImpl.PositionBound.startExcluded);
- // Make reading from mledger return out of order.
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
- AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
- executor.schedule(() -> {
-
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(), "mmmmmmmmmmessage-2".getBytes()),
- invocationOnMock.getArgument(2));
- }, 200, TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(ledger).asyncReadEntry(eq(position), any(), any());
-
- // Only 2 entries should be read with this request.
- streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
- await().until(() -> readComplete.get());
- assertEquals(entries.size(), 2);
- // Assert cursor's read position has been properly updated to the
third entry, since we should only read
- // 2 retries with previous request
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- reset(ledger);
- readComplete.set(false);
- streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
- await().until(() -> readComplete.get());
- readComplete.set(false);
- streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
- await().until(() -> readComplete.get());
- readComplete.set(false);
- streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
- await().until(() -> readComplete.get());
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- assertEquals(entries.size(), 8);
- for (int i = 0; i < entries.size(); i++) {
- assertEquals(String.format("mmmmmmmmmmessage-%d", i),
entries.get(i));
- }
- }
-
- @Test
- public void testCanReadEntryFromMLedgerWaitingForNewEntry() throws
Exception {
- AtomicInteger entryCount = new AtomicInteger(0);
- AtomicBoolean entryProcessed = new AtomicBoolean(false);
- Stack<Position> positions = new Stack<>();
- List<String> entries = new ArrayList<>();
- for (int i = 0; i < 7; i++) {
- ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
- }
-
- StreamingEntryReader streamingEntryReader =
- new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
-
- doAnswer((InvocationOnMock invocationOnMock) -> {
- Entry entry = invocationOnMock.getArgument(0, Entry.class);
- positions.push(entry.getPosition());
- entries.add(new String(entry.getData()));
- entryCount.getAndIncrement();
- cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
- entryProcessed.set(true);
- return null;
- }
- ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
-
- streamingEntryReader.asyncReadEntries(5, 100, null);
- await().until(() -> entryCount.get() == 5);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- streamingEntryReader.asyncReadEntries(5, 100, null);
- // We only write 7 entries initially so only 7 entries can be read.
- await().until(() -> entryCount.get() == 7);
- // Add new entry and await for it to be send to reader.
- entryProcessed.set(false);
- ledger.addEntry("message-7".getBytes(Encoding));
- await().until(() -> entryProcessed.get());
- assertEquals(entries.size(), 8);
- entryProcessed.set(false);
- ledger.addEntry("message-8".getBytes(Encoding));
- await().until(() -> entryProcessed.get());
- assertEquals(entries.size(), 9);
- entryProcessed.set(false);
- ledger.addEntry("message-9".getBytes(Encoding));
- await().until(() -> entryProcessed.get());
- assertEquals(entries.size(), 10);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- for (int i = 0; i < entries.size(); i++) {
- assertEquals(String.format("message-%d", i), entries.get(i));
- }
- }
-
- @Test
- public void testCanCancelReadEntryRequestAndResumeReading() throws
Exception {
- Map<Position, String> messages = new HashMap<>();
- AtomicInteger count = new AtomicInteger(0);
- Stack<Position> positions = new Stack<>();
- List<String> entries = new ArrayList<>();
-
- for (int i = 0; i < 20; i++) {
- String msg = String.format("message-%d", i);
- messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
- }
-
- StreamingEntryReader streamingEntryReader =
- new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
-
- doAnswer((InvocationOnMock invocationOnMock) -> {
- Entry entry = invocationOnMock.getArgument(0, Entry.class);
- positions.push(entry.getPosition());
- entries.add(new String(entry.getData()));
- cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
- return null;
- }
- ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
-
- // Only return 5 entries
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) {
- AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
- PositionImpl position = invocationOnMock.getArgument(0,
PositionImpl.class);
- int c = count.getAndIncrement();
- if (c < 5) {
-
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(),
- messages.get(position).getBytes()),
- invocationOnMock.getArgument(2));
- }
- return null;
- }
- }).when(ledger).asyncReadEntry(any(), any(), any());
-
- streamingEntryReader.asyncReadEntries(20, 200, null);
- streamingEntryReader.cancelReadRequests();
- await().until(() -> streamingEntryReader.getState() ==
StreamingEntryReader.State.Canceled);
- // Only have 5 entry as we make ledger only return 5 entries and
cancel the request.
- assertEquals(entries.size(), 5);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- // Clear mock and try to read remaining entries
- reset(ledger);
- streamingEntryReader.asyncReadEntries(15, 200, null);
- streamingEntryReader.cancelReadRequests();
- await().until(() -> streamingEntryReader.getState() ==
StreamingEntryReader.State.Completed);
- // Only have 5 entry as we make ledger only return 5 entries and
cancel the request.
- assertEquals(entries.size(), 20);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- // Make sure message still returned in order
- for (int i = 0; i < entries.size(); i++) {
- assertEquals(String.format("message-%d", i), entries.get(i));
- }
- }
-
- @Test
- public void testCanHandleExceptionAndRetry() throws Exception {
- Map<Position, String> messages = new HashMap<>();
- AtomicBoolean entryProcessed = new AtomicBoolean(false);
- AtomicInteger count = new AtomicInteger(0);
- Stack<Position> positions = new Stack<>();
- List<String> entries = new ArrayList<>();
- for (int i = 0; i < 12; i++) {
- String msg = String.format("message-%d", i);
- messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
- }
-
- StreamingEntryReader streamingEntryReader =
- new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
-
- doAnswer((InvocationOnMock invocationOnMock) -> {
- Entry entry = invocationOnMock.getArgument(0, Entry.class);
- positions.push(entry.getPosition());
- entries.add(new String(entry.getData()));
- cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
-
- if (entries.size() == 6 || entries.size() == 12) {
- entryProcessed.set(true);
- }
- return null;
- }
- ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
-
- // Make reading from mledger throw exception randomly.
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
- AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
- PositionImpl position = invocationOnMock.getArgument(0,
PositionImpl.class);
- int c = count.getAndIncrement();
- if (c >= 3 && c < 5 || c >= 9 && c < 11) {
- cb.readEntryFailed(new
ManagedLedgerException.TooManyRequestsException("Fake exception."),
- invocationOnMock.getArgument(2));
- } else {
-
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(),
- messages.get(position).getBytes()),
- invocationOnMock.getArgument(2));
- }
- return null;
- }
- }).when(ledger).asyncReadEntry(any(), any(), any());
-
- streamingEntryReader.asyncReadEntries(6, 100, null);
- await().until(() -> entryProcessed.get());
- assertEquals(entries.size(), 6);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- entryProcessed.set(false);
- streamingEntryReader.asyncReadEntries(6, 100, null);
- await().until(() -> entryProcessed.get());
- assertEquals(entries.size(), 12);
- assertEquals(cursor.getReadPosition(),
ledger.getNextValidPosition((PositionImpl) positions.peek()));
- // Make sure message still returned in order
- for (int i = 0; i < entries.size(); i++) {
- assertEquals(String.format("message-%d", i), entries.get(i));
- }
- }
-
- @Test
- public void testWillCancelReadAfterExhaustingRetry() throws Exception {
- Map<Position, String> messages = new HashMap<>();
- AtomicInteger count = new AtomicInteger(0);
- Stack<Position> positions = new Stack<>();
- List<String> entries = new ArrayList<>();
- for (int i = 0; i < 12; i++) {
- String msg = String.format("message-%d", i);
- messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
- }
-
- StreamingEntryReader streamingEntryReader =
- new StreamingEntryReader((ManagedCursorImpl) cursor,
mockDispatcher, mockTopic);
-
- doAnswer((InvocationOnMock invocationOnMock) -> {
- Entry entry = invocationOnMock.getArgument(0, Entry.class);
- positions.push(entry.getPosition());
- cursor.seek(ledger.getNextValidPosition((PositionImpl)
entry.getPosition()));
- entries.add(new String(entry.getData()));
- return null;
- }
- ).when(mockDispatcher).readEntryComplete(any(Entry.class),
any(PendingReadEntryRequest.class));
-
- // Fail after first 3 read.
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
- AsyncCallbacks.ReadEntryCallback cb =
invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
- PositionImpl position = invocationOnMock.getArgument(0,
PositionImpl.class);
- int c = count.getAndIncrement();
- if (c >= 3) {
- cb.readEntryFailed(new
ManagedLedgerException.TooManyRequestsException("Fake exception."),
- invocationOnMock.getArgument(2));
- } else {
-
cb.readEntryComplete(EntryImpl.create(position.getLedgerId(),
position.getEntryId(),
- messages.get(position).getBytes()),
- invocationOnMock.getArgument(2));
- }
- return null;
- }
- }).when(ledger).asyncReadEntry(any(), any(), any());
-
- streamingEntryReader.asyncReadEntries(5, 100, null);
- await().until(() -> streamingEntryReader.getState() ==
StreamingEntryReader.State.Completed);
- // Issued 5 read, should only have 3 entries as others were canceled
after exhausting retries.
- assertEquals(entries.size(), 3);
- for (int i = 0; i < entries.size(); i++) {
- assertEquals(String.format("message-%d", i), entries.get(i));
- }
- reset(ledger);
- streamingEntryReader.asyncReadEntries(5, 100, null);
- await().until(() -> streamingEntryReader.getState() ==
StreamingEntryReader.State.Completed);
- assertEquals(entries.size(), 8);
- for (int i = 0; i < entries.size(); i++) {
- assertEquals(String.format("message-%d", i), entries.get(i));
- }
- }
-
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index ed85ffd600e..4f4affc39d3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -1228,7 +1228,6 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
conf.setManagedLedgerMinimumBacklogCursorsForCaching(2);
conf.setManagedLedgerMinimumBacklogEntriesForCaching(10);
conf.setManagedLedgerCacheEvictionTimeThresholdMillis(60 * 1000);
- conf.setStreamingDispatch(false);
restartBroker();
final long totalMessages = 200;
final int receiverSize = 10;