This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b1a29b520d3 [feature][broker] PIP 37: Support chunking with Shared
subscription (#16202)
b1a29b520d3 is described below
commit b1a29b520d34d60e60160e3a7b9b0e26926063ee
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 2 13:41:07 2022 +0800
[feature][broker] PIP 37: Support chunking with Shared subscription (#16202)
* [feature][broker] PIP 37: Support chunking with Shared subscription
### Motivation
https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar#option-1-broker-caches-mapping-of-message-uuid-and-consumerid
The drawbacks of option 1 described in the original proposal no longer
apply to the current code because the broker now keeps redelivered
messages in a **sorted** map.
### Modifications
First of all, to avoid too many code changes, an `EntryAndMetadata`
class is introduced to bind the `Entry` with the associated
`MessageMetadata` to avoid parsing the metadata repeatedly. It also
implements the `Entry` interface, so this PR changes some
`List<Entry>` parameters to `List<? extends Entry>` so that a
`List<EntryAndMetadata>` argument can be accepted.
Then, a `SharedConsumerAssignor` is introduced to assign a list of
entries to all shared consumers.
1. Use a default selector to select the next consumer, like
`PersistentDispatcherMultipleConsumers#getNextConsumer`,
2. Each time a consumer is chosen, assign the entries in the range
[i, i+permits) to that consumer, except for entries that have a uuid:
- If the uuid is not cached, cache `uuid -> consumer` to indicate that the
chunked message with this uuid must be dispatched to this consumer.
- Otherwise, assign the entry to the consumer that owns the uuid.
The `assign` method returns a map from `Consumer` to
`List<EntryAndMetadata>`. The dispatch logic that follows is similar to
that of the Key_Shared dispatcher.
Finally, remove the existing limit in `ConsumerImpl` (Java and C++ clients).
### Verifying this change
`SharedConsumerAssignorTest` is added to show how the assignor works
in detail.
`MessageChunkingSharedTest` is added to verify the Shared dispatcher
works on chunked messages, including:
- A single producer sends chunked messages with various chunk counts to a
consumer that has limited permits.
- A single producer sends chunked messages to two consumers to verify
that both of them can receive chunked messages.
- Produce interleaved chunks via `PersistentTopic` directly to simulate
multiple producers, and verify the new consumer can receive all
unacknowledged messages received by the old consumer.
---
.../broker/service/AbstractBaseDispatcher.java | 11 +-
.../org/apache/pulsar/broker/service/Consumer.java | 49 +++-
.../pulsar/broker/service/EntryAndMetadata.java | 114 ++++++++++
.../pulsar/broker/service/PulsarCommandSender.java | 5 +-
.../broker/service/PulsarCommandSenderImpl.java | 5 +-
.../broker/service/SharedConsumerAssignor.java | 139 ++++++++++++
.../PersistentDispatcherMultipleConsumers.java | 124 ++++++++--
.../broker/service/SharedConsumerAssignorTest.java | 222 ++++++++++++++++++
.../client/impl/MessageChunkingSharedTest.java | 249 +++++++++++++++++++++
pulsar-client-cpp/lib/ConsumerImpl.cc | 4 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
11 files changed, 890 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index de62df77e98..47728cb5b22 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -79,7 +79,7 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
* @param sendMessageInfo
* an object where the total size in messages and bytes will be
returned back to the caller
*/
- public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes
batchSizes,
+ public int filterEntriesForConsumer(List<? extends Entry> entries,
EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
return filterEntriesForConsumer(Optional.empty(), 0, entries,
batchSizes, sendMessageInfo, indexesAcks, cursor,
@@ -96,7 +96,7 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
* EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
*/
public int filterEntriesForConsumer(Optional<MessageMetadata[]>
optMetadataArray, int startOffset,
- List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo
sendMessageInfo,
+ List<? extends Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean
isReplayRead, Consumer consumer) {
int totalMessages = 0;
long totalBytes = 0;
@@ -105,14 +105,17 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
List<Position> entriesToFiltered =
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver =
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
- Entry entry = entries.get(i);
+ final Entry entry = entries.get(i);
if (entry == null) {
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
final int metadataIndex = i + startOffset;
final MessageMetadata msgMetadata =
optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
- .orElseGet(() ->
Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
+ .orElseGet(() -> (entry instanceof EntryAndMetadata)
+ ? ((EntryAndMetadata) entry).getMetadata()
+ :
Commands.peekAndCopyMessageMetadata(metadataAndPayload,
subscription.toString(), -1)
+ );
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry,
msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 02304fe7ff3..37dfe087e7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AtomicDouble;
@@ -203,6 +204,34 @@ public class Consumer {
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
}
+ @VisibleForTesting
+ Consumer(String consumerName, int availablePermits) {
+ this.subscription = null;
+ this.subType = null;
+ this.cnx = null;
+ this.appId = null;
+ this.topicName = null;
+ this.partitionIdx = 0;
+ this.consumerId = 0L;
+ this.priorityLevel = 0;
+ this.readCompacted = false;
+ this.consumerName = consumerName;
+ this.msgOut = null;
+ this.msgRedeliver = null;
+ this.msgOutCounter = null;
+ this.bytesOutCounter = null;
+ this.messageAckRate = null;
+ this.pendingAcks = null;
+ this.stats = null;
+ this.isDurable = false;
+ this.metadata = null;
+ this.keySharedMeta = null;
+ this.clientAddress = null;
+ this.startMessageId = null;
+ this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
+ MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
+ }
+
public SubType subType() {
return subType;
}
@@ -227,7 +256,7 @@ public class Consumer {
return readCompacted;
}
- public Future<Void> sendMessages(final List<Entry> entries,
EntryBatchSizes batchSizes,
+ public Future<Void> sendMessages(final List<? extends Entry> entries,
EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, long
totalChunkedMessages,
RedeliveryTracker redeliveryTracker) {
@@ -241,7 +270,7 @@ public class Consumer {
*
* @return a SendMessageInfo object that contains the detail of what was
sent to consumer
*/
- public Future<Void> sendMessages(final List<Entry> entries,
EntryBatchSizes batchSizes,
+ public Future<Void> sendMessages(final List<? extends Entry> entries,
EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, long
totalChunkedMessages,
RedeliveryTracker redeliveryTracker, long
epoch) {
@@ -820,8 +849,13 @@ public class Consumer {
@Override
public String toString() {
- return MoreObjects.toStringHelper(this).add("subscription",
subscription).add("consumerId", consumerId)
- .add("consumerName", consumerName).add("address",
this.cnx.clientAddress()).toString();
+ if (subscription != null && cnx != null) {
+ return MoreObjects.toStringHelper(this).add("subscription",
subscription).add("consumerId", consumerId)
+ .add("consumerName", consumerName).add("address",
this.cnx.clientAddress()).toString();
+ } else {
+ return MoreObjects.toStringHelper(this).add("consumerId",
consumerId)
+ .add("consumerName", consumerName).toString();
+ }
}
public CompletableFuture<Void> checkPermissionsAsync() {
@@ -1041,7 +1075,12 @@ public class Consumer {
}
private int getStickyKeyHash(Entry entry) {
- byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(),
topicName, subscription.getName());
+ final byte[] stickyKey;
+ if (entry instanceof EntryAndMetadata) {
+ stickyKey = ((EntryAndMetadata) entry).getStickyKey();
+ } else {
+ stickyKey = Commands.peekStickyKey(entry.getDataBuffer(),
topicName, subscription.getName());
+ }
return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
new file mode 100644
index 00000000000..0bcd215f8be
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+
+public class EntryAndMetadata implements Entry {
+
+ private final Entry entry;
+ @Getter
+ @Nullable
+ private final MessageMetadata metadata;
+
+ private EntryAndMetadata(final Entry entry, @Nullable final
MessageMetadata metadata) {
+ this.entry = entry;
+ this.metadata = metadata;
+ }
+
+ public static EntryAndMetadata create(final Entry entry, final
MessageMetadata metadata) {
+ return new EntryAndMetadata(entry, metadata);
+ }
+
+ @VisibleForTesting
+ static EntryAndMetadata create(final Entry entry) {
+ return create(entry,
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1));
+ }
+
+ public byte[] getStickyKey() {
+ if (metadata != null) {
+ if (metadata.hasOrderingKey()) {
+ return metadata.getOrderingKey();
+ } else if (metadata.hasPartitionKey()) {
+ return
metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
+ }
+ }
+ return "NONE_KEY".getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String toString() {
+ String s = entry.getLedgerId() + ":" + entry.getEntryId();
+ if (metadata != null) {
+ s += ("@" + metadata.getProducerName() + "-" +
metadata.getSequenceId());
+ if (metadata.hasChunkId() && metadata.hasNumChunksFromMsg()) {
+ s += ("-" + metadata.getChunkId() + "-" +
metadata.getNumChunksFromMsg());
+ }
+ }
+ return s;
+ }
+
+ @Override
+ public byte[] getData() {
+ return entry.getData();
+ }
+
+ @Override
+ public byte[] getDataAndRelease() {
+ return entry.getDataAndRelease();
+ }
+
+ @Override
+ public int getLength() {
+ return entry.getLength();
+ }
+
+ @Override
+ public ByteBuf getDataBuffer() {
+ return entry.getDataBuffer();
+ }
+
+ @Override
+ public Position getPosition() {
+ return entry.getPosition();
+ }
+
+ @Override
+ public long getLedgerId() {
+ return entry.getLedgerId();
+ }
+
+ @Override
+ public long getEntryId() {
+ return entry.getEntryId();
+ }
+
+ @Override
+ public boolean release() {
+ return entry.release();
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 8934d2e6ce6..dc5b97d846f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -78,8 +78,9 @@ public interface PulsarCommandSender {
void sendReachedEndOfTopic(long consumerId);
Future<Void> sendMessagesToConsumer(long consumerId, String topicName,
Subscription subscription,
- int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
- RedeliveryTracker redeliveryTracker, long epoch);
+ int partitionIdx, List<? extends
Entry> entries, EntryBatchSizes batchSizes,
+ EntryBatchIndexesAcks batchIndexesAcks,
+ RedeliveryTracker redeliveryTracker,
long epoch);
void sendTcClientConnectResponse(long requestId, ServerError error, String
message);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 5b5af41d1ef..8eb00b6c295 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -221,8 +221,9 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
@Override
public ChannelPromise sendMessagesToConsumer(long consumerId, String
topicName, Subscription subscription,
- int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
- RedeliveryTracker redeliveryTracker, long epoch) {
+ int partitionIdx, List<?
extends Entry> entries,
+ EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
+ RedeliveryTracker
redeliveryTracker, long epoch) {
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();
ctx.channel().eventLoop().execute(() -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
new file mode 100644
index 00000000000..e3fb531877a
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+/**
+ * The assigner to assign entries to the proper {@link Consumer} in the shared
subscription.
+ */
+@RequiredArgsConstructor
+public class SharedConsumerAssignor {
+
+ // The cache to map uuid to a consumer because we need to guarantee all
chunks with the same uuid to be dispatched
+ // to a fixed consumer until the whole chunked message is dispatched.
+ @Getter
+ @VisibleForTesting
+ private final Map<String, Consumer> uuidToConsumer = new
ConcurrentHashMap<>();
+
+ // A temporary cache that is cleared each time `assign()` is called
+ private final Map<Consumer, Integer> consumerToPermits = new
IdentityHashMap<>();
+
+ // The selector for entries without uuid. The Consumer returned must have
at least 1 permit.
+ private final Supplier<Consumer> defaultSelector;
+
+ // Process the unassigned messages, e.g. adding them to the replay queue
+ private final java.util.function.Consumer<EntryAndMetadata>
unassignedMessageProcessor;
+
+ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata> entryAndMetadataList,
+ final int
numConsumers) {
+ assert numConsumers >= 0;
+ consumerToPermits.clear();
+ final Map<Consumer, List<EntryAndMetadata>> consumerToEntries = new
IdentityHashMap<>();
+
+ Consumer consumer = getConsumer(numConsumers);
+ if (consumer == null) {
+ entryAndMetadataList.forEach(EntryAndMetadata::release);
+ return consumerToEntries;
+ }
+ // The actual available permits might change, here we use the permits
at the moment to assign entries
+ int availablePermits = consumerToPermits.computeIfAbsent(consumer,
Consumer::getAvailablePermits);
+ int index = 0;
+ for (; index < entryAndMetadataList.size(); index++) {
+ final EntryAndMetadata entryAndMetadata =
entryAndMetadataList.get(index);
+ final MessageMetadata metadata = entryAndMetadata.getMetadata();
+
+ // Select another consumer to ensure `consumer != null` and
`availablePermits > 0`
+ if (availablePermits <= 0) {
+ consumerToPermits.put(consumer, availablePermits);
+ consumer = getConsumer(numConsumers);
+ if (consumer == null) {
+ break;
+ }
+ availablePermits = consumer.getAvailablePermits();
+ }
+
+ if (metadata == null || !metadata.hasUuid() ||
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
+ consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entryAndMetadata);
+ } else {
+ final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer, availablePermits);
+ if (consumerForUuid == null) {
+ unassignedMessageProcessor.accept(entryAndMetadata);
+ continue;
+ }
+ consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new
ArrayList<>()).add(entryAndMetadata);
+ }
+ availablePermits--;
+ }
+
+ for (; index < entryAndMetadataList.size(); index++) {
+ unassignedMessageProcessor.accept(entryAndMetadataList.get(index));
+ }
+
+ return consumerToEntries;
+ }
+
+ private Consumer getConsumer(final int numConsumers) {
+ for (int i = 0; i < numConsumers; i++) {
+ final Consumer consumer = defaultSelector.get();
+ if (consumer == null) {
+ return null;
+ }
+ final int permits = consumerToPermits.computeIfAbsent(consumer,
Consumer::getAvailablePermits);
+ if (permits > 0) {
+ return consumer;
+ }
+ }
+ return null;
+ }
+
+ private Consumer getConsumerForUuid(final MessageMetadata metadata,
+ final Consumer defaultConsumer,
+ final int currentAvailablePermits) {
+ final String uuid = metadata.getUuid();
+ Consumer consumer = uuidToConsumer.get(uuid);
+ if (consumer == null) {
+ if (metadata.getChunkId() != 0) {
+ // Not the first chunk, skip it
+ return null;
+ }
+ consumer = defaultConsumer;
+ uuidToConsumer.put(uuid, consumer);
+ }
+ final int permits = consumerToPermits.computeIfAbsent(consumer,
Consumer::getAvailablePermits);
+ if (permits <= 0) {
+ return null;
+ }
+ if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+ // The last chunk is received, we should remove the cache
+ uuidToConsumer.remove(uuid);
+ }
+ consumerToPermits.put(consumer, currentAvailablePermits - 1);
+ return consumer;
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index cf58bfd43ac..167bc188e99 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -22,10 +22,11 @@ import static
org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -33,8 +34,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -50,12 +51,14 @@ import
org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryAndMetadata;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.SharedConsumerAssignor;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -109,9 +112,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnUnackedMsgs");
protected Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
-
private AtomicBoolean isRescheduleReadInProgress = new
AtomicBoolean(false);
private final ExecutorService dispatchMessagesThread;
+ private final SharedConsumerAssignor assignor;
protected enum ReadType {
Normal, Replay
@@ -136,6 +139,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
+ this.assignor = new SharedConsumerAssignor(this::getNextConsumer,
this::addMessageToReplay);
}
@Override
@@ -575,12 +579,26 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
if (entriesToDispatch == 0) {
return true;
}
- final MessageMetadata[] metadataArray = entries.stream()
- .map(entry ->
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(),
subscription.toString(), -1))
- .toArray(MessageMetadata[]::new);
- int remainingMessages =
Stream.of(metadataArray).filter(Objects::nonNull)
- .map(MessageMetadata::getNumMessagesInBatch)
- .reduce(0, Integer::sum);
+ final MessageMetadata[] metadataArray = new
MessageMetadata[entries.size()];
+ int remainingMessages = 0;
+ boolean hasChunk = false;
+ for (int i = 0; i < metadataArray.length; i++) {
+ final MessageMetadata metadata =
Commands.peekAndCopyMessageMetadata(
+ entries.get(i).getDataBuffer(), subscription.toString(),
-1);
+ if (metadata != null) {
+ remainingMessages += metadata.getNumMessagesInBatch();
+ if (!hasChunk && metadata.hasUuid()) {
+ hasChunk = true;
+ }
+ }
+ metadataArray[i] = metadata;
+ if (!hasChunk && metadataArray[i] != null &&
metadataArray[i].hasUuid()) {
+ hasChunk = true;
+ }
+ }
+ if (hasChunk) {
+ return sendChunkedMessagesToConsumers(readType, entries,
metadataArray);
+ }
int start = 0;
long totalMessagesSent = 0;
@@ -657,6 +675,23 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalBytesSent += sendMessageInfo.getTotalBytes();
}
+ acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent,
totalBytesSent);
+
+ if (entriesToDispatch > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] No consumers found with available permits,
storing {} positions for later replay", name,
+ entries.size() - start);
+ }
+ entries.subList(start, entries.size()).forEach(entry -> {
+ long stickyKeyHash = getStickyKeyHash(entry);
+ addMessageToReplay(entry.getLedgerId(), entry.getEntryId(),
stickyKeyHash);
+ entry.release();
+ });
+ }
+ return true;
+ }
+
+ private void acquirePermitsForDeliveredMessages(long totalEntries, long
totalMessagesSent, long totalBytesSent) {
// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries
: totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() ||
!cursor.isActive()) {
@@ -671,19 +706,69 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
dispatchRateLimiter.get().tryDispatchPermit(permits,
totalBytesSent);
}
}
+ }
+
+ private boolean sendChunkedMessagesToConsumers(ReadType readType,
+ List<Entry> entries,
+ MessageMetadata[]
metadataArray) {
+ final List<EntryAndMetadata> originalEntryAndMetadataList = new
ArrayList<>(metadataArray.length);
+ for (int i = 0; i < metadataArray.length; i++) {
+
originalEntryAndMetadataList.add(EntryAndMetadata.create(entries.get(i),
metadataArray[i]));
+ }
- if (entriesToDispatch > 0) {
+ final Map<Consumer, List<EntryAndMetadata>> assignResult =
+ assignor.assign(originalEntryAndMetadataList,
consumerList.size());
+ long totalMessagesSent = 0;
+ long totalBytesSent = 0;
+ long totalEntries = 0;
+ final AtomicInteger numConsumers = new
AtomicInteger(assignResult.size());
+ for (Map.Entry<Consumer, List<EntryAndMetadata>> current :
assignResult.entrySet()) {
+ final Consumer consumer = current.getKey();
+ final List<EntryAndMetadata> entryAndMetadataList =
current.getValue();
+ final int messagesForC = Math.min(consumer.getAvailablePermits(),
entryAndMetadataList.size());
if (log.isDebugEnabled()) {
- log.debug("[{}] No consumers found with available permits,
storing {} positions for later replay", name,
- entries.size() - start);
+ log.debug("[{}] select consumer {} with messages num {}, read
type is {}",
+ name, consumer.consumerName(), messagesForC, readType);
}
- entries.subList(start, entries.size()).forEach(entry -> {
- long stickyKeyHash = getStickyKeyHash(entry);
- addMessageToReplay(entry.getLedgerId(), entry.getEntryId(),
stickyKeyHash);
- entry.release();
+ if (messagesForC < entryAndMetadataList.size()) {
+ for (int i = messagesForC; i < entryAndMetadataList.size();
i++) {
+ final EntryAndMetadata entry = entryAndMetadataList.get(i);
+ addMessageToReplay(entry);
+ entryAndMetadataList.set(i, null);
+ }
+ }
+ if (messagesForC == 0) {
+ numConsumers.decrementAndGet();
+ continue;
+ }
+ if (readType == ReadType.Replay) {
+ entryAndMetadataList.stream().limit(messagesForC)
+ .forEach(e ->
redeliveryMessages.remove(e.getLedgerId(), e.getEntryId()));
+ }
+ final SendMessageInfo sendMessageInfo =
SendMessageInfo.getThreadLocal();
+ final EntryBatchSizes batchSizes =
EntryBatchSizes.get(messagesForC);
+ final EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(messagesForC);
+
+ totalEntries += filterEntriesForConsumer(entryAndMetadataList,
batchSizes, sendMessageInfo,
+ batchIndexesAcks, cursor, readType == ReadType.Replay,
consumer);
+ consumer.sendMessages(entryAndMetadataList, batchSizes,
batchIndexesAcks,
+ sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
+ sendMessageInfo.getTotalChunkedMessages(),
getRedeliveryTracker()
+ ).addListener(future -> {
+ if (future.isDone() && numConsumers.decrementAndGet() == 0) {
+ readMoreEntries();
+ }
});
+
+ TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
+ -(sendMessageInfo.getTotalMessages() -
batchIndexesAcks.getTotalAckedIndexCount()));
+ totalMessagesSent += sendMessageInfo.getTotalMessages();
+ totalBytesSent += sendMessageInfo.getTotalBytes();
}
- return true;
+
+ acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent,
totalBytesSent);
+
+ return numConsumers.get() == 0; // trigger a new readMoreEntries() call
}
@Override
@@ -966,6 +1051,11 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
}
}
+ private void addMessageToReplay(Entry entry) {
+ addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
+ entry.release();
+ }
+
protected boolean addMessageToReplay(long ledgerId, long entryId, long
stickyKeyHash) {
if (checkIfMessageIsUnacked(ledgerId, entryId)) {
redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
new file mode 100644
index 00000000000..7649f6eac2d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SharedConsumerAssignorTest {
+
+ private final ConsumerSelector roundRobinConsumerSelector = new
ConsumerSelector();
+ private final List<EntryAndMetadata> entryAndMetadataList = new
ArrayList<>();
+ private final List<EntryAndMetadata> replayQueue = new ArrayList<>();
+ private SharedConsumerAssignor assignor;
+
+ @BeforeMethod
+ public void prepareData() {
+ roundRobinConsumerSelector.clear();
+ entryAndMetadataList.clear();
+ replayQueue.clear();
+ assignor = new SharedConsumerAssignor(roundRobinConsumerSelector,
replayQueue::add);
+ final AtomicLong entryId = new AtomicLong(0L);
+ final MockProducer producerA = new MockProducer("A", entryId,
entryAndMetadataList);
+ final MockProducer producerB = new MockProducer("B", entryId,
entryAndMetadataList);
+ producerA.sendMessage();
+ producerA.sendChunk(0, 3);
+ producerA.sendChunk(1, 3);
+ producerB.sendMessage();
+ producerB.sendChunk(0, 2);
+ producerA.sendChunk(2, 3);
+ producerB.sendChunk(1, 2);
+ // Use the following data for all tests
+ // | entry id | uuid | chunk id | number of chunks |
+ // | -------- | ---- | -------- | ---------------- |
+ // | 0 | A-0 | | |
+ // | 1 | A-1 | 0 | 3 |
+ // | 2 | A-1 | 1 | 3 |
+ // | 3 | B-0 | | |
+ // | 4 | B-1 | 0 | 2 |
+ // | 5 | A-1 | 2 | 3 |
+ // | 6 | B-1 | 1 | 2 |
+ // P.S. In the table above, the uuid column shows
"<producer-name>-<sequence-id>" for non-chunked messages
+ assertEquals(toString(entryAndMetadataList), Arrays.asList(
+ "0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3", "0:3@B-0",
"0:4@B-1-0-2", "0:5@A-1-2-3", "0:6@B-1-1-2"));
+ }
+
+ @Test
+ public void testSingleConsumerMultiAssign() {
+ // Only first 5 entries can be received because the number of permits
is 5
+ final Consumer consumer = new Consumer("A", 5);
+ roundRobinConsumerSelector.addConsumers(consumer);
+
+ Map<Consumer, List<EntryAndMetadata>> result =
assignor.assign(entryAndMetadataList, 1);
+ assertEquals(result.getOrDefault(consumer, Collections.emptyList()),
entryAndMetadataList.subList(0, 5));
+ // Since two chunked messages (A-1 and B-1) are both not fully received,
these uuids have been cached
+ assertEquals(assignor.getUuidToConsumer().keySet(),
Sets.newHashSet("A-1", "B-1"));
+ assertEquals(toString(replayQueue), Arrays.asList("0:5@A-1-2-3",
"0:6@B-1-1-2"));
+
+ result = assignor.assign(entryAndMetadataList.subList(5, 6), 1);
+ assertEquals(result.getOrDefault(consumer, Collections.emptyList()),
entryAndMetadataList.subList(5, 6));
+ // A-1 is received so that uuid "A-1" has been removed from the cache
+ assertEquals(assignor.getUuidToConsumer().keySet(),
Sets.newHashSet("B-1"));
+
+ result = assignor.assign(entryAndMetadataList.subList(6, 7), 1);
+ assertEquals(result.getOrDefault(consumer, Collections.emptyList()),
entryAndMetadataList.subList(6, 7));
+ assertTrue(assignor.getUuidToConsumer().isEmpty());
+ }
+
+ @Test
+ public void testMultiConsumerWithSmallPermits() {
+ final Consumer consumerA = new Consumer("A", 3);
+ final Consumer consumerB = new Consumer("B", 4);
+ roundRobinConsumerSelector.addConsumers(consumerA, consumerB);
+
+ // The original order:
+ // A-0, A-1-0-3, A-1-1-3, B-0, B-1-0-2, A-1-2-3, B-1-1-2
+ // First consumerA received 3 entries and its available permits became
0. Then the consumer was switched to
+ // consumerB, which received B-0 and B-1-0-2.
+ // When consumerB tried to receive A-1-2-3, since the uuid "A-1" was
already assigned to consumerA, consumerB
+ // was not able to receive A-1-2-3.
+ // However, since consumerA had no more permits, A-1-2-3 could not be
assigned to consumerA either.
+ // Therefore, A-1-2-3 was skipped and added to the replay queue.
+ Map<Consumer, List<EntryAndMetadata>> result =
assignor.assign(entryAndMetadataList, 2);
+ assertEquals(toString(result.getOrDefault(consumerA,
Collections.emptyList())),
+ Arrays.asList("0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3"));
+ assertEquals(toString(result.getOrDefault(consumerB,
Collections.emptyList())),
+ Arrays.asList("0:3@B-0", "0:4@B-1-0-2", "0:6@B-1-1-2"));
+ assertEquals(toString(replayQueue),
Collections.singletonList("0:5@A-1-2-3"));
+ assertEquals(assignor.getUuidToConsumer().keySet(),
Sets.newHashSet("A-1"));
+
+ roundRobinConsumerSelector.clear();
+ roundRobinConsumerSelector.addConsumers(consumerB, consumerA);
+ assertSame(roundRobinConsumerSelector.peek(), consumerB);
+ final List<EntryAndMetadata> leftEntries = new
ArrayList<>(replayQueue);
+ replayQueue.clear();
+ result = assignor.assign(leftEntries, 2);
+ // Since uuid "A-1" is still cached, A-1-2-3 will be dispatched to
consumerA even if consumerB is the first
+ // consumer returned by roundRobinConsumerSelector.get()
+ assertEquals(toString(result.getOrDefault(consumerA,
Collections.emptyList())),
+ Collections.singletonList("0:5@A-1-2-3"));
+ assertNull(result.get(consumerB));
+ assertTrue(replayQueue.isEmpty());
+ assertTrue(assignor.getUuidToConsumer().isEmpty());
+ }
+
+ @RequiredArgsConstructor
+ static class ConsumerSelector implements Supplier<Consumer> {
+
+ private final List<Consumer> consumers = new ArrayList<>();
+ private int index = 0;
+
+ public void addConsumers(Consumer... consumers) {
+ this.consumers.addAll(Arrays.asList(consumers));
+ }
+
+ public void clear() {
+ consumers.clear();
+ index = 0;
+ }
+
+ @Override
+ public Consumer get() {
+ // a simple round-robin dispatcher
+ final Consumer consumer = peek();
+ if (consumer == null) {
+ return null;
+ }
+ index++;
+ return consumer;
+ }
+
+ public Consumer peek() {
+ if (consumers.isEmpty()) {
+ return null;
+ }
+ return consumers.get(index % consumers.size());
+ }
+ }
+
+ private static List<String> toString(final List<EntryAndMetadata>
entryAndMetadataList) {
+ return
entryAndMetadataList.stream().map(EntryAndMetadata::toString).collect(Collectors.toList());
+ }
+
+ @RequiredArgsConstructor
+ static class MockProducer {
+ final String name;
+ final AtomicLong entryId;
+ final List<EntryAndMetadata> entryAndMetadataList;
+ long sequenceId = 0L;
+
+ void sendMessage() {
+
entryAndMetadataList.add(createEntryAndMetadata(entryId.getAndIncrement(),
+ createMetadata(name, sequenceId++, null, null)));
+ }
+
+ void sendChunk(int chunkId, int numChunks) {
+
entryAndMetadataList.add(createEntryAndMetadata(entryId.getAndIncrement(),
+ createMetadata(name, sequenceId, chunkId, numChunks)));
+ if (chunkId == numChunks - 1) {
+ sequenceId++;
+ }
+ }
+ }
+
+ private static EntryAndMetadata createEntryAndMetadata(final long entryId,
+ final
MessageMetadata metadata) {
+ final ByteBuf payload = Commands.serializeMetadataAndPayload(
+ Commands.ChecksumType.Crc32c, metadata,
PulsarByteBufAllocator.DEFAULT.buffer());
+ return EntryAndMetadata.create(EntryImpl.create(0L, entryId, payload));
+ }
+
+ private static MessageMetadata createMetadata(final String producerName,
+ final long sequenceId,
+ final Integer chunkId,
+ final Integer numChunks) {
+ final MessageMetadata metadata = new MessageMetadata();
+ metadata.setProducerName(producerName);
+ metadata.setSequenceId(sequenceId);
+ metadata.setPublishTime(0L);
+ if (chunkId != null && numChunks != null) {
+ metadata.setUuid(producerName + "-" + sequenceId);
+ metadata.setChunkId(chunkId);
+ metadata.setNumChunksFromMsg(numChunks);
+ }
+ return metadata;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
new file mode 100644
index 00000000000..e2aa1281341
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class MessageChunkingSharedTest extends ProducerConsumerBase {
+
+ private static final int MAX_MESSAGE_SIZE = 100;
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testSingleConsumer() throws Exception {
+ final String topic = "my-property/my-ns/test-single-consumer";
+ @Cleanup final Producer<String> producer = createProducer(topic);
+ @Cleanup final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(5)
+ .subscribe();
+
+ final List<String> values = new ArrayList<>();
+ values.add(createChunkedMessage(1)); // non-chunk
+ values.add(createChunkedMessage(10)); // number of chunks > receiver
queue size
+ values.add(createChunkedMessage(4)); // number of chunks < receiver
queue size
+ for (String value : values) {
+ final MessageId messageId = producer.send(value);
+ log.info("Sent {} bytes to {}", value.length(), messageId);
+ }
+
+ final List<String> receivedValues = new ArrayList<>();
+ for (int i = 0; i < values.size(); i++) {
+ final Message<String> message = consumer.receive(3,
TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ receivedValues.add(message.getValue());
+ log.info("Received {} bytes from {}", message.getValue().length(),
message.getMessageId());
+ consumer.acknowledge(message);
+ }
+ assertEquals(receivedValues, values);
+ }
+
+ @Test
+ public void testMultiConsumers() throws Exception {
+ final String topic = "my-property/my-ns/test-multi-consumers";
+ @Cleanup final Producer<String> producer = createProducer(topic);
+ final ConsumerBuilder<String> consumerBuilder =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(5);
+
+ final List<String> receivedValues1 = Collections.synchronizedList(new
ArrayList<>());
+ @Cleanup final Consumer<String> consumer1 = consumerBuilder
+ .messageListener((MessageListener<String>) (consumer, msg) ->
receivedValues1.add(msg.getValue()))
+ .subscribe();
+ final List<String> receivedValues2 = Collections.synchronizedList(new
ArrayList<>());
+ @Cleanup final Consumer<String> consumer2 = consumerBuilder
+ .messageListener((MessageListener<String>) (consumer, msg) ->
receivedValues2.add(msg.getValue()))
+ .subscribe();
+
+ final Set<String> values = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ values.add(createChunkedMessage(4));
+ }
+ for (String value : values) {
+ producer.send(value);
+ }
+
+ Awaitility.await().atMost(Duration.ofSeconds(3))
+ .until(() -> receivedValues1.size() + receivedValues2.size()
>= values.size());
+ assertEquals(receivedValues1.size() + receivedValues2.size(),
values.size());
+ assertFalse(receivedValues1.isEmpty());
+ assertFalse(receivedValues2.isEmpty());
+ for (String value : receivedValues1) {
+ assertTrue(values.contains(value));
+ }
+ for (String value : receivedValues2) {
+ assertTrue(values.contains(value));
+ }
+ }
+
+ @Test
+ public void testInterleavedChunks() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/test-interleaved-chunks";
+ final ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared);
+ final List<String> receivedUuidList1 =
Collections.synchronizedList(new ArrayList<>());
+ final Consumer<byte[]> consumer1 =
consumerBuilder.messageListener((MessageListener<byte[]>)
+ (consumer, msg) ->
receivedUuidList1.add(msg.getProducerName() + "-" + msg.getSequenceId()))
+ .consumerName("consumer-1")
+ .subscribe();
+ final PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopicIfExists(topic).get().orElse(null);
+ assertNotNull(persistentTopic);
+ // send: A-0, A-1-0-3, A-1-1-3, B-0, B-1-0-2
+ sendNonChunk(persistentTopic, "A", 0);
+ sendChunk(persistentTopic, "A", 1, 0, 3);
+ sendChunk(persistentTopic, "A", 1, 1, 3);
+ sendNonChunk(persistentTopic, "B", 0);
+ sendChunk(persistentTopic, "B", 1, 0, 2);
+
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
receivedUuidList1.size() >= 2);
+ assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0"));
+
+ // complete all 2 chunks of B-1
+ sendChunk(persistentTopic, "B", 1, 1, 2);
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
receivedUuidList1.size() >= 3);
+ assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1"));
+
+ // complete all 3 chunks of A-1; wait for the 4th message before asserting
+ sendChunk(persistentTopic, "A", 1, 2, 3);
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
receivedUuidList1.size() >= 4);
+ assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1",
"A-1"));
+
+ final List<String> receivedUuidList2 =
Collections.synchronizedList(new ArrayList<>());
+ @Cleanup final Consumer<byte[]> consumer2 =
consumerBuilder.messageListener((MessageListener<byte[]>)
+ (consumer, msg) ->
receivedUuidList2.add(msg.getProducerName() + "-" + msg.getSequenceId()))
+ .consumerName("consumer-2")
+ .subscribe();
+ consumer1.close();
+
+ // Since messages were never acknowledged, all messages will be
redelivered to consumer-2, so verify what consumer-2 received
+ Awaitility.await().atMost(Duration.ofSeconds(3)).until(() ->
receivedUuidList2.size() >= 4);
+ assertEquals(receivedUuidList2, Arrays.asList("A-0", "B-0", "B-1",
"A-1"));
+ }
+
+ private Producer<String> createProducer(String topic) throws
PulsarClientException {
+ return pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableChunking(true)
+ .enableBatching(false)
+ .chunkMaxMessageSize(MAX_MESSAGE_SIZE)
+ .create();
+ }
+
+ private static String createChunkedMessage(int numChunks) {
+ assert numChunks >= 1;
+ final byte[] payload = new byte[(numChunks - 1) * MAX_MESSAGE_SIZE +
MAX_MESSAGE_SIZE / 10];
+ final Random random = new Random();
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) ('a' + random.nextInt(26));
+ }
+ return Schema.STRING.decode(payload);
+ }
+
+ private static void sendNonChunk(final PersistentTopic persistentTopic,
+ final String producerName,
+ final long sequenceId) {
+ sendChunk(persistentTopic, producerName, sequenceId, null, null);
+ }
+
+ private static void sendChunk(final PersistentTopic persistentTopic,
+ final String producerName,
+ final long sequenceId,
+ final Integer chunkId,
+ final Integer numChunks) {
+ final MessageMetadata metadata = new MessageMetadata();
+ metadata.setProducerName(producerName);
+ metadata.setSequenceId(sequenceId);
+ metadata.setPublishTime(System.currentTimeMillis());
+ if (chunkId != null && numChunks != null) {
+ metadata.setUuid(producerName + "-" + sequenceId);
+ metadata.setChunkId(chunkId);
+ metadata.setNumChunksFromMsg(numChunks);
+ metadata.setTotalChunkMsgSize(numChunks);
+ }
+ final ByteBuf buf =
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata,
+ PulsarByteBufAllocator.DEFAULT.buffer(1));
+ persistentTopic.publishMessage(buf, (e, ledgerId, entryId) -> {
+ String name = producerName + "-" + sequenceId;
+ if (chunkId != null) {
+ name += "-" + chunkId + "-" + numChunks;
+ }
+ if (e == null) {
+ log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
+ } else {
+ log.error("Failed to send {}: {}", name, e.getMessage());
+ }
+ });
+ }
+}
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 63d2afc29bc..a9ed815ac2a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -406,9 +406,7 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
metadata.encryption_keys_size() <= 0 ||
config_.getCryptoKeyReader().get() ||
config_.getCryptoFailureAction() ==
ConsumerCryptoFailureAction::CONSUME;
- const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1 &&
- config_.getConsumerType() !=
ConsumerType::ConsumerShared &&
- config_.getConsumerType() !=
ConsumerType::ConsumerKeyShared;
+ const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
if (isMessageDecryptable && !isChunkedMessage) {
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata,
payload, true)) {
// Message was discarded on decompression error
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4d858ae6263..5bc998c4526 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1297,7 +1297,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final int numMessages = msgMetadata.getNumMessagesInBatch();
final int numChunks = msgMetadata.hasNumChunksFromMsg() ?
msgMetadata.getNumChunksFromMsg() : 0;
- final boolean isChunkedMessage = numChunks > 1 &&
conf.getSubscriptionType() != SubscriptionType.Shared;
+ final boolean isChunkedMessage = numChunks > 1;
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex());
if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {