This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 800a4263e56fa966339e5dae274c247c565ad854
Author: caiconghui <[email protected]>
AuthorDate: Fri Jan 20 16:24:49 2023 +0800

    [enhancement](query) Make query scan nodes more evenly distributed (#16037)
    
    Add replicaNumPerHost into consideration while scheduling scan nodes to
    hosts, to make the final query scan nodes more evenly distributed in the
    cluster.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 130 ++++++++++++---------
 .../java/org/apache/doris/qe/SimpleScheduler.java  |   6 +-
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 118 ++++++++++++-------
 3 files changed, 155 insertions(+), 99 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 40b085505d..a646cb06bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1332,8 +1332,7 @@ public class Coordinator {
             if (params.instanceExecParams.isEmpty()) {
                 Reference<Long> backendIdRef = new Reference<Long>();
                 TNetworkAddress execHostport;
-                if (ConnectContext.get() != null
-                        && !ConnectContext.get().isResourceTagsSet()
+                if (ConnectContext.get() != null && 
ConnectContext.get().isResourceTagsSet()
                         && !addressToBackendID.isEmpty()) {
                     // In this case, we only use the BE where the replica 
selected by the tag is located to
                     // execute this query. Otherwise, except for the scan 
node, the rest of the execution nodes
@@ -1419,7 +1418,7 @@ public class Coordinator {
         return Pair.of(fatherPlan, newPlan);
     }
 
-    private <K, V> V findOrInsert(HashMap<K, V> m, final K key, final V 
defaultVal) {
+    private <K, V> V findOrInsert(Map<K, V> m, final K key, final V 
defaultVal) {
         V value = m.get(key);
         if (value == null) {
             m.put(key, defaultVal);
@@ -1511,10 +1510,34 @@ public class Coordinator {
         }
     }
 
+    private Map<TNetworkAddress, Long> getReplicaNumPerHost() {
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        for (ScanNode scanNode : scanNodes) {
+            List<TScanRangeLocations> locationsList = 
scanNode.getScanRangeLocations(0);
+            if (locationsList == null) {
+                // only analysis olap scan node
+                continue;
+            }
+            for (TScanRangeLocations locations : locationsList) {
+                for (TScanRangeLocation location : locations.locations) {
+                    if (replicaNumPerHost.containsKey(location.server)) {
+                        replicaNumPerHost.put(location.server, 
replicaNumPerHost.get(location.server) + 1L);
+                    } else {
+                        replicaNumPerHost.put(location.server, 1L);
+                    }
+                }
+
+            }
+        }
+        return replicaNumPerHost;
+    }
+
     // Populates scan_range_assignment_.
     // <fragment, <server, nodeId>>
     private void computeScanRangeAssignment() throws Exception {
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = 
Maps.newHashMap();
+        Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHost();
+        Collections.shuffle(scanNodes);
         // set scan ranges/locations for scan nodes
         for (ScanNode scanNode : scanNodes) {
             // the parameters of getScanRangeLocations may ignore, It doesn't 
take effect
@@ -1523,7 +1546,7 @@ public class Coordinator {
                 // only analysis olap scan node
                 continue;
             }
-
+            Collections.shuffle(locations);
             Set<Integer> scanNodeIds = 
fragmentIdToScanNodeIds.computeIfAbsent(scanNode.getFragmentId(),
                     k -> Sets.newHashSet());
             scanNodeIds.add(scanNode.getId().asInt());
@@ -1538,35 +1561,35 @@ public class Coordinator {
             // A fragment may contain both colocate join and bucket shuffle 
join
             // on need both compute scanRange to init basic data for query 
coordinator
             if (fragmentContainsColocateJoin) {
-                computeScanRangeAssignmentByColocate((OlapScanNode) scanNode);
+                computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, 
assignedBytesPerHost, replicaNumPerHost);
             }
             if (fragmentContainsBucketShuffleJoin) {
                 
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) 
scanNode,
-                        idToBackend, addressToBackendID);
+                        idToBackend, addressToBackendID, replicaNumPerHost);
             }
             if (!(fragmentContainsColocateJoin || 
fragmentContainsBucketShuffleJoin)) {
-                computeScanRangeAssignmentByScheduler(scanNode, locations, 
assignment, assignedBytesPerHost);
+                computeScanRangeAssignmentByScheduler(scanNode, locations, 
assignment, assignedBytesPerHost,
+                        replicaNumPerHost);
             }
         }
     }
 
     // To ensure the same bucketSeq tablet to the same execHostPort
     private void computeScanRangeAssignmentByColocate(
-            final OlapScanNode scanNode) throws Exception {
+            final OlapScanNode scanNode, Map<TNetworkAddress, Long> 
assignedBytesPerHost,
+            Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
         if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
             fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new 
HashMap<>());
             fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), 
new BucketSeqToScanRange());
         }
         Map<Integer, TNetworkAddress> bucketSeqToAddress = 
fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
         BucketSeqToScanRange bucketSeqToScanRange = 
fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
-
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = 
Maps.newHashMap();
         for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
             //fill scanRangeParamsList
             List<TScanRangeLocations> locations = 
scanNode.bucketSeq2locations.get(bucketSeq);
             if (!bucketSeqToAddress.containsKey(bucketSeq)) {
                 getExecHostPortForFragmentIDAndBucketSeq(locations.get(0),
-                        scanNode.getFragmentId(), bucketSeq, 
assignedBytesPerHost);
+                        scanNode.getFragmentId(), bucketSeq, 
assignedBytesPerHost, replicaNumPerHost);
             }
 
             for (TScanRangeLocations location : locations) {
@@ -1586,10 +1609,11 @@ public class Coordinator {
 
     //ensure bucket sequence distribued to every host evenly
     private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations 
seqLocation,
-            PlanFragmentId fragmentId, Integer bucketSeq, 
HashMap<TNetworkAddress, Long> assignedBytesPerHost)
+            PlanFragmentId fragmentId, Integer bucketSeq, Map<TNetworkAddress, 
Long> assignedBytesPerHost,
+            Map<TNetworkAddress, Long> replicaNumPerHost)
             throws Exception {
         Reference<Long> backendIdRef = new Reference<Long>();
-        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, 
backendIdRef);
+        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, 
replicaNumPerHost, backendIdRef);
         Backend backend = this.idToBackend.get(backendIdRef.getRef());
         TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), 
