[
https://issues.apache.org/jira/browse/ROCKETMQ-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16011699#comment-16011699
]
ASF GitHub Bot commented on ROCKETMQ-67:
----------------------------------------
Github user dhchao11 commented on a diff in the pull request:
https://github.com/apache/incubator-rocketmq/pull/67#discussion_r116650808
--- Diff:
common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * To hash Node objects to a hash ring with a certain amount of virtual
node.
+ * Method routeNode will return a Node instance which the object key
should be allocated to according to consistent hash algorithm
+ *
+ * @param <T>
+ */
+public class ConsistentHashRouter<T extends Node> {
+ private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+ private final HashFunction hashFunction;
+
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
+ this(pNodes,vNodeCount, new MD5Hash());
+ }
+
+ /**
+ *
+ * @param pNodes collections of physical nodes
+ * @param vNodeCount amounts of virtual nodes
+ * @param hashFunction hash Function to hash Node instances
+ */
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount,
HashFunction hashFunction) {
+ if (hashFunction == null) {
+ throw new NullPointerException("Hash Function is null");
+ }
+ this.hashFunction = hashFunction;
+ if (pNodes != null) {
+ for (T pNode : pNodes) {
+ addNode(pNode, vNodeCount);
+ }
+ }
+ }
+
+ /**
+ * add physic node to the hash ring with some virtual nodes
+ * @param pNode physical node needs added to hash ring
+ * @param vNodeCount the number of virtual node of the physical node.
Value should be greater than or equals to 0
+ */
+ public void addNode(T pNode, int vNodeCount) {
+ if (vNodeCount < 0) throw new IllegalArgumentException("illegal
virtual node counts :" + vNodeCount);
+ int existingReplicas = getExistingReplicas(pNode);
+ for (int i = 0; i < vNodeCount; i++) {
+ VirtualNode<T> vNode = new VirtualNode<>(pNode, i +
existingReplicas);
+ ring.put(hashFunction.hash(vNode.getKey()), vNode);
+ }
+ }
+
+ /**
+ * remove the physical node from the hash ring
+ * @param pNode
+ */
+ public void removeNode(T pNode) {
+ Iterator<Long> it = ring.keySet().iterator();
+ while (it.hasNext()) {
+ Long key = it.next();
+ VirtualNode<T> virtualNode = ring.get(key);
+ if (virtualNode.isVirtualNodeOf(pNode)) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * with a specified key, route the nearest Node instance in the
current hash ring
+ * @param objectKey the object key to find a nearest Node
+ * @return
+ */
+ public T routeNode(String objectKey) {
+ if (ring.isEmpty()) {
+ return null;
+ }
+ Long hashVal = hashFunction.hash(objectKey);
+ SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
+ Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() :
ring.firstKey();
+ return ring.get(nodeHashVal).getPhysicalNode();
+ }
+
+
+ public int getExistingReplicas(T pNode) {
+ int replicas = 0;
+ for (VirtualNode<T> vNode : ring.values()) {
+ if (vNode.isVirtualNodeOf(pNode)) {
+ replicas++;
+ }
+ }
+ return replicas;
+ }
+
+
+ //default hash function
+ private static class MD5Hash implements HashFunction {
+ MessageDigest instance;
+
+ public MD5Hash() {
+ try {
+ instance = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ }
+ }
+
+ @Override
+ public long hash(String key) {
+ instance.reset();
+ instance.update(key.getBytes());
+ byte[] digest = instance.digest();
+
+ long h = 0;
+ for (int i = 0; i < 4; i++) {
+ h <<= 8;
+ h |= ((int) digest[i]) & 0xFF;
+ }
--- End diff --
The digest is 128 bits, but the generated h uses only the first 32 bits; the other 96 bits are
ignored. Would it be better to fold the remaining bytes in with the ^ (XOR) operator?
> Consistent Hash allocate strategy support
> -----------------------------------------
>
> Key: ROCKETMQ-67
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-67
> Project: Apache RocketMQ
> Issue Type: New Feature
> Components: rocketmq-client
> Reporter: Jaskey Lam
> Assignee: Jaskey Lam
> Fix For: 4.1.0-incubating
>
>
> Currently, the average allocation strategy is very sensitive to clients
> registering and unregistering.
> A consistent-hash allocation strategy option is valuable for developers
> who care more about stable latency and avoiding message duplication.
> Intentions:
> The default AllocateMessageQueueStrategy is an averaging strategy that allocates
> queues to consumers as evenly as possible. Whenever the number of queues or
> consumers changes (say, a new consumer starts or an old consumer shuts down),
> a rehash is triggered and almost every consumer is affected: each one
> rebalances, dropping its old queues and acquiring new ones.
> This causes two problems:
> 1. Message latency from producer to consumer increases at the moment the
> number of consumers or queues changes, even when scaling up.
> 2. Messages are duplicated significantly, because a queue's offset may not yet
> have been persisted to the broker when that queue is reassigned to another
> consumer, which then pulls messages starting from the last persisted offset.
> This is especially significant for users who run tens of consumer instances and
> scale up or deploy frequently.
> A consistent-hash queue allocation strategy is a good choice for these users.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)