This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b1a29b520d3 [feature][broker] PIP 37: Support chunking with Shared 
subscription (#16202)
b1a29b520d3 is described below

commit b1a29b520d34d60e60160e3a7b9b0e26926063ee
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 2 13:41:07 2022 +0800

    [feature][broker] PIP 37: Support chunking with Shared subscription (#16202)
    
    * [feature][broker] PIP 37: Support chunking with Shared subscription
    
    ### Motivation
    
    
https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar#option-1-broker-caches-mapping-of-message-uuid-and-consumerid
    
    The cons of option 1 described in the original proposal don't exist
    for current code because broker keeps redelivered messages into
    **sorted** map now.
    
    ### Modifications
    
    First of all, to avoid too many code changes, an `EntryAndMetadata`
    class is introduced to bind the `Entry` with the associated
    `MessageMetadata` to avoid parsing the metadata repeatedly. It also
    implements the `Entry` interface, so this PR changes some
    `List<Entry>` parameters to `List<? extends Entry>` so that a
    `List<EntryAndMetadata>` argument can be accepted.
    
    Then, a `SharedConsumerAssignor` is introduced to assign a list of
    entries to all shared consumers.
    1. Use a default selector to select the next consumer, like
       `PersistentDispatcherMultipleConsumers#getNextConsumer`,
    2. Each time a consumer is chosen, assign the entries in range
       [i, i+permits) to the consumer except entries that have uuid:
      - If uuid is not cached, cache `uuid -> consumer` to indicate the
        chunked message of this uuid must be dispatched to this consumer.
      - Otherwise, assign this entry to the owner consumer of the uuid.
    
    The `assign` method returns a map that maps `Consumer` to
    `List<EntryAndMetadata>`. The following logic is similar to the
    Key_Shared dispatcher.
    
    Finally, cancel the limit in `ConsumerImpl`.
    
    ### Verifying this change
    
    `SharedConsumerAssignorTest` is added to show how the assignor works
    in detail.
    
    `MessageChunkingSharedTest` is added to verify the Shared dispatcher
    works on chunked messages, including:
    - Single producer sends chunked messages with various chunk count to a
      consumer has a limited permits.
    - Single producer sends chunked messages to two consumers to verify
      both they can receive chunked messages.
    - Produce interleaved chunks via `PersistentTopic` directly to simulate
      multiple producers, and verify the new consumer can receive all
      unacknowledged messages received by the old consumer.
---
 .../broker/service/AbstractBaseDispatcher.java     |  11 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  49 +++-
 .../pulsar/broker/service/EntryAndMetadata.java    | 114 ++++++++++
 .../pulsar/broker/service/PulsarCommandSender.java |   5 +-
 .../broker/service/PulsarCommandSenderImpl.java    |   5 +-
 .../broker/service/SharedConsumerAssignor.java     | 139 ++++++++++++
 .../PersistentDispatcherMultipleConsumers.java     | 124 ++++++++--
 .../broker/service/SharedConsumerAssignorTest.java | 222 ++++++++++++++++++
 .../client/impl/MessageChunkingSharedTest.java     | 249 +++++++++++++++++++++
 pulsar-client-cpp/lib/ConsumerImpl.cc              |   4 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 11 files changed, 890 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index de62df77e98..47728cb5b22 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -79,7 +79,7 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
      * @param sendMessageInfo
      *            an object where the total size in messages and bytes will be 
returned back to the caller
      */
-    public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes 
batchSizes,
+    public int filterEntriesForConsumer(List<? extends Entry> entries, 
EntryBatchSizes batchSizes,
             SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
             ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
         return filterEntriesForConsumer(Optional.empty(), 0, entries, 
batchSizes, sendMessageInfo, indexesAcks, cursor,
@@ -96,7 +96,7 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
      *   EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
      */
     public int filterEntriesForConsumer(Optional<MessageMetadata[]> 
optMetadataArray, int startOffset,
-             List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo 
sendMessageInfo,
+             List<? extends Entry> entries, EntryBatchSizes batchSizes, 
SendMessageInfo sendMessageInfo,
              EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean 
isReplayRead, Consumer consumer) {
         int totalMessages = 0;
         long totalBytes = 0;
@@ -105,14 +105,17 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
         List<Position> entriesToFiltered = 
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
         List<PositionImpl> entriesToRedeliver = 
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
-            Entry entry = entries.get(i);
+            final Entry entry = entries.get(i);
             if (entry == null) {
                 continue;
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
             final int metadataIndex = i + startOffset;
             final MessageMetadata msgMetadata = 
optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
-                    .orElseGet(() -> 
Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
+                    .orElseGet(() -> (entry instanceof EntryAndMetadata)
+                            ? ((EntryAndMetadata) entry).getMetadata()
+                            : 
Commands.peekAndCopyMessageMetadata(metadataAndPayload, 
subscription.toString(), -1)
+                    );
             EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, 
msgMetadata, consumer);
             if (filterResult == EntryFilter.FilterResult.REJECT) {
                 entriesToFiltered.add(entry.getPosition());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 02304fe7ff3..37dfe087e7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AtomicDouble;
@@ -203,6 +204,34 @@ public class Consumer {
                 
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
     }
 
+    @VisibleForTesting
+    Consumer(String consumerName, int availablePermits) {
+        this.subscription = null;
+        this.subType = null;
+        this.cnx = null;
+        this.appId = null;
+        this.topicName = null;
+        this.partitionIdx = 0;
+        this.consumerId = 0L;
+        this.priorityLevel = 0;
+        this.readCompacted = false;
+        this.consumerName = consumerName;
+        this.msgOut = null;
+        this.msgRedeliver = null;
+        this.msgOutCounter = null;
+        this.bytesOutCounter = null;
+        this.messageAckRate = null;
+        this.pendingAcks = null;
+        this.stats = null;
+        this.isDurable = false;
+        this.metadata = null;
+        this.keySharedMeta = null;
+        this.clientAddress = null;
+        this.startMessageId = null;
+        this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
+        MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
+    }
+
     public SubType subType() {
         return subType;
     }
@@ -227,7 +256,7 @@ public class Consumer {
         return readCompacted;
     }
 
-    public Future<Void> sendMessages(final List<Entry> entries, 
EntryBatchSizes batchSizes,
+    public Future<Void> sendMessages(final List<? extends Entry> entries, 
EntryBatchSizes batchSizes,
                                      EntryBatchIndexesAcks batchIndexesAcks,
                                      int totalMessages, long totalBytes, long 
totalChunkedMessages,
                                      RedeliveryTracker redeliveryTracker) {
@@ -241,7 +270,7 @@ public class Consumer {
      *
      * @return a SendMessageInfo object that contains the detail of what was 
sent to consumer
      */
-    public Future<Void> sendMessages(final List<Entry> entries, 
EntryBatchSizes batchSizes,
+    public Future<Void> sendMessages(final List<? extends Entry> entries, 
EntryBatchSizes batchSizes,
                                      EntryBatchIndexesAcks batchIndexesAcks,
                                      int totalMessages, long totalBytes, long 
totalChunkedMessages,
                                      RedeliveryTracker redeliveryTracker, long 
epoch) {
@@ -820,8 +849,13 @@ public class Consumer {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this).add("subscription", 
subscription).add("consumerId", consumerId)
-                .add("consumerName", consumerName).add("address", 
this.cnx.clientAddress()).toString();
+        if (subscription != null && cnx != null) {
+            return MoreObjects.toStringHelper(this).add("subscription", 
subscription).add("consumerId", consumerId)
+                    .add("consumerName", consumerName).add("address", 
this.cnx.clientAddress()).toString();
+        } else {
+            return MoreObjects.toStringHelper(this).add("consumerId", 
consumerId)
+                    .add("consumerName", consumerName).toString();
+        }
     }
 
     public CompletableFuture<Void> checkPermissionsAsync() {
@@ -1041,7 +1075,12 @@ public class Consumer {
     }
 
     private int getStickyKeyHash(Entry entry) {
-        byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), 
topicName, subscription.getName());
+        final byte[] stickyKey;
+        if (entry instanceof EntryAndMetadata) {
+            stickyKey = ((EntryAndMetadata) entry).getStickyKey();
+        } else {
+            stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), 
topicName, subscription.getName());
+        }
         return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
new file mode 100644
index 00000000000..0bcd215f8be
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+
+public class EntryAndMetadata implements Entry {
+
+    private final Entry entry;
+    @Getter
+    @Nullable
+    private final MessageMetadata metadata;
+
+    private EntryAndMetadata(final Entry entry, @Nullable final 
MessageMetadata metadata) {
+        this.entry = entry;
+        this.metadata = metadata;
+    }
+
+    public static EntryAndMetadata create(final Entry entry, final 
MessageMetadata metadata) {
+        return new EntryAndMetadata(entry, metadata);
+    }
+
+    @VisibleForTesting
+    static EntryAndMetadata create(final Entry entry) {
+        return create(entry, 
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1));
+    }
+
+    public byte[] getStickyKey() {
+        if (metadata != null) {
+            if (metadata.hasOrderingKey()) {
+                return metadata.getOrderingKey();
+            } else if (metadata.hasPartitionKey()) {
+                return 
metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
+            }
+        }
+        return "NONE_KEY".getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public String toString() {
+        String s = entry.getLedgerId() + ":" + entry.getEntryId();
+        if (metadata != null) {
+            s += ("@" + metadata.getProducerName() + "-" + 
metadata.getSequenceId());
+            if (metadata.hasChunkId() && metadata.hasNumChunksFromMsg()) {
+                s += ("-" + metadata.getChunkId() + "-" + 
metadata.getNumChunksFromMsg());
+            }
+        }
+        return s;
+    }
+
+    @Override
+    public byte[] getData() {
+        return entry.getData();
+    }
+
+    @Override
+    public byte[] getDataAndRelease() {
+        return entry.getDataAndRelease();
+    }
+
+    @Override
+    public int getLength() {
+        return entry.getLength();
+    }
+
+    @Override
+    public ByteBuf getDataBuffer() {
+        return entry.getDataBuffer();
+    }
+
+    @Override
+    public Position getPosition() {
+        return entry.getPosition();
+    }
+
+    @Override
+    public long getLedgerId() {
+        return entry.getLedgerId();
+    }
+
+    @Override
+    public long getEntryId() {
+        return entry.getEntryId();
+    }
+
+    @Override
+    public boolean release() {
+        return entry.release();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 8934d2e6ce6..dc5b97d846f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -78,8 +78,9 @@ public interface PulsarCommandSender {
     void sendReachedEndOfTopic(long consumerId);
 
     Future<Void> sendMessagesToConsumer(long consumerId, String topicName, 
Subscription subscription,
-            int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, 
EntryBatchIndexesAcks batchIndexesAcks,
-            RedeliveryTracker redeliveryTracker, long epoch);
+                                        int partitionIdx, List<? extends 
Entry> entries, EntryBatchSizes batchSizes,
+                                        EntryBatchIndexesAcks batchIndexesAcks,
+                                        RedeliveryTracker redeliveryTracker, 
long epoch);
 
     void sendTcClientConnectResponse(long requestId, ServerError error, String 
message);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 5b5af41d1ef..8eb00b6c295 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -221,8 +221,9 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
 
     @Override
     public ChannelPromise sendMessagesToConsumer(long consumerId, String 
topicName, Subscription subscription,
-            int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, 
EntryBatchIndexesAcks batchIndexesAcks,
-            RedeliveryTracker redeliveryTracker, long epoch) {
+                                                 int partitionIdx, List<? 
extends Entry> entries,
+                                                 EntryBatchSizes batchSizes, 
EntryBatchIndexesAcks batchIndexesAcks,
+                                                 RedeliveryTracker 
redeliveryTracker, long epoch) {
         final ChannelHandlerContext ctx = cnx.ctx();
         final ChannelPromise writePromise = ctx.newPromise();
         ctx.channel().eventLoop().execute(() -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
new file mode 100644
index 00000000000..e3fb531877a
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+/**
+ * The assigner to assign entries to the proper {@link Consumer} in the shared 
subscription.
+ */
+@RequiredArgsConstructor
+public class SharedConsumerAssignor {
+
+    // The cache to map uuid to a consumer because we need to guarantee all 
chunks with the same uuid to be dispatched
+    // to a fixed consumer until the whole chunked message is dispatched.
+    @Getter
+    @VisibleForTesting
+    private final Map<String, Consumer> uuidToConsumer = new 
ConcurrentHashMap<>();
+
+    // A temporary cache that is cleared each time `assign()` is called
+    private final Map<Consumer, Integer> consumerToPermits = new 
IdentityHashMap<>();
+
+    // The selector for entries without uuid. The Consumer returned must have 
at least 1 permit.
+    private final Supplier<Consumer> defaultSelector;
+
+    // Process the unassigned messages, e.g. adding them to the replay queue
+    private final java.util.function.Consumer<EntryAndMetadata> 
unassignedMessageProcessor;
+
+    public Map<Consumer, List<EntryAndMetadata>> assign(final 
List<EntryAndMetadata> entryAndMetadataList,
+                                                        final int 
numConsumers) {
+        assert numConsumers >= 0;
+        consumerToPermits.clear();
+        final Map<Consumer, List<EntryAndMetadata>> consumerToEntries = new 
IdentityHashMap<>();
+
+        Consumer consumer = getConsumer(numConsumers);
+        if (consumer == null) {
+            entryAndMetadataList.forEach(EntryAndMetadata::release);
+            return consumerToEntries;
+        }
+        // The actual available permits might change, here we use the permits 
at the moment to assign entries
+        int availablePermits = consumerToPermits.computeIfAbsent(consumer, 
Consumer::getAvailablePermits);
+        int index = 0;
+        for (; index < entryAndMetadataList.size(); index++) {
+            final EntryAndMetadata entryAndMetadata = 
entryAndMetadataList.get(index);
+            final MessageMetadata metadata = entryAndMetadata.getMetadata();
+
+            // Select another consumer to ensure `consumer != null` and 
`availablePermits > 0`
+            if (availablePermits <= 0) {
+                consumerToPermits.put(consumer, availablePermits);
+                consumer = getConsumer(numConsumers);
+                if (consumer == null) {
+                    break;
+                }
+                availablePermits = consumer.getAvailablePermits();
+            }
+
+            if (metadata == null || !metadata.hasUuid() || 
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
+                consumerToEntries.computeIfAbsent(consumer, __ -> new 
ArrayList<>()).add(entryAndMetadata);
+            } else {
+                final Consumer consumerForUuid = getConsumerForUuid(metadata, 
consumer, availablePermits);
+                if (consumerForUuid == null) {
+                    unassignedMessageProcessor.accept(entryAndMetadata);
+                    continue;
+                }
+                consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new 
ArrayList<>()).add(entryAndMetadata);
+            }
+            availablePermits--;
+        }
+
+        for (; index < entryAndMetadataList.size(); index++) {
+            unassignedMessageProcessor.accept(entryAndMetadataList.get(index));
+        }
+
+        return consumerToEntries;
+    }
+
+    private Consumer getConsumer(final int numConsumers) {
+        for (int i = 0; i < numConsumers; i++) {
+            final Consumer consumer = defaultSelector.get();
+            if (consumer == null) {
+                return null;
+            }
+            final int permits = consumerToPermits.computeIfAbsent(consumer, 
Consumer::getAvailablePermits);
+            if (permits > 0) {
+                return consumer;
+            }
+        }
+        return null;
+    }
+
+    private Consumer getConsumerForUuid(final MessageMetadata metadata,
+                                        final Consumer defaultConsumer,
+                                        final int currentAvailablePermits) {
+        final String uuid = metadata.getUuid();
+        Consumer consumer = uuidToConsumer.get(uuid);
+        if (consumer == null) {
+            if (metadata.getChunkId() != 0) {
+                // Not the first chunk, skip it
+                return null;
+            }
+            consumer = defaultConsumer;
+            uuidToConsumer.put(uuid, consumer);
+        }
+        final int permits = consumerToPermits.computeIfAbsent(consumer, 
Consumer::getAvailablePermits);
+        if (permits <= 0) {
+            return null;
+        }
+        if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+            // The last chunk is received, we should remove the cache
+            uuidToConsumer.remove(uuid);
+        }
+        consumerToPermits.put(consumer, currentAvailablePermits - 1);
+        return consumer;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index cf58bfd43ac..167bc188e99 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -22,10 +22,11 @@ import static 
org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -33,8 +34,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.stream.Stream;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -50,12 +51,14 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryAndMetadata;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.SharedConsumerAssignor;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -109,9 +112,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "blockedDispatcherOnUnackedMsgs");
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
-
     private AtomicBoolean isRescheduleReadInProgress = new 
AtomicBoolean(false);
     private final ExecutorService dispatchMessagesThread;
+    private final SharedConsumerAssignor assignor;
 
     protected enum ReadType {
         Normal, Replay
@@ -136,6 +139,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
+        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addMessageToReplay);
     }
 
     @Override
@@ -575,12 +579,26 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         if (entriesToDispatch == 0) {
             return true;
         }
-        final MessageMetadata[] metadataArray = entries.stream()
-                .map(entry -> 
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), 
subscription.toString(), -1))
-                .toArray(MessageMetadata[]::new);
-        int remainingMessages = 
Stream.of(metadataArray).filter(Objects::nonNull)
-                .map(MessageMetadata::getNumMessagesInBatch)
-                .reduce(0, Integer::sum);
+        final MessageMetadata[] metadataArray = new 
MessageMetadata[entries.size()];
+        int remainingMessages = 0;
+        boolean hasChunk = false;
+        for (int i = 0; i < metadataArray.length; i++) {
+            final MessageMetadata metadata = 
Commands.peekAndCopyMessageMetadata(
+                    entries.get(i).getDataBuffer(), subscription.toString(), 
-1);
+            if (metadata != null) {
+                remainingMessages += metadata.getNumMessagesInBatch();
+                if (!hasChunk && metadata.hasUuid()) {
+                    hasChunk = true;
+                }
+            }
+            metadataArray[i] = metadata;
+            if (!hasChunk && metadataArray[i] != null && 
metadataArray[i].hasUuid()) {
+                hasChunk = true;
+            }
+        }
+        if (hasChunk) {
+            return sendChunkedMessagesToConsumers(readType, entries, 
metadataArray);
+        }
 
         int start = 0;
         long totalMessagesSent = 0;
@@ -657,6 +675,23 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
+        acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, 
totalBytesSent);
+
+        if (entriesToDispatch > 0) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] No consumers found with available permits, 
storing {} positions for later replay", name,
+                        entries.size() - start);
+            }
+            entries.subList(start, entries.size()).forEach(entry -> {
+                long stickyKeyHash = getStickyKeyHash(entry);
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
+                entry.release();
+            });
+        }
+        return true;
+    }
+
+    private void acquirePermitsForDeliveredMessages(long totalEntries, long 
totalMessagesSent, long totalBytesSent) {
         // acquire message-dispatch permits for already delivered messages
         long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries 
: totalMessagesSent;
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || 
!cursor.isActive()) {
@@ -671,19 +706,69 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
                 dispatchRateLimiter.get().tryDispatchPermit(permits, 
totalBytesSent);
             }
         }
+    }
+
+    private boolean sendChunkedMessagesToConsumers(ReadType readType,
+                                                   List<Entry> entries,
+                                                   MessageMetadata[] 
metadataArray) {
+        final List<EntryAndMetadata> originalEntryAndMetadataList = new 
ArrayList<>(metadataArray.length);
+        for (int i = 0; i < metadataArray.length; i++) {
+            
originalEntryAndMetadataList.add(EntryAndMetadata.create(entries.get(i), 
metadataArray[i]));
+        }
 
-        if (entriesToDispatch > 0) {
+        final Map<Consumer, List<EntryAndMetadata>> assignResult =
+                assignor.assign(originalEntryAndMetadataList, 
consumerList.size());
+        long totalMessagesSent = 0;
+        long totalBytesSent = 0;
+        long totalEntries = 0;
+        final AtomicInteger numConsumers = new 
AtomicInteger(assignResult.size());
+        for (Map.Entry<Consumer, List<EntryAndMetadata>> current : 
assignResult.entrySet()) {
+            final Consumer consumer = current.getKey();
+            final List<EntryAndMetadata> entryAndMetadataList = 
current.getValue();
+            final int messagesForC = Math.min(consumer.getAvailablePermits(), 
entryAndMetadataList.size());
             if (log.isDebugEnabled()) {
-                log.debug("[{}] No consumers found with available permits, 
storing {} positions for later replay", name,
-                        entries.size() - start);
+                log.debug("[{}] select consumer {} with messages num {}, read 
type is {}",
+                        name, consumer.consumerName(), messagesForC, readType);
             }
-            entries.subList(start, entries.size()).forEach(entry -> {
-                long stickyKeyHash = getStickyKeyHash(entry);
-                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
-                entry.release();
+            if (messagesForC < entryAndMetadataList.size()) {
+                for (int i = messagesForC; i < entryAndMetadataList.size(); 
i++) {
+                    final EntryAndMetadata entry = entryAndMetadataList.get(i);
+                    addMessageToReplay(entry);
+                    entryAndMetadataList.set(i, null);
+                }
+            }
+            if (messagesForC == 0) {
+                numConsumers.decrementAndGet();
+                continue;
+            }
+            if (readType == ReadType.Replay) {
+                entryAndMetadataList.stream().limit(messagesForC)
+                        .forEach(e -> 
redeliveryMessages.remove(e.getLedgerId(), e.getEntryId()));
+            }
+            final SendMessageInfo sendMessageInfo = 
SendMessageInfo.getThreadLocal();
+            final EntryBatchSizes batchSizes = 
EntryBatchSizes.get(messagesForC);
+            final EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(messagesForC);
+
+            totalEntries += filterEntriesForConsumer(entryAndMetadataList, 
batchSizes, sendMessageInfo,
+                    batchIndexesAcks, cursor, readType == ReadType.Replay, 
consumer);
+            consumer.sendMessages(entryAndMetadataList, batchSizes, 
batchIndexesAcks,
+                    sendMessageInfo.getTotalMessages(), 
sendMessageInfo.getTotalBytes(),
+                    sendMessageInfo.getTotalChunkedMessages(), 
getRedeliveryTracker()
+            ).addListener(future -> {
+                if (future.isDone() && numConsumers.decrementAndGet() == 0) {
+                    readMoreEntries();
+                }
             });
+
+            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
+                    -(sendMessageInfo.getTotalMessages() - 
batchIndexesAcks.getTotalAckedIndexCount()));
+            totalMessagesSent += sendMessageInfo.getTotalMessages();
+            totalBytesSent += sendMessageInfo.getTotalBytes();
         }
-        return true;
+
+        acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, 
totalBytesSent);
+
+        return numConsumers.get() == 0; // trigger a new readMoreEntries() call
     }
 
     @Override
@@ -966,6 +1051,11 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         }
     }
 
