This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2373ca3  PIP-34 Key_Shared subscription core implementation. (#4079)
2373ca3 is described below

commit 2373ca36fa0d84103c0feed357da1fbb52119d05
Author: lipenghui <[email protected]>
AuthorDate: Mon Apr 22 15:57:19 2019 +0800

    PIP-34 Key_Shared subscription core implementation. (#4079)
    
    ## Motivation
    This is a core implementation for PIP-34 and there is a task tracker 
ISSUE-4077 for this PIP
    
    ## Modifications
    Add a new subscription type named Key_Shared
    Add PersistentStickyKeyDispatcherMultipleConsumers to handle the message 
dispatch
    Add a simple hash range based consumer selector
    Verifying this change
    Add new unit tests to verifying the hash range selector and Key_Shared mode 
message consume.
    
    
    * PIP-34 Key_Shared subscription core implementation.
    * PIP-34 Add more unit test.
    1.test redelivery with Key_Shared subscription
    2.test none key dispatch with Key_Shared subscription
    3.test ordering key dispatch with Key_Shared subscription
    * PIP-34 Fix alignment issue of Pulsar.proto
    * PIP-34 Fix TODO: format
    * PIP-34 Fix hash and ordering key issues
    * PIP-34 documentation for Key_Shared subscription
    * PIP-34 Fix cpp test issue.
    * PIP-34 Fix cpp format issue.
---
 .../broker/service/BrokerServiceException.java     |   9 +
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../HashRangeStickyKeyConsumerSelector.java        | 149 ++++++++++
 .../broker/service/StickyKeyConsumerSelector.java  |  27 +-
 .../PersistentDispatcherMultipleConsumers.java     |  32 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 159 ++++++++++
 .../service/persistent/PersistentSubscription.java |   5 +
 .../HashRangeStickyKeyConsumerSelectorTest.java    | 141 +++++++++
 .../client/api/KeySharedSubscriptionTest.java      | 330 +++++++++++++++++++++
 .../java/org/apache/pulsar/client/api/Message.java |  15 +
 .../apache/pulsar/client/api/SubscriptionType.java |  11 +-
 .../pulsar/client/api/TypedMessageBuilder.java     |   9 +
 pulsar-client-cpp/include/pulsar/Result.h          |   4 +-
 pulsar-client-cpp/lib/ClientConnection.cc          |   3 +
 pulsar-client-cpp/lib/Result.cc                    |   3 +
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +
 .../java/org/apache/pulsar/client/impl/Hash.java   |   3 +-
 .../apache/pulsar/client/impl/JavaStringHash.java  |   8 +
 .../org/apache/pulsar/client/impl/MessageImpl.java |  12 +
 .../apache/pulsar/client/impl/Murmur3_32Hash.java  |  67 +----
 .../pulsar/client/impl/TopicMessageImpl.java       |  10 +
 .../client/impl/TypedMessageBuilderImpl.java       |   7 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 126 ++++++++
 .../java/org/apache/pulsar/common/util}/Hash.java  |   9 +-
 .../apache/pulsar/common/util}/Murmur3_32Hash.java |   9 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |  13 +-
 .../assets/pulsar-key-shared-subscriptions.png     | Bin 0 -> 124310 bytes
 site2/docs/assets/pulsar-subscription-modes.png    | Bin 66178 -> 220423 bytes
 site2/docs/concepts-messaging.md                   |  23 +-
 29 files changed, 1085 insertions(+), 104 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index d037edf..c3f6909 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 
 /**
  * Base type of exception thrown by Pulsar Broker Service
@@ -146,6 +147,12 @@ public class BrokerServiceException extends Exception {
         }
     }
 
+    public static class ConsumerAssignException extends BrokerServiceException 
{
+        public ConsumerAssignException(String msg) {
+            super(msg);
+        }
+    }
+
     public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
         if (t instanceof ServerMetadataException) {
             return PulsarApi.ServerError.MetadataError;
@@ -166,6 +173,8 @@ public class BrokerServiceException extends Exception {
             return PulsarApi.ServerError.ServiceNotReady;
         } else if (t instanceof IncompatibleSchemaException) {
             return PulsarApi.ServerError.IncompatibleSchema;
+        } else if (t instanceof ConsumerAssignException) {
+            return ServerError.ConsumerAssignError;
         } else {
             return PulsarApi.ServerError.UnknownError;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index ec0ec66..8934ff4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -145,7 +145,7 @@ public class Consumer {
         stats.setClientVersion(cnx.getClientVersion());
         stats.metadata = this.metadata;
 
-        if (subType == SubType.Shared) {
+        if (subType == SubType.Shared || subType == SubType.Key_Shared) {
             this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
         } else {
             // We don't need to keep track of pending acks if the subscription 
is not shared
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
new file mode 100644
index 0000000..c0dfa79
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * This is a consumer selector based fixed hash range.
+ *
+ * 1.Each consumer serves a fixed range of hash value
+ * 2.The whole range of hash value could be covered by all the consumers.
+ * 3.Once a consumer is removed, the left consumers could still serve the 
whole range.
+ *
+ * Initializing with a fixed hash range, by default 2 << 5.
+ * First consumer added, hash range looks like:
+ *
+ * 0 -> 65536(consumer-1)
+ *
+ * Second consumer added, will find a biggest range to split:
+ *
+ * 0 -> 32768(consumer-2) -> 65536(consumer-1)
+ *
+ * While a consumer removed, The range for this consumer will be taken over
+ * by other consumer, consumer-2 removed:
+ *
+ * 0 -> 65536(consumer-1)
+ *
+ * In this approach use skip list map to maintain the hash range and consumers.
+ *
+ * Select consumer will return the ceiling key of message key hashcode % range 
size.
+ *
+ */
+public class HashRangeStickyKeyConsumerSelector implements 
StickyKeyConsumerSelector {
+
+    public static final int DEFAULT_RANGE_SIZE =  2 << 15;
+
+    private final int rangeSize;
+
+    private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
+    private final Map<Consumer, Integer> consumerRange;
+
+    public HashRangeStickyKeyConsumerSelector() {
+        this(DEFAULT_RANGE_SIZE);
+    }
+
+    public HashRangeStickyKeyConsumerSelector(int rangeSize) {
+        if (rangeSize < 2) {
+            throw new IllegalArgumentException("range size must greater than 
2");
+        }
+        if (!is2Power(rangeSize)) {
+            throw new IllegalArgumentException("range size must be nth power 
with 2");
+        }
+        this.rangeMap = new ConcurrentSkipListMap<>();
+        this.consumerRange = new HashMap<>();
+        this.rangeSize = rangeSize;
+    }
+
+    @Override
+    public synchronized void addConsumer(Consumer consumer) throws 
ConsumerAssignException {
+        if (rangeMap.size() == 0) {
+            rangeMap.put(rangeSize, consumer);
+            consumerRange.put(consumer, rangeSize);
+        } else {
+            splitRange(findBiggestRange(), consumer);
+        }
+    }
+
+    @Override
+    public synchronized void removeConsumer(Consumer consumer) {
+        Integer removeRange = consumerRange.get(consumer);
+        if (removeRange != null) {
+            if (removeRange == rangeSize && rangeMap.size() > 1) {
+                Consumer lowerConsumer = 
rangeMap.lowerEntry(removeRange).getValue();
+                rangeMap.put(removeRange, lowerConsumer);
+                consumerRange.put(lowerConsumer, removeRange);
+            } else {
+                rangeMap.remove(removeRange);
+                consumerRange.remove(consumer);
+            }
+        }
+    }
+
+    @Override
+    public Consumer select(byte[] stickyKey) {
+        if (rangeMap.size() > 0) {
+            int slot = Murmur3_32Hash.getInstance().makeHash(stickyKey) % 
rangeSize;
+            return rangeMap.ceilingEntry(slot).getValue();
+        } else {
+            return null;
+        }
+    }
+
+    private int findBiggestRange() {
+        int slots = 0;
+        int busiestRange = rangeSize;
+        for (Entry<Integer, Consumer> entry : rangeMap.entrySet()) {
+            Integer lowerKey = rangeMap.lowerKey(entry.getKey());
+            if (lowerKey == null) {
+                lowerKey = 0;
+            }
+            if (entry.getKey() - lowerKey > slots) {
+                slots = entry.getKey() - lowerKey;
+                busiestRange = entry.getKey();
+            }
+        }
+        return busiestRange;
+    }
+
+    private void splitRange(int range, Consumer targetConsumer) throws 
ConsumerAssignException {
+        Integer lowerKey = rangeMap.lowerKey(range);
+        if (lowerKey == null) {
+            lowerKey = 0;
+        }
+        if (range - lowerKey <= 1) {
+            throw new ConsumerAssignException("No more range can assigned to 
new consumer, assigned consumers "
+                    + rangeMap.size());
+        }
+        int splitRange = range - ((range - lowerKey) >> 1);
+        rangeMap.put(splitRange, targetConsumer);
+        consumerRange.put(targetConsumer, splitRange);
+    }
+
+    private boolean is2Power(int num) {
+        if(num < 2) return false;
+        return (num & num - 1) == 0;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
similarity index 58%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 31c771a..ce8ef86 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -16,13 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.broker.service;
+
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+
+public interface StickyKeyConsumerSelector {
+
+    /**
+     * Add a new consumer
+     * @param consumer new consumer
+     */
+    void addConsumer(Consumer consumer) throws ConsumerAssignException;
 
-public interface Hash {
     /**
-     * Generate the hash of a given String
+     * Remove the consumer
+     * @param consumer consumer to be removed
+     */
+    void removeConsumer(Consumer consumer);
+
+    /**
+     * Select a consumer by sticky key
      *
-     * @return The hash of {@param s}, which is non-negative integer.
+     * @param stickyKey sticky key
+     * @return consumer
      */
-    int makeHash(String s);
+    Consumer select(byte[] stickyKey);
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 066e2b6..b9351c2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -65,8 +65,8 @@ import com.google.common.collect.Lists;
  */
 public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
 
-    private final PersistentTopic topic;
-    private final ManagedCursor cursor;
+    protected final PersistentTopic topic;
+    protected final ManagedCursor cursor;
 
     private CompletableFuture<Void> closeFuture = null;
     LongPairSet messagesToReplay = new ConcurrentSortedLongPairSet(128, 2);
@@ -75,9 +75,9 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
     private boolean havePendingRead = false;
     private boolean havePendingReplayRead = false;
     private boolean shouldRewindBeforeReadingOrReplaying = false;
-    private final String name;
+    protected final String name;
 
-    private int totalAvailablePermits = 0;
+    protected int totalAvailablePermits = 0;
     private int readBatchSize;
     private final Backoff readFailureBackoff = new Backoff(15, 
TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
     private static final 
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> 
TOTAL_UNACKED_MESSAGES_UPDATER =
@@ -87,8 +87,8 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
     private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
     private static final 
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> 
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
 "blockedDispatcherOnUnackedMsgs");
-    private final ServiceConfiguration serviceConfig;
-    private Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
+    protected final ServiceConfiguration serviceConfig;
+    protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
 
     enum ReadType {
         Normal, Replay
@@ -374,9 +374,6 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
     @Override
     public synchronized void readEntriesComplete(List<Entry> entries, Object 
ctx) {
         ReadType readType = (ReadType) ctx;
-        int start = 0;
-        int entriesToDispatch = entries.size();
-
         if (readType == ReadType.Normal) {
             havePendingRead = false;
         } else {
@@ -407,8 +404,17 @@ public class PersistentDispatcherMultipleConsumers  
extends AbstractDispatcherMu
             log.debug("[{}] Distributing {} messages to {} consumers", name, 
entries.size(), consumerList.size());
         }
 
+        sendMessagesToConsumers(readType, entries);
+
+        readMoreEntries();
+    }
+
+    protected void sendMessagesToConsumers(ReadType readType, List<Entry> 
entries) {
+        int start = 0;
+        int entriesToDispatch = entries.size();
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
+
         while (entriesToDispatch > 0 && totalAvailablePermits > 0 && 
isAtleastOneConsumerAvailable()) {
             Consumer c = getNextConsumer();
             if (c == null) {
@@ -421,8 +427,8 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
 
             // round-robin dispatch batch size for this consumer
             int messagesForC = Math.min(
-                Math.min(entriesToDispatch, c.getAvailablePermits()),
-                serviceConfig.getDispatcherMaxRoundRobinBatchSize());
+                    Math.min(entriesToDispatch, c.getAvailablePermits()),
+                    serviceConfig.getDispatcherMaxRoundRobinBatchSize());
 
             if (messagesForC > 0) {
 
@@ -465,8 +471,6 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
                 entry.release();
             });
         }
-
-        readMoreEntries();
     }
 
     @Override
@@ -531,7 +535,7 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
      *
      * @return
      */
-    private boolean isAtleastOneConsumerAvailable() {
+    protected boolean isAtleastOneConsumerAvailable() {
         if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
             // abort read if no consumers are connected or if disconnect is 
initiated
             return false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
new file mode 100644
index 0000000..eaa3d1d
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
+import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PersistentStickyKeyDispatcherMultipleConsumers extends 
PersistentDispatcherMultipleConsumers {
+
+    public static final String NONE_KEY = "NONE_KEY";
+
+    private final StickyKeyConsumerSelector selector;
+
+    PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor) {
+        super(topic, cursor);
+        //TODO: Consumer selector Pluggable
+        selector = new HashRangeStickyKeyConsumerSelector();
+    }
+
+    @Override
+    public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
+        super.addConsumer(consumer);
+        selector.addConsumer(consumer);
+    }
+
+    @Override
+    public synchronized void removeConsumer(Consumer consumer) throws 
BrokerServiceException {
+        super.removeConsumer(consumer);
+        selector.removeConsumer(consumer);
+    }
+
+    @Override
+    protected void sendMessagesToConsumers(ReadType readType, List<Entry> 
entries) {
+        long totalMessagesSent = 0;
+        long totalBytesSent = 0;
+        if (entries.size() > 0) {
+            final Map<byte[], List<Entry>> groupedEntries = entries
+                    .stream()
+                    .collect(Collectors.groupingBy(entry -> 
peekStickyKey(entry.getDataBuffer()), Collectors.toList()));
+            final Iterator<Map.Entry<byte[], List<Entry>>> iterator = 
groupedEntries.entrySet().iterator();
+            while (iterator.hasNext() && totalAvailablePermits > 0 && 
isAtleastOneConsumerAvailable()) {
+                final Map.Entry<byte[], List<Entry>> entriesWithSameKey = 
iterator.next();
+                //TODO: None key policy
+                final Consumer consumer = 
selector.select(entriesWithSameKey.getKey());
+
+                if (consumer == null) {
+                    // Do nothing, cursor will be rewind at reconnection
+                    log.info("[{}] rewind because no available consumer found 
for key {} from total {}", name,
+                            entriesWithSameKey.getKey(), consumerList.size());
+                    entriesWithSameKey.getValue().forEach(Entry::release);
+                    cursor.rewind();
+                    return;
+                }
+
+                int messagesForC = 
Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
+                if (messagesForC > 0) {
+
+                    // remove positions first from replay list first : 
sendMessages recycles entries
+                    List<Entry> subList = new 
ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
+                    if (readType == ReadType.Replay) {
+                        subList.forEach(entry -> 
messagesToReplay.remove(entry.getLedgerId(), entry.getEntryId()));
+                    }
+                    final SendMessageInfo sentMsgInfo = 
consumer.sendMessages(subList);
+                    entriesWithSameKey.getValue().removeAll(subList);
+                    final long msgSent = sentMsgInfo.getTotalSentMessages();
+                    totalAvailablePermits -= msgSent;
+                    totalMessagesSent += sentMsgInfo.getTotalSentMessages();
+                    totalBytesSent += sentMsgInfo.getTotalSentMessageBytes();
+
+                    if (entriesWithSameKey.getValue().size() == 0) {
+                        iterator.remove();
+                    }
+                }
+            }
+
+            // acquire message-dispatch permits for already delivered messages
+            if 
(serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || 
!cursor.isActive()) {
+                if (topic.getDispatchRateLimiter().isPresent()) {
+                    
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, 
totalBytesSent);
+                }
+
+                if (dispatchRateLimiter.isPresent()) {
+                    
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
+                }
+            }
+
+            if (groupedEntries.size() > 0) {
+                int laterReplay = 0;
+                for (List<Entry> entryList : groupedEntries.values()) {
+                    laterReplay += entryList.size();
+                    entryList.forEach(entry -> {
+                        messagesToReplay.add(entry.getLedgerId(), 
entry.getEntryId());
+                        entry.release();
+                    });
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] No consumers found with available permits, 
storing {} positions for later replay", name,
+                            laterReplay);
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public SubType getType() {
+        return SubType.Key_Shared;
+    }
+
+    private byte[] peekStickyKey(ByteBuf metadataAndPayload) {
+        metadataAndPayload.markReaderIndex();
+        MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+        metadataAndPayload.resetReaderIndex();
+        String key = metadata.getPartitionKey();
+        metadata.recycle();
+        if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
+            return metadata.hasOrderingKey() ? 
metadata.getOrderingKey().toByteArray() : key.getBytes();
+        } else {
+            return NONE_KEY.getBytes();
+        }
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 669c6d4..ce9d4aa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -127,6 +127,11 @@ public class PersistentSubscription implements 
Subscription {
                             topic);
                 }
                 break;
+            case Key_Shared:
+                if (dispatcher == null || dispatcher.getType() != 
SubType.Key_Shared) {
+                    dispatcher = new 
PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor);
+                }
+                break;
             default:
                 throw new ServerMetadataException("Unsupported subscription 
type");
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelectorTest.java
new file mode 100644
index 0000000..ff67c20
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelectorTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static 
org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+import static org.mockito.Mockito.mock;
+
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+
+public class HashRangeStickyKeyConsumerSelectorTest {
+
+    @Test
+    public void testConsumerSelect() throws ConsumerAssignException {
+
+        StickyKeyConsumerSelector selector = new 
HashRangeStickyKeyConsumerSelector();
+        String key1 = "anyKey";
+        Assert.assertNull(selector.select(key1.getBytes()));
+
+        Consumer consumer1 = mock(Consumer.class);
+        selector.addConsumer(consumer1);
+        int consumer1Slot = DEFAULT_RANGE_SIZE;
+        Assert.assertEquals(selector.select(key1.getBytes()), consumer1);
+
+        Consumer consumer2 = mock(Consumer.class);
+        selector.addConsumer(consumer2);
+        int consumer2Slot = consumer1Slot >> 1;
+
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 
DEFAULT_RANGE_SIZE;
+            if (slot < consumer2Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer2);
+            } else {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer1);
+            }
+        }
+
+        Consumer consumer3 = mock(Consumer.class);
+        selector.addConsumer(consumer3);
+        int consumer3Slot = consumer2Slot >> 1;
+
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 
DEFAULT_RANGE_SIZE;
+            if (slot < consumer3Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer3);
+            } else if (slot < consumer2Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer2);
+            } else {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer1);
+            }
+        }
+
+        Consumer consumer4 = mock(Consumer.class);
+        selector.addConsumer(consumer4);
+        int consumer4Slot = consumer1Slot - ((consumer1Slot - consumer2Slot) 
>> 1);
+
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 
DEFAULT_RANGE_SIZE;
+            if (slot < consumer3Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer3);
+            } else if (slot < consumer2Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer2);
+            } else if (slot < consumer4Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer4);
+            } else {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer1);
+            }
+        }
+
+        selector.removeConsumer(consumer1);
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 
DEFAULT_RANGE_SIZE;
+            if (slot < consumer3Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer3);
+            } else if (slot < consumer2Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer2);
+            } else {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer4);
+            }
+        }
+
+        selector.removeConsumer(consumer2);
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 
DEFAULT_RANGE_SIZE;
+            if (slot < consumer3Slot) {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer3);
+            } else {
+                Assert.assertEquals(selector.select(key.getBytes()), 
consumer4);
+            }
+        }
+
+        selector.removeConsumer(consumer3);
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            Assert.assertEquals(selector.select(key.getBytes()), consumer4);
+        }
+    }
+
+    @Test(expectedExceptions = ConsumerAssignException.class)
+    public void testSplitExceed() throws ConsumerAssignException {
+        StickyKeyConsumerSelector selector = new 
HashRangeStickyKeyConsumerSelector(16);
+        for (int i = 0; i <= 16; i++) {
+            selector.addConsumer(mock(Consumer.class));
+        }
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testRangeSizeLessThan2() {
+        new HashRangeStickyKeyConsumerSelector(1);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testRangeSizePower2() {
+        new HashRangeStickyKeyConsumerSelector(6);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
new file mode 100644
index 0000000..e5f1edc
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
+import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+public class KeySharedSubscriptionTest extends ProducerConsumerBase {
+
+    private static final Logger log = 
LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
+
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() 
throws PulsarClientException {
+
+        String topic = "persistent://public/default/key_shared";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        int consumer1Slot = 
HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int consumer1ExpectMessages = 0;
+        int consumer2ExpectMessages = 0;
+        int consumer3ExpectMessages = 0;
+
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+                % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+            if (slot < consumer3Slot) {
+                consumer3ExpectMessages++;
+            } else if (slot < consumer2Slot) {
+                consumer2ExpectMessages++;
+            } else {
+                consumer1ExpectMessages++;
+            }
+            producer.newMessage()
+                    .key(key)
+                    .value(key.getBytes())
+                    .send();
+        }
+
+        int consumer1Received = 0;
+        for (int i = 0; i < consumer1ExpectMessages; i++) {
+            consumer1.receive();
+            consumer1Received++;
+        }
+
+        int consumer2Received = 0;
+        for (int i = 0; i < consumer2ExpectMessages; i++) {
+            consumer2.receive();
+            consumer2Received++;
+        }
+
+        int consumer3Received = 0;
+        for (int i = 0; i < consumer3ExpectMessages; i++) {
+            consumer3.receive();
+            consumer3Received++;
+        }
+        Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
+        Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
+        Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+
+        // messages not acked, test redelivery
+
+        for (int i = 0; i < consumer1ExpectMessages; i++) {
+            Message message = consumer1.receive();
+            consumer1.acknowledge(message);
+            consumer1Received++;
+        }
+
+        for (int i = 0; i < consumer2ExpectMessages; i++) {
+            Message message = consumer2.receive();
+            consumer2.acknowledge(message);
+            consumer2Received++;
+        }
+
+        for (int i = 0; i < consumer3ExpectMessages; i++) {
+            Message message = consumer3.receive();
+            consumer3.acknowledge(message);
+            consumer3Received++;
+        }
+
+        Assert.assertEquals(consumer1ExpectMessages * 2, consumer1Received);
+        Assert.assertEquals(consumer2ExpectMessages * 2, consumer2Received);
+        Assert.assertEquals(consumer3ExpectMessages * 2, consumer3Received);
+    }
+
+    @Test
+    public void 
testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws 
PulsarClientException {
+
+        String topic = "persistent://public/default/key_shared_none_key";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        int consumer1Slot = 
HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage()
+                    .value(("Message - " + i).getBytes())
+                    .send();
+        }
+
+        int expectMessages = 100;
+        int receiveMessages = 0;
+        int slot = 
Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
+            % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        if (slot < consumer3Slot) {
+            for (int i = 0; i < expectMessages; i++) {
+                Message message = consumer3.receive();
+                consumer3.acknowledge(message);
+                receiveMessages++;
+            }
+        } else if (slot < consumer2Slot) {
+            for (int i = 0; i < expectMessages; i++) {
+                Message message = consumer2.receive();
+                consumer2.acknowledge(message);
+                receiveMessages++;
+            }
+        } else {
+            for (int i = 0; i < expectMessages; i++) {
+                Message message = consumer1.receive();
+                consumer1.acknowledge(message);
+                receiveMessages++;
+            }
+        }
+        Assert.assertEquals(expectMessages, receiveMessages);
+
+        Producer<byte[]> batchingProducer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .create();
+
+        for (int i = 0; i < 100; i++) {
+            batchingProducer.newMessage()
+                    .value(("Message - " + i).getBytes())
+                    .send();
+        }
+
+        if (slot < consumer3Slot) {
+            for (int i = 0; i < expectMessages; i++) {
+                Message message = consumer3.receive();
+                consumer3.acknowledge(message);
+                receiveMessages++;
+            }
+        } else if (slot < consumer2Slot) {
+            for (int i = 0; i < expectMessages; i++) {
+                Message message = consumer2.receive();
+                consumer2.acknowledge(message);
+                receiveMessages++;
+            }
+        } else {
+            for (int i = 0; i < expectMessages; i++) {
+                Message message = consumer1.receive();
+                consumer1.acknowledge(message);
+                receiveMessages++;
+            }
+        }
+
+        Assert.assertEquals(expectMessages * 2, receiveMessages);
+
+    }
+
+    @Test
+    public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws 
PulsarClientException {
+        String topic = "persistent://public/default/key_shared_ordering_key";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        int consumer1Slot = 
HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int consumer1ExpectMessages = 0;
+        int consumer2ExpectMessages = 0;
+        int consumer3ExpectMessages = 0;
+
+        for (int i = 0; i < 100; i++) {
+            String key = UUID.randomUUID().toString();
+            String orderingKey = UUID.randomUUID().toString();
+            int slot = 
Murmur3_32Hash.getInstance().makeHash(orderingKey.getBytes())
+                % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+            if (slot < consumer3Slot) {
+                consumer3ExpectMessages++;
+            } else if (slot < consumer2Slot) {
+                consumer2ExpectMessages++;
+            } else {
+                consumer1ExpectMessages++;
+            }
+            producer.newMessage()
+                    .key(key)
+                    .orderingKey(orderingKey.getBytes())
+                    .value(key.getBytes())
+                    .send();
+        }
+
+        int consumer1Received = 0;
+        for (int i = 0; i < consumer1ExpectMessages; i++) {
+            consumer1.receive();
+            consumer1Received++;
+        }
+
+        int consumer2Received = 0;
+        for (int i = 0; i < consumer2ExpectMessages; i++) {
+            consumer2.receive();
+            consumer2Received++;
+        }
+
+        int consumer3Received = 0;
+        for (int i = 0; i < consumer3ExpectMessages; i++) {
+            consumer3.receive();
+            consumer3Received++;
+        }
+        Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
+        Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
+        Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+    }
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
index e672ab0..e192916 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -155,6 +155,21 @@ public interface Message<T> {
     byte[] getKeyBytes();
 
     /**
+     * Check whether the message has a ordering key
+     *
+     * @return true if the ordering key was set while creating the message
+     *         false if the ordering key was not set while creating the message
+     */
+    boolean hasOrderingKey();
+
+    /**
+     * Get the ordering key of the message
+     *
+     * @return the ordering key of the message
+     */
+    byte[] getOrderingKey();
+
+    /**
      * Get the topic the message was published to
      *
      * @return the topic the message was published to
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
index 9d38664..4098b69 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
@@ -47,5 +47,14 @@ public enum SubscriptionType {
      * be split across the available consumers. On each partition, at most one 
consumer will be active at a given point
      * in time.
      */
-    Failover
+    Failover,
+
+    /**
+     * Multiple consumer will be able to use the same subscription and all 
messages with the same key
+     * will be dispatched to only one consumer
+     *
+     * Use ordering_key to overwrite the message key for message ordering.
+     *
+     */
+    Key_Shared
 }
\ No newline at end of file
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
index a1e2f2d..824a967 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
@@ -104,6 +104,15 @@ public interface TypedMessageBuilder<T> extends 
Serializable {
     TypedMessageBuilder<T> keyBytes(byte[] key);
 
     /**
+     * Sets the ordering key of the message for message dispatch in {@link 
SubscriptionType#Key_Shared} mode.
+     * Partition key Will be used if ordering key not specified
+     *
+     * @param orderingKey the ordering key for the message
+     * @return the message builder instance
+     */
+    TypedMessageBuilder<T> orderingKey(byte[] orderingKey);
+
+    /**
      * Set a domain object on the message
      *
      * @param value
diff --git a/pulsar-client-cpp/include/pulsar/Result.h 
b/pulsar-client-cpp/include/pulsar/Result.h
index a4bc3b3..fa24d5c 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -76,7 +76,9 @@ enum Result
     ResultTopicTerminated,          /// Topic was already terminated
     ResultCryptoError,              /// Error when crypto operation fails
 
-    ResultIncompatibleSchema,  /// Specified schema is incompatible with the 
topic's schema
+    ResultIncompatibleSchema,   /// Specified schema is incompatible with the 
topic's schema
+    ResultConsumerAssignError,  /// Error when a new consumer connected but 
can't assign messages to this
+                                /// consumer
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index e16aa5c..086bfa8 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -108,6 +108,9 @@ static Result getResult(ServerError serverError) {
 
         case IncompatibleSchema:
             return ResultIncompatibleSchema;
+
+        case ConsumerAssignError:
+            return ResultConsumerAssignError;
     }
     // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
     // ServerError and miss them in the switch above we would like to get 
notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index ff66bf6..536232a 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -131,6 +131,9 @@ const char* pulsar::strResult(Result result) {
 
         case ResultIncompatibleSchema:
             return "IncompatibleSchema";
+
+        case ResultConsumerAssignError:
+            return "ResultConsumerAssignError";
     };
     // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
     // ServerError and miss them in the switch above we would like to get 
notified. Adding
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 11d7543..f5ab15a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -316,6 +316,9 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
 
         case Failover:
             return SubType.Failover;
+
+        case Key_Shared:
+            return SubType.Key_Shared;
         }
 
         // Should not happen since we cover all cases above
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
index 31c771a..1b1e4a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
-public interface Hash {
+public interface Hash extends org.apache.pulsar.common.util.Hash {
+
     /**
      * Generate the hash of a given String
      *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java
index 2f6fc28..e005bae 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 public class JavaStringHash implements Hash {
     private static final JavaStringHash instance = new JavaStringHash();
 
@@ -31,4 +33,10 @@ public class JavaStringHash implements Hash {
     public int makeHash(String s) {
         return s.hashCode() & Integer.MAX_VALUE;
     }
+
+    @Override
+    public int makeHash(byte[] b) {
+        return makeHash(new String(b, UTF_8));
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 30e1474..6a38abb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -335,6 +335,18 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    @Override
+    public boolean hasOrderingKey() {
+        checkNotNull(msgMetadataBuilder);
+        return msgMetadataBuilder.hasOrderingKey();
+    }
+
+    @Override
+    public byte[] getOrderingKey() {
+        checkNotNull(msgMetadataBuilder);
+        return msgMetadataBuilder.getOrderingKey().toByteArray();
+    }
+
     public ClientCnx getCnx() {
         return cnx;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java
index 80552da..05952fb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java
@@ -23,80 +23,23 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 
-import com.google.common.primitives.UnsignedBytes;
-
 public class Murmur3_32Hash implements Hash {
     private static final Murmur3_32Hash instance = new Murmur3_32Hash();
 
-    private static final int CHUNK_SIZE = 4;
-    private static final int C1 = 0xcc9e2d51;
-    private static final int C2 = 0x1b873593;
-    private final int seed;
-
-    private Murmur3_32Hash() {
-        seed = 0;
-    }
-
     public static Hash getInstance() {
         return instance;
     }
 
     @Override
     public int makeHash(String s) {
-        return makeHash(s.getBytes(StandardCharsets.UTF_8)) & 
Integer.MAX_VALUE;
-    }
-
-    private int makeHash(byte[] bytes) {
-        int len = bytes.length;
-        int reminder = len % CHUNK_SIZE;
-        int h1 = seed;
-
-        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-        byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
-
-        while (byteBuffer.remaining() >= CHUNK_SIZE) {
-            int k1 = byteBuffer.getInt();
-
-            k1 = mixK1(k1);
-            h1 = mixH1(h1, k1);
-        }
-
-        int k1 = 0;
-        for (int i = 0; i < reminder; i++) {
-            k1 ^= UnsignedBytes.toInt(byteBuffer.get()) << (i * 8);
-        }
-
-        h1 ^= mixK1(k1);
-        h1 ^= len;
-        h1 = fmix(h1);
-
-        return h1;
+        return org.apache.pulsar.common.util.Murmur3_32Hash.getInstance()
+            .makeHash(s.getBytes(StandardCharsets.UTF_8)) & Integer.MAX_VALUE;
     }
 
-    private int fmix(int h) {
-        h ^= h >>> 16;
-        h *= 0x85ebca6b;
-        h ^= h >>> 13;
-        h *= 0xc2b2ae35;
-        h ^= h >>> 16;
-
-        return h;
-    }
-
-    private int mixK1(int k1) {
-        k1 *= C1;
-        k1 = Integer.rotateLeft(k1, 15);
-        k1 *= C2;
-        return k1;
-    }
-
-    private int mixH1(int h1, int k1) {
-        h1 ^= k1;
-        h1 = Integer.rotateLeft(h1, 13);
-        return h1 * 5 + 0xe6546b64;
+    @Override
+    public int makeHash(byte[] b) {
+        return 
org.apache.pulsar.common.util.Murmur3_32Hash.getInstance().makeHash(b) & 
Integer.MAX_VALUE;
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index fff104d..723875c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -129,6 +129,16 @@ public class TopicMessageImpl<T> implements Message<T> {
     }
 
     @Override
+    public boolean hasOrderingKey() {
+        return msg.hasOrderingKey();
+    }
+
+    @Override
+    public byte[] getOrderingKey() {
+        return msg.getOrderingKey();
+    }
+
+    @Override
     public T getValue() {
         return msg.getValue();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index fb18c0e..9ac887e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 
 public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
     private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
@@ -76,6 +77,12 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
     }
 
     @Override
+    public TypedMessageBuilder<T> orderingKey(byte[] orderingKey) {
+        msgMetadataBuilder.setOrderingKey(ByteString.copyFrom(orderingKey));
+        return this;
+    }
+
+    @Override
     public TypedMessageBuilder<T> value(T value) {
         checkArgument(value != null, "Need Non-Null content value");
         this.content = ByteBuffer.wrap(schema.encode(value));
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index b2419f1..053ad19 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -76,6 +76,7 @@ public final class PulsarApi {
     ProducerBusy(16, 16),
     InvalidTopicName(17, 17),
     IncompatibleSchema(18, 18),
+    ConsumerAssignError(19, 19),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -97,6 +98,7 @@ public final class PulsarApi {
     public static final int ProducerBusy_VALUE = 16;
     public static final int InvalidTopicName_VALUE = 17;
     public static final int IncompatibleSchema_VALUE = 18;
+    public static final int ConsumerAssignError_VALUE = 19;
     
     
     public final int getNumber() { return value; }
@@ -122,6 +124,7 @@ public final class PulsarApi {
         case 16: return ProducerBusy;
         case 17: return InvalidTopicName;
         case 18: return IncompatibleSchema;
+        case 19: return ConsumerAssignError;
         default: return null;
       }
     }
@@ -3085,6 +3088,10 @@ public final class PulsarApi {
     // optional bool partition_key_b64_encoded = 17 [default = false];
     boolean hasPartitionKeyB64Encoded();
     boolean getPartitionKeyB64Encoded();
+    
+    // optional bytes ordering_key = 18;
+    boolean hasOrderingKey();
+    org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getOrderingKey();
   }
   public static final class MessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -3395,6 +3402,16 @@ public final class PulsarApi {
       return partitionKeyB64Encoded_;
     }
     
+    // optional bytes ordering_key = 18;
+    public static final int ORDERING_KEY_FIELD_NUMBER = 18;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
orderingKey_;
+    public boolean hasOrderingKey() {
+      return ((bitField0_ & 0x00002000) == 0x00002000);
+    }
+    public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getOrderingKey() {
+      return orderingKey_;
+    }
+    
     private void initFields() {
       producerName_ = "";
       sequenceId_ = 0L;
@@ -3412,6 +3429,7 @@ public final class PulsarApi {
       encryptionParam_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
       schemaVersion_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
       partitionKeyB64Encoded_ = false;
+      orderingKey_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3502,6 +3520,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00001000) == 0x00001000)) {
         output.writeBool(17, partitionKeyB64Encoded_);
       }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        output.writeBytes(18, orderingKey_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -3579,6 +3600,10 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBoolSize(17, partitionKeyB64Encoded_);
       }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(18, orderingKey_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -3724,6 +3749,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00004000);
         partitionKeyB64Encoded_ = false;
         bitField0_ = (bitField0_ & ~0x00008000);
+        orderingKey_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00010000);
         return this;
       }
       
@@ -3825,6 +3852,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00001000;
         }
         result.partitionKeyB64Encoded_ = partitionKeyB64Encoded_;
+        if (((from_bitField0_ & 0x00010000) == 0x00010000)) {
+          to_bitField0_ |= 0x00002000;
+        }
+        result.orderingKey_ = orderingKey_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -3900,6 +3931,9 @@ public final class PulsarApi {
         if (other.hasPartitionKeyB64Encoded()) {
           setPartitionKeyB64Encoded(other.getPartitionKeyB64Encoded());
         }
+        if (other.hasOrderingKey()) {
+          setOrderingKey(other.getOrderingKey());
+        }
         return this;
       }
       
@@ -4039,6 +4073,11 @@ public final class PulsarApi {
               partitionKeyB64Encoded_ = input.readBool();
               break;
             }
+            case 146: {
+              bitField0_ |= 0x00010000;
+              orderingKey_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -4621,6 +4660,30 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bytes ordering_key = 18;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
orderingKey_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      public boolean hasOrderingKey() {
+        return ((bitField0_ & 0x00010000) == 0x00010000);
+      }
+      public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getOrderingKey() {
+        return orderingKey_;
+      }
+      public Builder 
setOrderingKey(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00010000;
+        orderingKey_ = value;
+        
+        return this;
+      }
+      public Builder clearOrderingKey() {
+        bitField0_ = (bitField0_ & ~0x00010000);
+        orderingKey_ = getDefaultInstance().getOrderingKey();
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
     }
     
@@ -4660,6 +4723,10 @@ public final class PulsarApi {
     // optional bool partition_key_b64_encoded = 6 [default = false];
     boolean hasPartitionKeyB64Encoded();
     boolean getPartitionKeyB64Encoded();
+    
+    // optional bytes ordering_key = 7;
+    boolean hasOrderingKey();
+    org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getOrderingKey();
   }
   public static final class SingleMessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4789,6 +4856,16 @@ public final class PulsarApi {
       return partitionKeyB64Encoded_;
     }
     
+    // optional bytes ordering_key = 7;
+    public static final int ORDERING_KEY_FIELD_NUMBER = 7;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
orderingKey_;
+    public boolean hasOrderingKey() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getOrderingKey() {
+      return orderingKey_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
@@ -4796,6 +4873,7 @@ public final class PulsarApi {
       compactedOut_ = false;
       eventTime_ = 0L;
       partitionKeyB64Encoded_ = false;
+      orderingKey_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4842,6 +4920,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBool(6, partitionKeyB64Encoded_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBytes(7, orderingKey_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4874,6 +4955,10 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBoolSize(6, partitionKeyB64Encoded_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(7, orderingKey_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4999,6 +5084,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000010);
         partitionKeyB64Encoded_ = false;
         bitField0_ = (bitField0_ & ~0x00000020);
+        orderingKey_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
       
@@ -5057,6 +5144,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000010;
         }
         result.partitionKeyB64Encoded_ = partitionKeyB64Encoded_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.orderingKey_ = orderingKey_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -5088,6 +5179,9 @@ public final class PulsarApi {
         if (other.hasPartitionKeyB64Encoded()) {
           setPartitionKeyB64Encoded(other.getPartitionKeyB64Encoded());
         }
+        if (other.hasOrderingKey()) {
+          setOrderingKey(other.getOrderingKey());
+        }
         return this;
       }
       
@@ -5158,6 +5252,11 @@ public final class PulsarApi {
               partitionKeyB64Encoded_ = input.readBool();
               break;
             }
+            case 58: {
+              bitField0_ |= 0x00000040;
+              orderingKey_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -5373,6 +5472,30 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bytes ordering_key = 7;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
orderingKey_ = 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      public boolean hasOrderingKey() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
getOrderingKey() {
+        return orderingKey_;
+      }
+      public Builder 
setOrderingKey(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString 
value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000040;
+        orderingKey_ = value;
+        
+        return this;
+      }
+      public Builder clearOrderingKey() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        orderingKey_ = getDefaultInstance().getOrderingKey();
+        
+        return this;
+      }
+      
       // 
@@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
@@ -8331,11 +8454,13 @@ public final class PulsarApi {
       Exclusive(0, 0),
       Shared(1, 1),
       Failover(2, 2),
+      Key_Shared(3, 3),
       ;
       
       public static final int Exclusive_VALUE = 0;
       public static final int Shared_VALUE = 1;
       public static final int Failover_VALUE = 2;
+      public static final int Key_Shared_VALUE = 3;
       
       
       public final int getNumber() { return value; }
@@ -8345,6 +8470,7 @@ public final class PulsarApi {
           case 0: return Exclusive;
           case 1: return Shared;
           case 2: return Failover;
+          case 3: return Key_Shared;
           default: return null;
         }
       }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Hash.java
similarity index 82%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/Hash.java
index 31c771a..ac7d0b0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Hash.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
 
 public interface Hash {
+
     /**
-     * Generate the hash of a given String
+     * Generate the hash of a given byte array
      *
-     * @return The hash of {@param s}, which is non-negative integer.
+     * @return The hash of {@param b}, which is non-negative integer.
      */
-    int makeHash(String s);
+    int makeHash(byte[] b);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Murmur3_32Hash.java
similarity index 91%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/util/Murmur3_32Hash.java
index 80552da..dd0c5ab 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Murmur3_32Hash.java
@@ -21,11 +21,10 @@
  * public domain. This source code, implemented by Licht Takeuchi, is based on
  * the orignal MurmurHash3 source code.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.charset.StandardCharsets;
 
 import com.google.common.primitives.UnsignedBytes;
 
@@ -46,11 +45,11 @@ public class Murmur3_32Hash implements Hash {
     }
 
     @Override
-    public int makeHash(String s) {
-        return makeHash(s.getBytes(StandardCharsets.UTF_8)) & 
Integer.MAX_VALUE;
+    public int makeHash(byte[] b) {
+        return makeHash0(b) & Integer.MAX_VALUE;
     }
 
-    private int makeHash(byte[] bytes) {
+    private int makeHash0(byte[] bytes) {
         int len = bytes.length;
         int reminder = len % CHUNK_SIZE;
         int h1 = seed;
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index c3e9fa2..d73e601 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -110,8 +110,9 @@ message MessageMetadata {
        // Additional parameters required by encryption
        optional bytes encryption_param = 15;
        optional bytes schema_version = 16;
-
-        optional bool partition_key_b64_encoded = 17 [ default = false ];
+       optional bool partition_key_b64_encoded = 17 [ default = false ];
+       // Specific a key to overwrite the message key which used for ordering 
dispatch in Key_Shared mode.
+       optional bytes ordering_key = 18;
 }
 
 
@@ -124,8 +125,9 @@ message SingleMessageMetadata {
        // the timestamp that this event occurs. it is typically set by 
applications.
        // if this field is omitted, `publish_time` can be used for the purpose 
of `event_time`.
        optional uint64 event_time = 5 [default = 0];
-
-        optional bool partition_key_b64_encoded = 6 [ default = false ];
+       optional bool partition_key_b64_encoded = 6 [ default = false ];
+    // Specific a key to overwrite the message key which used for ordering 
dispatch in Key_Shared mode.
+       optional bytes ordering_key = 7;
 }
 
 enum ServerError {
@@ -152,6 +154,7 @@ enum ServerError {
     InvalidTopicName = 17; // The topic name is not valid
 
     IncompatibleSchema = 18; // Specified schema was incompatible with topic 
schema
+       ConsumerAssignError = 19; // Dispatcher assign consumer error
 }
 
 enum AuthMethod {
@@ -180,6 +183,7 @@ enum ProtocolVersion {
                         // Added CommandGetTopicsOfNamespace
        v13 = 13; // Schema-registry : added avro schema format for json
        v14 = 14; // Add CommandAuthChallenge and CommandAuthResponse for 
mutual auth
+                 // Added Key_Shared subscription
 }
 
 message CommandConnect {
@@ -234,6 +238,7 @@ message CommandSubscribe {
                Exclusive = 0;
                Shared    = 1;
                Failover  = 2;
+               Key_Shared = 3;
        }
        required string topic        = 1;
        required string subscription = 2;
diff --git a/site2/docs/assets/pulsar-key-shared-subscriptions.png 
b/site2/docs/assets/pulsar-key-shared-subscriptions.png
new file mode 100644
index 0000000..db02e0f
Binary files /dev/null and 
b/site2/docs/assets/pulsar-key-shared-subscriptions.png differ
diff --git a/site2/docs/assets/pulsar-subscription-modes.png 
b/site2/docs/assets/pulsar-subscription-modes.png
index e8e618b..1412fd7 100644
Binary files a/site2/docs/assets/pulsar-subscription-modes.png and 
b/site2/docs/assets/pulsar-subscription-modes.png differ
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index f5ae6cf..9714596 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -116,6 +116,16 @@ In the diagram above, only **Consumer-A** is allowed to 
consume messages.
 
 ![Exclusive subscriptions](assets/pulsar-exclusive-subscriptions.png)
 
+### Failover
+
+In *failover* mode, multiple consumers can attach to the same subscription. 
The consumers will be lexically sorted by the consumer's name and the first 
consumer will initially be the only one receiving messages. This consumer is 
called the *master consumer*.
+
+When the master consumer disconnects, all (non-acked and subsequent) messages 
will be delivered to the next consumer in line.
+
+In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 
would be the next in line to receive messages if Consumer-C-1 disconnected.
+
+![Failover subscriptions](assets/pulsar-failover-subscriptions.png)
+
 ### Shared
 
 In *shared* or *round robin* mode, multiple consumers can attach to the same 
subscription. Messages are delivered in a round robin distribution across 
consumers, and any given message is delivered to only one consumer. When a 
consumer disconnects, all the messages that were sent to it and not 
acknowledged will be rescheduled for sending to the remaining consumers.
@@ -129,15 +139,16 @@ In the diagram above, **Consumer-B-1** and 
**Consumer-B-2** are able to subscrib
 
 ![Shared subscriptions](assets/pulsar-shared-subscriptions.png)
 
-### Failover
-
-In *failover* mode, multiple consumers can attach to the same subscription. 
The consumers will be lexically sorted by the consumer's name and the first 
consumer will initially be the only one receiving messages. This consumer is 
called the *master consumer*.
+### Key_shared
 
-When the master consumer disconnects, all (non-acked and subsequent) messages 
will be delivered to the next consumer in line.
+In *Key_Shared* mode, multiple consumers can attach to the same subscription. 
Messages are delivered in a distribution across consumers and message with same 
key or same ordering key are delivered to only one consumer. No matter how many 
times the message is re-delivered, it is delivered to the same consumer. When a 
consumer connected or disconnected will cause served consumer change for some 
key of message.
 
-In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 
would be the next in line to receive messages if Consumer-C-1 disconnected.
+> #### Limitations of Key_Shared mode
+> There are two important things to be aware of when using Key_Shared mode:
+> * You need to specify a key or orderingKey for messages
+> * You cannot use cumulative acknowledgment with Key_Shared mode.
 
-![Failover subscriptions](assets/pulsar-failover-subscriptions.png)
+![Key_Shared subscriptions](assets/pulsar-key-shared-subscriptions.png)
 
 ## Multi-topic subscriptions
 

Reply via email to