backend.getBePort());
         this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
@@ -1597,10 +1621,12 @@ public class Coordinator {
     }
 
     public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations 
seqLocation,
-                                                         
HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+                                                         Map<TNetworkAddress, 
Long> assignedBytesPerHost,
+                                                         Map<TNetworkAddress, 
Long> replicaNumPerHost,
                                                          Reference<Long> 
backendIdRef) throws UserException {
         if (!Config.enable_local_replica_selection) {
-            return selectBackendsByRoundRobin(seqLocation.getLocations(), 
assignedBytesPerHost, backendIdRef);
+            return selectBackendsByRoundRobin(seqLocation.getLocations(), 
assignedBytesPerHost, replicaNumPerHost,
+                    backendIdRef);
         }
 
         List<TScanRangeLocation> localLocations = new ArrayList<>();
@@ -1615,35 +1641,38 @@ public class Coordinator {
         }
 
         try {
-            return selectBackendsByRoundRobin(localLocations, 
assignedBytesPerHost, backendIdRef);
+            return selectBackendsByRoundRobin(localLocations, 
assignedBytesPerHost, replicaNumPerHost, backendIdRef);
         } catch (UserException ue) {
             if (!Config.enable_local_replica_selection_fallback) {
                 throw ue;
             }
-            return selectBackendsByRoundRobin(nonlocalLocations, 
assignedBytesPerHost, backendIdRef);
+            return selectBackendsByRoundRobin(nonlocalLocations, 
assignedBytesPerHost, replicaNumPerHost, backendIdRef);
         }
     }
 
     public TScanRangeLocation 
selectBackendsByRoundRobin(List<TScanRangeLocation> locations,
-            HashMap<TNetworkAddress, Long> assignedBytesPerHost, 
Reference<Long> backendIdRef) throws UserException {
+            Map<TNetworkAddress, Long> assignedBytesPerHost, 
Map<TNetworkAddress, Long> replicaNumPerHost,
+            Reference<Long> backendIdRef) throws UserException {
         Long minAssignedBytes = Long.MAX_VALUE;
+        Long minReplicaNum = Long.MAX_VALUE;
         TScanRangeLocation minLocation = null;
         Long step = 1L;
         for (final TScanRangeLocation location : locations) {
             Long assignedBytes = findOrInsert(assignedBytesPerHost, 
location.server, 0L);
-            if (assignedBytes < minAssignedBytes) {
+            if (assignedBytes < minAssignedBytes || 
(assignedBytes.equals(minAssignedBytes)
+                    && replicaNumPerHost.get(location.server) < 
minReplicaNum)) {
                 minAssignedBytes = assignedBytes;
+                minReplicaNum = replicaNumPerHost.get(location.server);
                 minLocation = location;
             }
         }
+        for (TScanRangeLocation location : locations) {
+            replicaNumPerHost.put(location.server, 
replicaNumPerHost.get(location.server) - 1);
+        }
         TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, 
locations,
                 this.idToBackend, backendIdRef);
-        if (assignedBytesPerHost.containsKey(location.server)) {
-            assignedBytesPerHost.put(location.server,
-                    assignedBytesPerHost.get(location.server) + step);
-        } else {
-            assignedBytesPerHost.put(location.server, step);
-        }
+        assignedBytesPerHost.put(location.server, 
assignedBytesPerHost.get(location.server) + step);
+
         return location;
     }
 