+    private void addMessageToReplay(Entry entry) {
+        addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
+        entry.release();
+    }
+
     protected boolean addMessageToReplay(long ledgerId, long entryId, long 
stickyKeyHash) {
         if (checkIfMessageIsUnacked(ledgerId, entryId)) {
             redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
new file mode 100644
index 00000000000..7649f6eac2d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SharedConsumerAssignorTest {
+
+    private final ConsumerSelector roundRobinConsumerSelector = new 
ConsumerSelector();
+    private final List<EntryAndMetadata> entryAndMetadataList = new 
ArrayList<>();
+    private final List<EntryAndMetadata> replayQueue = new ArrayList<>();
+    private SharedConsumerAssignor assignor;
+
+    @BeforeMethod
+    public void prepareData() {
+        roundRobinConsumerSelector.clear();
+        entryAndMetadataList.clear();
+        replayQueue.clear();
+        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, 
replayQueue::add);
+        final AtomicLong entryId = new AtomicLong(0L);
+        final MockProducer producerA = new MockProducer("A", entryId, 
entryAndMetadataList);
+        final MockProducer producerB = new MockProducer("B", entryId, 
entryAndMetadataList);
+        producerA.sendMessage();
+        producerA.sendChunk(0, 3);
+        producerA.sendChunk(1, 3);
+        producerB.sendMessage();
+        producerB.sendChunk(0, 2);
+        producerA.sendChunk(2, 3);
+        producerB.sendChunk(1, 2);
+        // Use the following data for all tests
+        // | entry id | uuid | chunk id | number of chunks |
+        // | -------- | ---- | -------- | ---------------- |
+        // | 0        | A-0  |          |                  |
+        // | 1        | A-1  | 0        | 3                |
+        // | 2        | A-1  | 1        | 3                |
+        // | 3        | B-0  |          |                  |
+        // | 4        | B-1  | 0        | 2                |
+        // | 5        | A-1  | 2        | 3                |
+        // | 6        | B-1  | 1        | 2               |
+        // P.S. In the table above, The uuid represents the 
"<producer-name>-<sequence-id>" for non-chunks
+        assertEquals(toString(entryAndMetadataList), Arrays.asList(
+                "0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3", "0:3@B-0", 
"0:4@B-1-0-2", "0:5@A-1-2-3", "0:6@B-1-1-2"));
+    }
+
+    @Test
+    public void testSingleConsumerMultiAssign() {
+        // Only first 5 entries can be received because the number of permits 
is 5
+        final Consumer consumer = new Consumer("A", 5);
+        roundRobinConsumerSelector.addConsumers(consumer);
+
+        Map<Consumer, List<EntryAndMetadata>> result = 
assignor.assign(entryAndMetadataList, 1);
+        assertEquals(result.getOrDefault(consumer, Collections.emptyList()), 
entryAndMetadataList.subList(0, 5));
+        // Since two chunked messages (A-1 and B-1) are both not received, 
these uuids have been cached
+        assertEquals(assignor.getUuidToConsumer().keySet(), 
Sets.newHashSet("A-1", "B-1"));
+        assertEquals(toString(replayQueue), Arrays.asList("0:5@A-1-2-3", 
"0:6@B-1-1-2"));
+
+        result = assignor.assign(entryAndMetadataList.subList(5, 6), 1);
+        assertEquals(result.getOrDefault(consumer, Collections.emptyList()), 
entryAndMetadataList.subList(5, 6));
+        // A-1 is received so that uuid "A-1" has been removed from the cache
+        assertEquals(assignor.getUuidToConsumer().keySet(), 
Sets.newHashSet("B-1"));
+
+        result = assignor.assign(entryAndMetadataList.subList(6, 7), 1);
+        assertEquals(result.getOrDefault(consumer, Collections.emptyList()), 
entryAndMetadataList.subList(6, 7));
+        assertTrue(assignor.getUuidToConsumer().isEmpty());
+    }
+
+    @Test
+    public void testMultiConsumerWithSmallPermits() {
+        final Consumer consumerA = new Consumer("A", 3);
+        final Consumer consumerB = new Consumer("B", 4);
+        roundRobinConsumerSelector.addConsumers(consumerA, consumerB);
+
+        // The original order:
+        //   A-0, A-1-0-3, A-1-1-3, B-0, B-1-0-2, A-1-2-3, B-1-1-2
+        // First consumerA received 3 entries and the available permits became 
0. Then the consumer is switched to
+        // consumerB and then B-0 and B-1-0-2 were received by it.
+        // When consumerB tried to receive A-1-2-3, since the uuid "A-1" was 
already assigned to consumerA, consumerB
+        // is not able to receive A-1-2-3.
+        // However, since consumerA has no more permits, A-1-2-3 cannot be 
assigned to consumerA as well.
+        // Therefore, A-1-2-3 was skipped and added to the replay queue.
+        Map<Consumer, List<EntryAndMetadata>> result = 
assignor.assign(entryAndMetadataList, 2);
+        assertEquals(toString(result.getOrDefault(consumerA, 
Collections.emptyList())),
+                Arrays.asList("0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3"));
+        assertEquals(toString(result.getOrDefault(consumerB, 
Collections.emptyList())),
+                Arrays.asList("0:3@B-0", "0:4@B-1-0-2", "0:6@B-1-1-2"));
+        assertEquals(toString(replayQueue), 
Collections.singletonList("0:5@A-1-2-3"));
+        assertEquals(assignor.getUuidToConsumer().keySet(), 
Sets.newHashSet("A-1"));
+
+        roundRobinConsumerSelector.clear();
+        roundRobinConsumerSelector.addConsumers(consumerB, consumerA);
+        assertSame(roundRobinConsumerSelector.peek(), consumerB);
+        final List<EntryAndMetadata> leftEntries = new 
ArrayList<>(replayQueue);
+        replayQueue.clear();
+        result = assignor.assign(leftEntries, 2);
+        // Since uuid "A-1" is still cached, A-1-2-3 will be dispatched to 
consumerA even if consumerB is the first
+        // consumer returned by roundRobinConsumerSelector.get()
+        assertEquals(toString(result.getOrDefault(consumerA, 
Collections.emptyList())),
+                Collections.singletonList("0:5@A-1-2-3"));
+        assertNull(result.get(consumerB));
+        assertTrue(replayQueue.isEmpty());
+        assertTrue(assignor.getUuidToConsumer().isEmpty());
+    }
+
+    @RequiredArgsConstructor
+    static class ConsumerSelector implements Supplier<Consumer> {
+
+        private final List<Consumer> consumers = new ArrayList<>();
+        private int index = 0;
+
+        public void addConsumers(Consumer... consumers) {
+            this.consumers.addAll(Arrays.asList(consumers));
+        }
+
+        public void clear() {
+            consumers.clear();
+            index = 0;
+        }
+
+        @Override
+        public Consumer get() {
+            // a simple round-robin dispatcher
+            final Consumer consumer = peek();
+            if (consumer == null) {
+                return null;
+            }
+            index++;
+            return consumer;
+        }
+
+        public Consumer peek() {
+            if (consumers.isEmpty()) {
+                return null;
+            }
+            return consumers.get(index % consumers.size());
+        }
+    }
+
+    private static List<String> toString(final List<EntryAndMetadata> 
entryAndMetadataList) {
+        return 
entryAndMetadataList.stream().map(EntryAndMetadata::toString).collect(Collectors.toList());
+    }
+
+    @RequiredArgsConstructor
+    static class MockProducer {
+        final String name;
+        final AtomicLong entryId;
+        final List<EntryAndMetadata> entryAndMetadataList;
+        long sequenceId = 0L;
+
+        void sendMessage() {
+            
entryAndMetadataList.add(createEntryAndMetadata(entryId.getAndIncrement(),
+                    createMetadata(name, sequenceId++, null, null)));
+        }
+
+        void sendChunk(int chunkId, int numChunks) {
+            
entryAndMetadataList.add(createEntryAndMetadata(entryId.getAndIncrement(),
+                    createMetadata(name, sequenceId, chunkId, numChunks)));
+            if (chunkId == numChunks - 1) {
+                sequenceId++;
+            }
+        }
+    }
+
+    private static EntryAndMetadata createEntryAndMetadata(final long entryId,
+                                                           final 
MessageMetadata metadata) {
+        final ByteBuf payload = Commands.serializeMetadataAndPayload(
+                Commands.ChecksumType.Crc32c, metadata, 
PulsarByteBufAllocator.DEFAULT.buffer());
+        return EntryAndMetadata.create(EntryImpl.create(0L, entryId, payload));
+    }
+
+    private static MessageMetadata createMetadata(final String producerName,
+                                                  final long sequenceId,
+                                                  final Integer chunkId,
+                                                  final Integer numChunks) {
+        final MessageMetadata metadata = new MessageMetadata();
+        metadata.setProducerName(producerName);
+        metadata.setSequenceId(sequenceId);
+        metadata.setPublishTime(0L);
+        if (chunkId != null && numChunks != null) {
+            metadata.setUuid(producerName + "-" + sequenceId);
+            metadata.setChunkId(chunkId);
+            metadata.setNumChunksFromMsg(numChunks);
+        }
+        return metadata;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
new file mode 100644
index 00000000000..e2aa1281341
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class MessageChunkingSharedTest extends ProducerConsumerBase {
+
+    private static final int MAX_MESSAGE_SIZE = 100;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSingleConsumer() throws Exception {
+        final String topic = "my-property/my-ns/test-single-consumer";
+        @Cleanup final Producer<String> producer = createProducer(topic);
+        @Cleanup final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(5)
+                .subscribe();
+
+        final List<String> values = new ArrayList<>();
+        values.add(createChunkedMessage(1)); // non-chunk
+        values.add(createChunkedMessage(10)); // number of chunks > receiver 
queue size
+        values.add(createChunkedMessage(4)); // number of chunks < receiver 
queue size
+        for (String value : values) {
+            final MessageId messageId = producer.send(value);
+            log.info("Sent {} bytes to {}", value.length(), messageId);
+        }
+
+        final List<String> receivedValues = new ArrayList<>();
+        for (int i = 0; i < values.size(); i++) {
+            final Message<String> message = consumer.receive(3, 
TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            receivedValues.add(message.getValue());
+            log.info("Received {} bytes from {}", message.getValue().length(), 
message.getMessageId());
+            consumer.acknowledge(message);
+        }
+        assertEquals(receivedValues, values);
+    }
+
+    @Test
+    public void testMultiConsumers() throws Exception {
+        final String topic = "my-property/my-ns/test-multi-consumers";
+        @Cleanup final Producer<String> producer = createProducer(topic);
+        final ConsumerBuilder<String> consumerBuilder = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(5);
+
+        final List<String> receivedValues1 = Collections.synchronizedList(new 
ArrayList<>());
+        @Cleanup final Consumer<String> consumer1 = consumerBuilder
+                .messageListener((MessageListener<String>) (consumer, msg) -> 
receivedValues1.add(msg.getValue()))
+                .subscribe();
+        final List<String> receivedValues2 = Collections.synchronizedList(new 
ArrayList<>());
+        @Cleanup final Consumer<String> consumer2 = consumerBuilder
+                .messageListener((MessageListener<String>) (consumer, msg) -> 
receivedValues2.add(msg.getValue()))
+                .subscribe();
+
+        final Set<String> values = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            values.add(createChunkedMessage(4));
+        }
+        for (String value : values) {
+            producer.send(value);
+        }
+
+        Awaitility.await().atMost(Duration.ofSeconds(3))
+                .until(() -> receivedValues1.size() + receivedValues2.size() 
>= values.size());
+        assertEquals(receivedValues1.size() + receivedValues2.size(), 
values.size());
+        assertFalse(receivedValues1.isEmpty());
+        assertFalse(receivedValues2.isEmpty());
+        for (String value : receivedValues1) {
+            assertTrue(values.contains(value));
+        }
+        for (String value : receivedValues2) {
+            assertTrue(values.contains(value));
+        }
+    }
+
+    @Test
+    public void testInterleavedChunks() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/test-interleaved-chunks";
+        final ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared);
+        final List<String> receivedUuidList1 = 
Collections.synchronizedList(new ArrayList<>());
+        final Consumer<byte[]> consumer1 = 
consumerBuilder.messageListener((MessageListener<byte[]>)
+                        (consumer, msg) -> 
receivedUuidList1.add(msg.getProducerName() + "-" + msg.getSequenceId()))
+                .consumerName("consumer-1")
+                .subscribe();
+        final PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                .getTopicIfExists(topic).get().orElse(null);
+        assertNotNull(persistentTopic);
+        // send: A-0, A-1-0-3, A-1-1-3, B-0, B-1-0-2
+        sendNonChunk(persistentTopic, "A", 0);
+        sendChunk(persistentTopic, "A", 1, 0, 3);
+        sendChunk(persistentTopic, "A", 1, 1, 3);
+        sendNonChunk(persistentTopic, "B", 0);
+        sendChunk(persistentTopic, "B", 1, 0, 2);
+
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
receivedUuidList1.size() >= 2);
+        assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0"));
+
+        // complete all 2 chunks of B-1
+        sendChunk(persistentTopic, "B", 1, 1, 2);
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
receivedUuidList1.size() >= 3);
+        assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1"));
+
+        // complete all 3 chunks of A-1
+        sendChunk(persistentTopic, "A", 1, 2, 3);
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
receivedUuidList1.size() >= 3);
+        assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1", 
"A-1"));
+
+        final List<String> receivedUuidList2 = 
Collections.synchronizedList(new ArrayList<>());
+        @Cleanup final Consumer<byte[]> consumer2 = 
consumerBuilder.messageListener((MessageListener<byte[]>)
+                        (consumer, msg) -> 
receivedUuidList2.add(msg.getProducerName() + "-" + msg.getSequenceId()))
+                .consumerName("consumer-2")
+                .subscribe();
+        consumer1.close();
+
+        // Since messages were never acknowledged, all messages will be 
redelivered to consumer-2
+        Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> 
receivedUuidList2.size() >= 4);
+        assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1", 
"A-1"));
+    }
+
+    private Producer<String> createProducer(String topic) throws 
PulsarClientException {
+        return pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableChunking(true)
+                .enableBatching(false)
+                .chunkMaxMessageSize(MAX_MESSAGE_SIZE)
+                .create();
+    }
+
+    private static String createChunkedMessage(int numChunks) {
+        assert numChunks >= 1;
+        final byte[] payload = new byte[(numChunks - 1) * MAX_MESSAGE_SIZE + 
MAX_MESSAGE_SIZE / 10];
+        final Random random = new Random();
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = (byte) ('a' + random.nextInt(26));
+        }
+        return Schema.STRING.decode(payload);
+    }
+
+    private static void sendNonChunk(final PersistentTopic persistentTopic,
+                                     final String producerName,
+                                     final long sequenceId) {
+        sendChunk(persistentTopic, producerName, sequenceId, null, null);
+    }
+
+    private static void sendChunk(final PersistentTopic persistentTopic,
+                                  final String producerName,
+                                  final long sequenceId,
+                                  final Integer chunkId,
+                                  final Integer numChunks) {
+        final MessageMetadata metadata = new MessageMetadata();
+        metadata.setProducerName(producerName);
+        metadata.setSequenceId(sequenceId);
+        metadata.setPublishTime(System.currentTimeMillis());
+        if (chunkId != null && numChunks != null) {
+            metadata.setUuid(producerName + "-" + sequenceId);
+            metadata.setChunkId(chunkId);
+            metadata.setNumChunksFromMsg(numChunks);
+            metadata.setTotalChunkMsgSize(numChunks);
+        }
+        final ByteBuf buf = 
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata,
+                PulsarByteBufAllocator.DEFAULT.buffer(1));
+        persistentTopic.publishMessage(buf, (e, ledgerId, entryId) -> {
+            String name = producerName + "-" + sequenceId;
+            if (chunkId != null) {
+                name += "-" + chunkId + "-" + numChunks;
+            }
+            if (e == null) {
+                log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
+            } else {
+                log.error("Failed to send {}: {}", name, e.getMessage());
+            }
+        });
+    }
+}
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 63d2afc29bc..a9ed815ac2a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -406,9 +406,7 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         metadata.encryption_keys_size() <= 0 || 
config_.getCryptoKeyReader().get() ||
         config_.getCryptoFailureAction() == 
ConsumerCryptoFailureAction::CONSUME;
 
-    const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1 &&
-                                  config_.getConsumerType() != 
ConsumerType::ConsumerShared &&
-                                  config_.getConsumerType() != 
ConsumerType::ConsumerKeyShared;
+    const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
     if (isMessageDecryptable && !isChunkedMessage) {
         if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, 
payload, true)) {
             // Message was discarded on decompression error
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4d858ae6263..5bc998c4526 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1297,7 +1297,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         final int numMessages = msgMetadata.getNumMessagesInBatch();
         final int numChunks = msgMetadata.hasNumChunksFromMsg() ? 
msgMetadata.getNumChunksFromMsg() : 0;
-        final boolean isChunkedMessage = numChunks > 1 && 
conf.getSubscriptionType() != SubscriptionType.Shared;
+        final boolean isChunkedMessage = numChunks > 1;
 
         MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), getPartitionIndex());
         if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {

Reply via email to