This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 89b74fe7f1 Revert #8948 (#10233)
89b74fe7f1 is described below
commit 89b74fe7f15117aa1301914c47ae0c87b1ca8d77
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Jun 30 20:49:56 2022 +0800
Revert #8948 (#10233)
---
.../loadbalance/ConsistentHashLoadBalance.java | 95 ++--------------------
.../loadbalance/ConsistentHashLoadBalanceTest.java | 31 ++++---
2 files changed, 29 insertions(+), 97 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
index db2e1accef..15c7066e06 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
@@ -36,22 +35,26 @@ import static
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATT
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
+
/**
* Hash nodes name
*/
public static final String HASH_NODES = "hash.nodes";
+
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
+
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors =
new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
+
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url,
Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." +
methodName;
// using the hashcode of list to compute the hash only pay attention to the elements in the list
- int invokersHashCode = getCorrespondingHashCode(invokers);
+ int invokersHashCode = invokers.hashCode();
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>)
selectors.get(key);
if (selector == null || selector.identityHashCode != invokersHashCode)
{
selectors.put(key, new ConsistentHashSelector<T>(invokers,
methodName, invokersHashCode));
@@ -60,47 +63,16 @@ public class ConsistentHashLoadBalance extends
AbstractLoadBalance {
return selector.select(invocation);
}
- /**
- * get hash code of invokers
- * Make this method to public in order to use this method in test case
- * @param invokers
- * @return
- */
- public <T> int getCorrespondingHashCode(List<Invoker<T>> invokers){
- return invokers.hashCode();
- }
-
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
+
private final int replicaNumber;
+
private final int identityHashCode;
private final int[] argumentIndex;
- /**
- * key: server(invoker) address
- * value: count of requests accept by certain server
- */
- private Map<String, AtomicLong> serverRequestCountMap = new
ConcurrentHashMap<>();
-
- /**
- * count of total requests accept by all servers
- */
- private AtomicLong totalRequestCount;
-
- /**
- * count of current servers(invokers)
- */
- private int serverCount;
-
- /**
- * the ratio which allow count of requests accept by each server
- * overrate average (totalRequestCount/serverCount).
- * 1.5 is recommended, in the future we can make this param
configurable
- */
- private static final double OVERLOAD_RATIO_THREAD = 1.5F;
-
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName,
int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
@@ -121,10 +93,6 @@ public class ConsistentHashLoadBalance extends
AbstractLoadBalance {
}
}
}
-
- totalRequestCount = new AtomicLong(0);
- serverCount = invokers.size();
- serverRequestCountMap.clear();
}
public Invoker<T> select(Invocation invocation) {
@@ -132,6 +100,7 @@ public class ConsistentHashLoadBalance extends
AbstractLoadBalance {
byte[] digest = Bytes.getMD5(key);
return selectForKey(hash(digest, 0));
}
+
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
@@ -141,61 +110,15 @@ public class ConsistentHashLoadBalance extends
AbstractLoadBalance {
}
return buf.toString();
}
+
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry =
virtualInvokers.ceilingEntry(hash);
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
-
- String serverAddress = entry.getValue().getUrl().getAddress();
-
- /**
- * The following part of codes aims to select suitable invoker.
- * This part is not complete thread safety.
- * However, in the scene of consumer-side load balance,
- * thread race for this part of codes
- * (execution time cost for this part of codes without any IO or
- * network operation is very low) will rarely occur. And even in
- * extreme case, a few requests are assigned to an invoker which
- * is above OVERLOAD_RATIO_THREAD will not make a significant
impact
- * on the effect of this new algorithm.
- * And make this part of codes synchronized will reduce efficiency
of
- * every request. In my opinion, this is not worth. So it is not a
- * problem for this part is not complete thread safety.
- */
- double overloadThread = ((double) totalRequestCount.get() /
(double) serverCount) * OVERLOAD_RATIO_THREAD;
- /**
- * Find a valid server node:
- * 1. Not have accept request yet
- * or
- * 2. Not have overloaded (request count already accept < thread
(average request count * overloadRatioAllowed ))
- */
- while (serverRequestCountMap.containsKey(serverAddress)
- && serverRequestCountMap.get(serverAddress).get() >=
overloadThread) {
- /**
- * If server node is not valid, get next node
- */
- entry = getNextInvokerNode(virtualInvokers, entry);
- serverAddress = entry.getValue().getUrl().getAddress();
- }
- if (!serverRequestCountMap.containsKey(serverAddress)) {
- serverRequestCountMap.put(serverAddress, new AtomicLong(1));
- } else {
- serverRequestCountMap.get(serverAddress).incrementAndGet();
- }
- totalRequestCount.incrementAndGet();
-
return entry.getValue();
}
- private Map.Entry<Long, Invoker<T>> getNextInvokerNode(TreeMap<Long,
Invoker<T>> virtualInvokers, Map.Entry<Long, Invoker<T>> entry){
- Map.Entry<Long, Invoker<T>> nextEntry =
virtualInvokers.higherEntry(entry.getKey());
- if(nextEntry == null){
- return virtualInvokers.firstEntry();
- }
- return nextEntry;
- }
-
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
index 606437bbd9..d9b78afd69 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
@@ -18,12 +18,14 @@ package org.apache.dubbo.rpc.cluster.loadbalance;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,32 +36,39 @@ public class ConsistentHashLoadBalanceTest extends
LoadBalanceBaseTest {
@Test
public void testConsistentHashLoadBalance() {
int runs = 10000;
+ long unHitedInvokerCount = 0;
+ Map<Invoker, Long> hitedInvokers = new HashMap<>();
Map<Invoker, AtomicLong> counter = getInvokeCounter(runs,
ConsistentHashLoadBalance.NAME);
- double overloadRatioAllowed = 1.5F;
- int serverCount = counter.size();
- double overloadThread = ((double) runs *
overloadRatioAllowed)/((double) serverCount);
- for (Invoker invoker : counter.keySet()) {
- Long count = counter.get(invoker).get();
- Assertions.assertTrue(count < (overloadThread + 1L),
- "count of request accept by each invoker will not be higher
than (overloadRatioAllowed * average + 1)");
+ for (Invoker minvoker : counter.keySet()) {
+ Long count = counter.get(minvoker).get();
+
+ if (count == 0) {
+ unHitedInvokerCount++;
+ } else {
+ hitedInvokers.put(minvoker, count);
+ }
}
+ Assertions.assertEquals(counter.size() - 1,
+ unHitedInvokerCount, "the number of unHitedInvoker should be
counter.size() - 1");
+ Assertions.assertEquals(1, hitedInvokers.size(), "the number of
hitedInvoker should be 1");
+ Assertions.assertEquals(runs,
+ hitedInvokers.values().iterator().next().intValue(), "the number
of hited count should be the number of runs");
}
// https://github.com/apache/dubbo/issues/5429
@Test
void testNormalWhenRouterEnabled() {
- ConsistentHashLoadBalance lb = (ConsistentHashLoadBalance)
getLoadBalance(ConsistentHashLoadBalance.NAME);
+ LoadBalance lb = getLoadBalance(ConsistentHashLoadBalance.NAME);
URL url = invokers.get(0).getUrl();
RouterChain<LoadBalanceBaseTest> routerChain =
RouterChain.buildChain(LoadBalanceBaseTest.class, url);
Invoker<LoadBalanceBaseTest> result = lb.select(invokers, url,
invocation);
- int originalHashCode = lb.getCorrespondingHashCode(invokers);
for (int i = 0; i < 100; i++) {
routerChain.setInvokers(new BitList<>(invokers));
List<Invoker<LoadBalanceBaseTest>> routeInvokers =
routerChain.route(url, new BitList<>(invokers), invocation);
-
- Assertions.assertEquals(originalHashCode,
lb.getCorrespondingHashCode(routeInvokers));
+ Invoker<LoadBalanceBaseTest> finalInvoker =
lb.select(routeInvokers, url, invocation);
+ Assertions.assertEquals(result, finalInvoker);
}
}
}