@@ -1651,11 +1680,12 @@ public class Coordinator {
             final ScanNode scanNode,
             final List<TScanRangeLocations> locations,
             FragmentScanRangeAssignment assignment,
-            HashMap<TNetworkAddress, Long> assignedBytesPerHost) throws 
Exception {
+            Map<TNetworkAddress, Long> assignedBytesPerHost,
+            Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
         for (TScanRangeLocations scanRangeLocations : locations) {
             Reference<Long> backendIdRef = new Reference<Long>();
             TScanRangeLocation minLocation = 
selectBackendsByRoundRobin(scanRangeLocations,
-                    assignedBytesPerHost, backendIdRef);
+                    assignedBytesPerHost, replicaNumPerHost, backendIdRef);
             Backend backend = this.idToBackend.get(backendIdRef.getRef());
             TNetworkAddress execHostPort = new 
TNetworkAddress(backend.getHost(), backend.getBePort());
             this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
@@ -1892,40 +1922,35 @@ public class Coordinator {
         // make sure each host have average bucket to scan
         private void 
getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation,
                 PlanFragmentId fragmentId, Integer bucketSeq, 
ImmutableMap<Long, Backend> idToBackend,
-                Map<TNetworkAddress, Long> addressToBackendID) throws 
Exception {
+                Map<TNetworkAddress, Long> addressToBackendID,
+                Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception 
{
             Map<Long, Integer> buckendIdToBucketCountMap = 
fragmentIdToBuckendIdBucketCountMap.get(fragmentId);
             int maxBucketNum = Integer.MAX_VALUE;
             long buckendId = Long.MAX_VALUE;
+            Long minReplicaNum = Long.MAX_VALUE;
             for (TScanRangeLocation location : seqLocation.locations) {
-                if 
(buckendIdToBucketCountMap.containsKey(location.backend_id)) {
-                    if (buckendIdToBucketCountMap.get(location.backend_id) < 
maxBucketNum) {
-                        maxBucketNum = 
buckendIdToBucketCountMap.get(location.backend_id);
-                        buckendId = location.backend_id;
-                    }
-                } else {
-                    maxBucketNum = 0;
+                if 
(buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) < maxBucketNum) 
{
+                    maxBucketNum = 
buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0);
                     buckendId = location.backend_id;
-                    buckendIdToBucketCountMap.put(buckendId, 0);
-                    break;
+                    minReplicaNum = replicaNumPerHost.get(location.server);
+                } else if 
(buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) == maxBucketNum
+                        && replicaNumPerHost.get(location.server) < 
minReplicaNum) {
+                    buckendId = location.backend_id;
+                    minReplicaNum = replicaNumPerHost.get(location.server);
                 }
             }
-            Reference<Long> backendIdRef = new Reference<Long>();
+            Reference<Long> backendIdRef = new Reference<>();
             TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId,
                     seqLocation.locations, idToBackend, backendIdRef);
-            if (execHostPort == null) {
-                throw new 
UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
-            }
             //the backend with buckendId is not alive, chose another new 
