This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 89b74fe7f1 Revert #8948 (#10233)
89b74fe7f1 is described below

commit 89b74fe7f15117aa1301914c47ae0c87b1ca8d77
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Jun 30 20:49:56 2022 +0800

    Revert #8948 (#10233)
---
 .../loadbalance/ConsistentHashLoadBalance.java     | 95 ++--------------------
 .../loadbalance/ConsistentHashLoadBalanceTest.java | 31 ++++---
 2 files changed, 29 insertions(+), 97 deletions(-)

diff --git 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
index db2e1accef..15c7066e06 100644
--- 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
+++ 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
 
@@ -36,22 +35,26 @@ import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATT
  */
 public class ConsistentHashLoadBalance extends AbstractLoadBalance {
     public static final String NAME = "consistenthash";
+
     /**
      * Hash nodes name
      */
     public static final String HASH_NODES = "hash.nodes";
+
     /**
      * Hash arguments name
      */
     public static final String HASH_ARGUMENTS = "hash.arguments";
+
     private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = 
new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
+
     @SuppressWarnings("unchecked")
     @Override
     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, 
Invocation invocation) {
         String methodName = RpcUtils.getMethodName(invocation);
         String key = invokers.get(0).getUrl().getServiceKey() + "." + 
methodName;
         // using the hashcode of list to compute the hash only pay attention 
to the elements in the list
-        int invokersHashCode = getCorrespondingHashCode(invokers);
+        int invokersHashCode = invokers.hashCode();
         ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) 
selectors.get(key);
         if (selector == null || selector.identityHashCode != invokersHashCode) 
{
             selectors.put(key, new ConsistentHashSelector<T>(invokers, 
methodName, invokersHashCode));
@@ -60,47 +63,16 @@ public class ConsistentHashLoadBalance extends 
AbstractLoadBalance {
         return selector.select(invocation);
     }
 
-    /**
-     * get hash code of invokers
-     * Make this method to public in order to use this method in test case
-     * @param invokers
-     * @return
-     */
-    public <T> int getCorrespondingHashCode(List<Invoker<T>> invokers){
-        return invokers.hashCode();
-    }
-
     private static final class ConsistentHashSelector<T> {
 
         private final TreeMap<Long, Invoker<T>> virtualInvokers;
+
         private final int replicaNumber;
+
         private final int identityHashCode;
 
         private final int[] argumentIndex;
 
-        /**
-         * key: server(invoker) address
-         * value: count of requests accept by certain server
-         */
-        private Map<String, AtomicLong> serverRequestCountMap = new 
ConcurrentHashMap<>();
-
-        /**
-         * count of total requests accept by all servers
-         */
-        private AtomicLong totalRequestCount;
-
-        /**
-         * count of current servers(invokers)
-         */
-        private int serverCount;
-
-        /**
-         * the ratio which allow count of requests accept by each server
-         * overrate average (totalRequestCount/serverCount).
-         * 1.5 is recommended, in the future we can make this param 
configurable
-         */
-        private static final double OVERLOAD_RATIO_THREAD = 1.5F;
-
         ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, 
int identityHashCode) {
             this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
             this.identityHashCode = identityHashCode;
@@ -121,10 +93,6 @@ public class ConsistentHashLoadBalance extends 
AbstractLoadBalance {
                     }
                 }
             }
-
-            totalRequestCount = new AtomicLong(0);
-            serverCount = invokers.size();
-            serverRequestCountMap.clear();
         }
 
         public Invoker<T> select(Invocation invocation) {
@@ -132,6 +100,7 @@ public class ConsistentHashLoadBalance extends 
AbstractLoadBalance {
             byte[] digest = Bytes.getMD5(key);
             return selectForKey(hash(digest, 0));
         }
+
         private String toKey(Object[] args) {
             StringBuilder buf = new StringBuilder();
             for (int i : argumentIndex) {
@@ -141,61 +110,15 @@ public class ConsistentHashLoadBalance extends 
AbstractLoadBalance {
             }
             return buf.toString();
         }
+
         private Invoker<T> selectForKey(long hash) {
             Map.Entry<Long, Invoker<T>> entry = 
virtualInvokers.ceilingEntry(hash);
             if (entry == null) {
                 entry = virtualInvokers.firstEntry();
             }
-
-            String serverAddress = entry.getValue().getUrl().getAddress();
-
-            /**
-             * The following part of codes aims to select suitable invoker.
-             * This part is not complete thread safety.
-             * However, in the scene of consumer-side load balance,
-             * thread race for this part of codes
-             * (execution time cost for this part of codes without any IO or
-             * network operation is very low) will rarely occur. And even in
-             * extreme case, a few requests are assigned to an invoker which
-             * is above OVERLOAD_RATIO_THREAD will not make a significant 
impact
-             * on the effect of this new algorithm.
-             * And make this part of codes synchronized will reduce efficiency 
of
-             * every request. In my opinion, this is not worth. So it is not a
-             * problem for this part is not complete thread safety.
-             */
-            double overloadThread = ((double) totalRequestCount.get() / 
(double) serverCount) * OVERLOAD_RATIO_THREAD;
-            /**
-             * Find a valid server node:
-             * 1. Not have accept request yet
-             * or
-             * 2. Not have overloaded (request count already accept < thread 
(average request count * overloadRatioAllowed ))
-             */
-            while (serverRequestCountMap.containsKey(serverAddress)
-                && serverRequestCountMap.get(serverAddress).get() >= 
overloadThread) {
-                /**
-                 * If server node is not valid, get next node
-                 */
-                entry = getNextInvokerNode(virtualInvokers, entry);
-                serverAddress = entry.getValue().getUrl().getAddress();
-            }
-            if (!serverRequestCountMap.containsKey(serverAddress)) {
-                serverRequestCountMap.put(serverAddress, new AtomicLong(1));
-            } else {
-                serverRequestCountMap.get(serverAddress).incrementAndGet();
-            }
-            totalRequestCount.incrementAndGet();
-
             return entry.getValue();
         }
 
-        private Map.Entry<Long, Invoker<T>> getNextInvokerNode(TreeMap<Long, 
Invoker<T>> virtualInvokers, Map.Entry<Long, Invoker<T>> entry){
-            Map.Entry<Long, Invoker<T>> nextEntry = 
virtualInvokers.higherEntry(entry.getKey());
-            if(nextEntry == null){
-                return virtualInvokers.firstEntry();
-            }
-            return nextEntry;
-        }
-
         private long hash(byte[] digest, int number) {
             return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                     | ((long) (digest[2 + number * 4] & 0xFF) << 16)
diff --git 
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
 
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
index 606437bbd9..d9b78afd69 100644
--- 
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
+++ 
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
@@ -18,12 +18,14 @@ package org.apache.dubbo.rpc.cluster.loadbalance;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
 import org.apache.dubbo.rpc.cluster.RouterChain;
 import org.apache.dubbo.rpc.cluster.router.state.BitList;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,32 +36,39 @@ public class ConsistentHashLoadBalanceTest extends 
LoadBalanceBaseTest {
     @Test
     public void testConsistentHashLoadBalance() {
         int runs = 10000;
+        long unHitedInvokerCount = 0;
+        Map<Invoker, Long> hitedInvokers = new HashMap<>();
         Map<Invoker, AtomicLong> counter = getInvokeCounter(runs, 
ConsistentHashLoadBalance.NAME);
-        double overloadRatioAllowed = 1.5F;
-        int serverCount = counter.size();
-        double overloadThread = ((double) runs * 
overloadRatioAllowed)/((double) serverCount);
-        for (Invoker invoker : counter.keySet()) {
-            Long count = counter.get(invoker).get();
-            Assertions.assertTrue(count < (overloadThread + 1L),
-                "count of request accept by each invoker will not be higher 
than (overloadRatioAllowed * average + 1)");
+        for (Invoker minvoker : counter.keySet()) {
+            Long count = counter.get(minvoker).get();
+
+            if (count == 0) {
+                unHitedInvokerCount++;
+            } else {
+                hitedInvokers.put(minvoker, count);
+            }
         }
 
+        Assertions.assertEquals(counter.size() - 1,
+            unHitedInvokerCount, "the number of unHitedInvoker should be 
counter.size() - 1");
+        Assertions.assertEquals(1, hitedInvokers.size(), "the number of 
hitedInvoker should be 1");
+        Assertions.assertEquals(runs,
+            hitedInvokers.values().iterator().next().intValue(), "the number 
of hited count should be the number of runs");
     }
 
     // https://github.com/apache/dubbo/issues/5429
     @Test
     void testNormalWhenRouterEnabled() {
-        ConsistentHashLoadBalance lb = (ConsistentHashLoadBalance) 
getLoadBalance(ConsistentHashLoadBalance.NAME);
+        LoadBalance lb = getLoadBalance(ConsistentHashLoadBalance.NAME);
         URL url = invokers.get(0).getUrl();
         RouterChain<LoadBalanceBaseTest> routerChain = 
RouterChain.buildChain(LoadBalanceBaseTest.class, url);
         Invoker<LoadBalanceBaseTest> result = lb.select(invokers, url, 
invocation);
-        int originalHashCode = lb.getCorrespondingHashCode(invokers);
 
         for (int i = 0; i < 100; i++) {
             routerChain.setInvokers(new BitList<>(invokers));
             List<Invoker<LoadBalanceBaseTest>> routeInvokers = 
routerChain.route(url, new BitList<>(invokers), invocation);
-
-            Assertions.assertEquals(originalHashCode, 
lb.getCorrespondingHashCode(routeInvokers));
+            Invoker<LoadBalanceBaseTest> finalInvoker = 
lb.select(routeInvokers, url, invocation);
+            Assertions.assertEquals(result, finalInvoker);
         }
     }
 }

Reply via email to