backend
             if (backendIdRef.getRef() != buckendId) {
-                //buckendIdToBucketCountMap does not contain the new backend, 
insert into it
-                if 
(!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
-                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
-                } else { //buckendIdToBucketCountMap contains the new backend, 
update it
-                    buckendIdToBucketCountMap.put(backendIdRef.getRef(),
-                            
buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
-                }
+                buckendIdToBucketCountMap.put(backendIdRef.getRef(),
+                        
buckendIdToBucketCountMap.getOrDefault(backendIdRef.getRef(), 0) + 1);
             } else { //the backend with buckendId is alive, update 
buckendIdToBucketCountMap directly
-                buckendIdToBucketCountMap.put(buckendId, 
buckendIdToBucketCountMap.get(buckendId) + 1);
+                buckendIdToBucketCountMap.put(buckendId, 
buckendIdToBucketCountMap.getOrDefault(buckendId, 0) + 1);
+            }
+            for (TScanRangeLocation location : seqLocation.locations) {
+                replicaNumPerHost.put(location.server, 
replicaNumPerHost.get(location.server) - 1);
             }
             addressToBackendID.put(execHostPort, backendIdRef.getRef());
             this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, 
execHostPort);
@@ -1934,7 +1959,8 @@ public class Coordinator {
         // to ensure the same bucketSeq tablet to the same execHostPort
         private void computeScanRangeAssignmentByBucket(
                 final OlapScanNode scanNode, ImmutableMap<Long, Backend> 
idToBackend,
-                Map<TNetworkAddress, Long> addressToBackendID) throws 
Exception {
+                Map<TNetworkAddress, Long> addressToBackendID,
+                Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception 
{
             if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
                 // In bucket shuffle join, we have 2 situation.
                 // 1. Only one partition: in this case, we use 
scanNode.getTotalTabletsNum() to get the right bucket num
@@ -1962,7 +1988,7 @@ public class Coordinator {
                 List<TScanRangeLocations> locations = 
scanNode.bucketSeq2locations.get(bucketSeq);
                 if (!bucketSeqToAddress.containsKey(bucketSeq)) {
                     getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), 
scanNode.getFragmentId(),
-                            bucketSeq, idToBackend, addressToBackendID);
+                            bucketSeq, idToBackend, addressToBackendID, 
replicaNumPerHost);
                 }
 
                 for (TScanRangeLocations location : locations) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index 69f4408848..b0c4d70ca7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -64,7 +64,7 @@ public class SimpleScheduler {
                                           Reference<Long> backendIdRef)
             throws UserException {
         if (CollectionUtils.isEmpty(locations) || backends == null || 
backends.isEmpty()) {
-            throw new UserException("scan range location or candidate backends 
is empty");
+            throw new 
UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
         }
         LOG.debug("getHost backendID={}, backendSize={}", backendId, 
backends.size());
         Backend backend = backends.get(backendId);
@@ -134,8 +134,8 @@ public class SimpleScheduler {
         Map.Entry<Long, Backend> backendEntry = 
backends.entrySet().stream().skip(id).filter(
                 e -> isAvailable(e.getValue())).findFirst().orElse(null);
         if (backendEntry == null && id > 0) {
-            backendEntry = backends.entrySet().stream().filter(
-                e -> 
isAvailable(e.getValue())).limit(id).findFirst().orElse(null);
+            backendEntry = backends.entrySet().stream().limit(id).filter(
+                e -> isAvailable(e.getValue())).findFirst().orElse(null);
         }
         if (backendEntry != null) {
             Backend backend = backendEntry.getValue();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index c6cc28bcb7..42fa87e28a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -50,6 +50,7 @@ import org.apache.doris.thrift.TUniqueId;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import mockit.Mocked;
 import org.apache.commons.collections.map.HashedMap;
 import org.junit.Assert;
@@ -201,6 +202,11 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testComputeScanRangeAssignmentByBucketq()  {
+        // init all be network address
+        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
         PlanFragmentId planFragmentId = new PlanFragmentId(1);
         int scanNodeId = 1;
         Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new 
HashMap<>();
@@ -223,10 +229,13 @@ public class CoordinatorTest extends Coordinator {
         TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
         TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
         tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = be0;
         TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
         tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = be1;
         TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
         tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = be2;
 
         tScanRangeLocations.locations = new ArrayList<>();
         tScanRangeLocations.locations.add(tScanRangeLocation0);
@@ -249,11 +258,6 @@ public class CoordinatorTest extends Coordinator {
         Backend backend2 = new Backend();
         backend2.setAlive(true);
 
-        // init all be network address
-        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
-        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
-        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
-
         HashMap<Long, Backend> idToBackend = new HashMap<>();
         idToBackend.put(0L, backend0);
         idToBackend.put(1L, backend1);
@@ -264,8 +268,13 @@ public class CoordinatorTest extends Coordinator {
         addressToBackendID.put(be1, 1L);
         addressToBackendID.put(be2, 2L);
 
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        replicaNumPerHost.put(be0, 66L);
+        replicaNumPerHost.put(be1, 66L);
+        replicaNumPerHost.put(be2, 66L);
+
         Deencapsulation.invoke(bucketShuffleJoinController, 
"computeScanRangeAssignmentByBucket",
-                olapScanNode, ImmutableMap.copyOf(idToBackend), 
addressToBackendID);
+                olapScanNode, ImmutableMap.copyOf(idToBackend), 
addressToBackendID, replicaNumPerHost);
 
         Assert.assertEquals(java.util.Optional.of(66).get(),
                 Deencapsulation.invoke(bucketShuffleJoinController, 
"getFragmentBucketNum", new PlanFragmentId(1)));
@@ -324,6 +333,19 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testComputeScanRangeAssignmentByBucket()  {
+        // init all backend
+        Backend backend0 = new Backend();
+        backend0.setAlive(true);
+        Backend backend1 = new Backend();
+        backend1.setAlive(true);
+        Backend backend2 = new Backend();
+        backend2.setAlive(true);
+
+        // init all be network address
+        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
         PlanFragmentId planFragmentId = new PlanFragmentId(1);
         int scanNodeId = 1;
         Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new 
HashMap<>();
@@ -346,10 +368,13 @@ public class CoordinatorTest extends Coordinator {
         TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
         TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
         tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = be0;
         TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
         tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = be1;
         TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
         tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = be2;
 
         tScanRangeLocations.locations = new ArrayList<>();
         tScanRangeLocations.locations.add(tScanRangeLocation0);
@@ -364,20 +389,6 @@ public class CoordinatorTest extends Coordinator {
         olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
                 new DataPartition(TPartitionType.UNPARTITIONED)));
 
-
-        // init all backend
-        Backend backend0 = new Backend();
-        backend0.setAlive(true);
-        Backend backend1 = new Backend();
-        backend1.setAlive(true);
-        Backend backend2 = new Backend();
-        backend2.setAlive(true);
-
-        // init all be network address
-        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
-        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
-        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
-
         HashMap<Long, Backend> idToBackend = new HashMap<>();
         idToBackend.put(0L, backend0);
         idToBackend.put(1L, backend1);
@@ -388,9 +399,13 @@ public class CoordinatorTest extends Coordinator {
         addressToBackendID.put(be1, 1L);
         addressToBackendID.put(be2, 2L);
 
-        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeScanRangeAssignmentByBucket",
-                olapScanNode, ImmutableMap.copyOf(idToBackend), 
addressToBackendID);
+        Map<TNetworkAddress, Long> replicaNumPerHost = new HashMap<>();
+        replicaNumPerHost.put(be0, 66L);
+        replicaNumPerHost.put(be1, 66L);
+        replicaNumPerHost.put(be2, 66L);
 
+        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeScanRangeAssignmentByBucket",
+                olapScanNode, ImmutableMap.copyOf(idToBackend), 
addressToBackendID, replicaNumPerHost);
         Assert.assertEquals(java.util.Optional.of(66).get(),
                 Deencapsulation.invoke(bucketShuffleJoinController, 
"getFragmentBucketNum", new PlanFragmentId(1)));
 
@@ -440,13 +455,7 @@ public class CoordinatorTest extends Coordinator {
         FragmentExecParams params = new FragmentExecParams(null);
         Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 1, params);
         Assert.assertEquals(1, params.instanceExecParams.size());
-        try {
-            StringBuilder sb = new StringBuilder();
-            params.appendTo(sb);
-            System.out.println(sb);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+
         params = new FragmentExecParams(null);
         Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 2, params);
         Assert.assertEquals(2, params.instanceExecParams.size());
@@ -575,11 +584,18 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
         FragmentScanRangeAssignment assignment = new 
FragmentScanRangeAssignment();
         List<TScanRangeLocations> locations = new ArrayList<>();
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = 
Maps.newHashMap();
+        Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> replicaNumPerHost = new HashMap<>();
+        replicaNumPerHost.put(tScanRangeLocation0.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation1.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation3.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation4.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation5.server, 1L);
         locations.add(tScanRangeLocations);
         locations.add(tScanRangeLocations1);
         Deencapsulation.invoke(coordinator, 
"computeScanRangeAssignmentByScheduler",
-                olapScanNode, locations, assignment, assignedBytesPerHost);
+                olapScanNode, locations, assignment, assignedBytesPerHost, 
replicaNumPerHost);
         for (Map.Entry entry : assignment.entrySet()) {
             Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, 
List<TScanRangeParams>>) entry.getValue();
             for (Map.Entry item : addr.entrySet()) {
@@ -632,13 +648,18 @@ public class CoordinatorTest extends Coordinator {
         List<TScanRangeLocations> locations = new ArrayList<>();
         locations.add(tScanRangeLocations);
 
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = 
Maps.newHashMap();
+        Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        replicaNumPerHost.put(tScanRangeLocation0.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation1.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
+
         Deencapsulation.invoke(coordinator, 
"getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 1, assignedBytesPerHost);
+                planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost);
         Deencapsulation.invoke(coordinator, 
"getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 2, assignedBytesPerHost);
+                planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost);
         Deencapsulation.invoke(coordinator, 
"getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 3, assignedBytesPerHost);
+                planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost);
         List<String> hosts = new ArrayList<>();
         for (Map.Entry item : assignedBytesPerHost.entrySet()) {
             Assert.assertTrue((Long) item.getValue() == 1);
@@ -695,21 +716,23 @@ public class CoordinatorTest extends Coordinator {
         BucketShuffleJoinController controller = new 
BucketShuffleJoinController(fragmentIdToScanNodeIds);
         Map<PlanFragmentId, Map<Integer, TNetworkAddress>> 
fragmentIdToSeqToAddressMap = Maps.newHashMap();
         fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer, 
TNetworkAddress>());
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        replicaNumPerHost.put(tScanRangeLocation0.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation1.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
         Deencapsulation.setField(controller,  
"fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap);
         Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", 
fragmentIdToSeqToAddressMap);
         Deencapsulation.invoke(controller, 
"getExecHostPortForFragmentIDAndBucketSeq",
-                tScanRangeLocations, planFragmentId, 1, idToBackend, 
addressToBackendID);
-        Assert.assertTrue(backendIdBucketCountMap.size() == 2);
+                tScanRangeLocations, planFragmentId, 1, idToBackend, 
addressToBackendID, replicaNumPerHost);
+        Assert.assertTrue(backendIdBucketCountMap.size() == 1);
         List<Long> backendIds = new ArrayList<Long>();
         List<Integer> counts = new ArrayList<Integer>();
         for (Map.Entry<Long, Integer> item : 
backendIdBucketCountMap.entrySet()) {
             backendIds.add(item.getKey());
             counts.add(item.getValue());
         }
-        Assert.assertTrue(backendIds.get(0) == 0);
-        Assert.assertTrue(counts.get(0) == 0);
-        Assert.assertTrue(backendIds.get(1) == 1);
-        Assert.assertTrue(counts.get(1) == 1);
+        Assert.assertTrue(backendIds.get(0) == 1);
+        Assert.assertTrue(counts.get(0) == 1);
     }
 
     @Test
@@ -824,25 +847,32 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
 
         Deencapsulation.invoke(coordinator, "computeScanRangeAssignment");
+        Set<String> hostNames = Sets.newHashSet();
+        hostNames.add("0.0.0.0");
+        hostNames.add("0.0.0.1");
+        hostNames.add("0.0.0.2");
         FragmentScanRangeAssignment assignment = 
fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment;
         Assert.assertTrue(assignment.size() == 1);
         for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> 
entry : assignment.entrySet()) {
             TNetworkAddress host = entry.getKey();
-            Assert.assertTrue(host.hostname.equals("0.0.0.0"));
+            Assert.assertTrue(hostNames.contains(host.hostname));
+            hostNames.remove(host.hostname);
         }
 
         FragmentScanRangeAssignment assignment2 = 
fragmentExecParamsMap.get(fragment2.getFragmentId()).scanRangeAssignment;
         Assert.assertTrue(assignment2.size() == 1);
         for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> 
entry : assignment2.entrySet()) {
             TNetworkAddress host = entry.getKey();
-            Assert.assertTrue(host.hostname.equals("0.0.0.1"));
+            Assert.assertTrue(hostNames.contains(host.hostname));
+            hostNames.remove(host.hostname);
         }
 
         FragmentScanRangeAssignment assignment3 = 
fragmentExecParamsMap.get(fragment3.getFragmentId()).scanRangeAssignment;
         Assert.assertTrue(assignment3.size() == 1);
         for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> 
entry : assignment3.entrySet()) {
             TNetworkAddress host = entry.getKey();
-            Assert.assertTrue(host.hostname.equals("0.0.0.2"));
+            Assert.assertTrue(hostNames.contains(host.hostname));
+            hostNames.remove(host.hostname);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to