This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 800a4263e56fa966339e5dae274c247c565ad854
Author: caiconghui <[email protected]>
AuthorDate: Fri Jan 20 16:24:49 2023 +0800

    [enhancement](query) Make query scan nodes more evenly distributed (#16037)

    Take replicaNumPerHost into consideration when scheduling scan nodes to hosts,
    so that the final query scan nodes are distributed more evenly across the cluster.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 130 ++++++++++++---------
 .../java/org/apache/doris/qe/SimpleScheduler.java  |   6 +-
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 118 ++++++++++++-------
 3 files changed, 155 insertions(+), 99 deletions(-)
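The heart of the change is the tie-breaking rule that selectBackendsByRoundRobin gains in the diff below: for each scan range, pick the candidate host with the fewest assigned bytes, and break ties toward the host with the fewest remaining unscheduled replicas; afterwards, every candidate host's remaining-replica count is decremented, because the range no longer counts as pending work anywhere. The following is a minimal standalone sketch of that heuristic, not the committed Coordinator code: the names are invented, hosts are plain Strings instead of TNetworkAddress, and the per-range cost is fixed at 1.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class EvenScanSchedulerSketch {
        // Work (here: scan-range count, step = 1) already assigned to each host.
        private final Map<String, Long> assignedPerHost = new HashMap<>();
        // How many not-yet-scheduled replicas each host still holds.
        private final Map<String, Long> remainingReplicasPerHost;

        public EvenScanSchedulerSketch(Map<String, Long> remainingReplicasPerHost) {
            this.remainingReplicasPerHost = remainingReplicasPerHost;
        }

        // Pick an execution host for one scan range from its candidate replica hosts.
        public String pick(List<String> candidates) {
            String best = null;
            long minAssigned = Long.MAX_VALUE;
            long minRemaining = Long.MAX_VALUE;
            for (String host : candidates) {
                long assigned = assignedPerHost.getOrDefault(host, 0L);
                long remaining = remainingReplicasPerHost.getOrDefault(host, 0L);
                // Fewest assigned bytes wins; ties go to the host with fewer
                // remaining replicas.
                if (assigned < minAssigned
                        || (assigned == minAssigned && remaining < minRemaining)) {
                    minAssigned = assigned;
                    minRemaining = remaining;
                    best = host;
                }
            }
            // This range is now scheduled, so it no longer counts as a pending
            // replica on any of its candidate hosts.
            for (String host : candidates) {
                remainingReplicasPerHost.merge(host, -1L, Long::sum);
            }
            assignedPerHost.merge(best, 1L, Long::sum);
            return best;
        }

        public static void main(String[] args) {
            Map<String, Long> remaining = new HashMap<>();
            remaining.put("hostA", 10L); // many pending replicas
            remaining.put("hostB", 2L);  // few pending replicas
            EvenScanSchedulerSketch s = new EvenScanSchedulerSketch(remaining);
            // Both hosts start at 0 assigned bytes, so the tie-break picks hostB.
            System.out.println(s.pick(Arrays.asList("hostA", "hostB")));
        }
    }

The tie-break rests on a simple observation: a host that still holds many unscheduled replicas will get plenty of later chances, so a tied range is better spent on the host with fewer future opportunities.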
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 40b085505d..a646cb06bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1332,8 +1332,7 @@ public class Coordinator {
         if (params.instanceExecParams.isEmpty()) {
             Reference<Long> backendIdRef = new Reference<Long>();
             TNetworkAddress execHostport;
-            if (ConnectContext.get() != null
-                    && !ConnectContext.get().isResourceTagsSet()
+            if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()
                     && !addressToBackendID.isEmpty()) {
                 // In this case, we only use the BE where the replica selected by the tag is located to
                 // execute this query. Otherwise, except for the scan node, the rest of the execution nodes
@@ -1419,7 +1418,7 @@ public class Coordinator {
         return Pair.of(fatherPlan, newPlan);
     }
 
-    private <K, V> V findOrInsert(HashMap<K, V> m, final K key, final V defaultVal) {
+    private <K, V> V findOrInsert(Map<K, V> m, final K key, final V defaultVal) {
         V value = m.get(key);
         if (value == null) {
             m.put(key, defaultVal);
@@ -1511,10 +1510,34 @@ public class Coordinator {
         }
     }
 
+    private Map<TNetworkAddress, Long> getReplicaNumPerHost() {
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        for (ScanNode scanNode : scanNodes) {
+            List<TScanRangeLocations> locationsList = scanNode.getScanRangeLocations(0);
+            if (locationsList == null) {
+                // only analysis olap scan node
+                continue;
+            }
+            for (TScanRangeLocations locations : locationsList) {
+                for (TScanRangeLocation location : locations.locations) {
+                    if (replicaNumPerHost.containsKey(location.server)) {
+                        replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) + 1L);
+                    } else {
+                        replicaNumPerHost.put(location.server, 1L);
+                    }
+                }
+
+            }
+        }
+        return replicaNumPerHost;
+    }
+
     // Populates scan_range_assignment_.
     // <fragment, <server, nodeId>>
     private void computeScanRangeAssignment() throws Exception {
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHost();
+        Collections.shuffle(scanNodes);
         // set scan ranges/locations for scan nodes
         for (ScanNode scanNode : scanNodes) {
             // the parameters of getScanRangeLocations may ignore, It doesn't take effect
@@ -1523,7 +1546,7 @@ public class Coordinator {
                 // only analysis olap scan node
                 continue;
             }
-
+            Collections.shuffle(locations);
             Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.computeIfAbsent(scanNode.getFragmentId(),
                     k -> Sets.newHashSet());
             scanNodeIds.add(scanNode.getId().asInt());
@@ -1538,35 +1561,35 @@ public class Coordinator {
             // A fragment may contain both colocate join and bucket shuffle join
             // on need both compute scanRange to init basic data for query coordinator
             if (fragmentContainsColocateJoin) {
-                computeScanRangeAssignmentByColocate((OlapScanNode) scanNode);
+                computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost);
             }
             if (fragmentContainsBucketShuffleJoin) {
                 bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
-                        idToBackend, addressToBackendID);
+                        idToBackend, addressToBackendID, replicaNumPerHost);
             }
             if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
-                computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost);
+                computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
+                        replicaNumPerHost);
             }
         }
     }
 
     // To ensure the same bucketSeq tablet to the same execHostPort
     private void computeScanRangeAssignmentByColocate(
-            final OlapScanNode scanNode) throws Exception {
+            final OlapScanNode scanNode, Map<TNetworkAddress, Long> assignedBytesPerHost,
+            Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
         if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
             fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>());
             fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
         }
         Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
         BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
-
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
             //fill scanRangeParamsList
             List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
             if (!bucketSeqToAddress.containsKey(bucketSeq)) {
                 getExecHostPortForFragmentIDAndBucketSeq(locations.get(0),
-                        scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost);
+                        scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost, replicaNumPerHost);
             }
 
             for (TScanRangeLocations location : locations) {
@@ -1586,10 +1609,11 @@ public class Coordinator {
 
     //ensure bucket sequence distribued to every host evenly
     private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation,
-            PlanFragmentId fragmentId, Integer bucketSeq, HashMap<TNetworkAddress, Long> assignedBytesPerHost)
+            PlanFragmentId fragmentId, Integer bucketSeq, Map<TNetworkAddress, Long> assignedBytesPerHost,
+            Map<TNetworkAddress, Long> replicaNumPerHost)
             throws Exception {
         Reference<Long> backendIdRef = new Reference<Long>();
-        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, backendIdRef);
+        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost, backendIdRef);
         Backend backend = this.idToBackend.get(backendIdRef.getRef());
         TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
         this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
@@ -1597,10 +1621,12 @@ public class Coordinator {
     }
 
     public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
-            HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+            Map<TNetworkAddress, Long> assignedBytesPerHost,
+            Map<TNetworkAddress, Long> replicaNumPerHost,
             Reference<Long> backendIdRef) throws UserException {
         if (!Config.enable_local_replica_selection) {
-            return selectBackendsByRoundRobin(seqLocation.getLocations(), assignedBytesPerHost, backendIdRef);
+            return selectBackendsByRoundRobin(seqLocation.getLocations(), assignedBytesPerHost, replicaNumPerHost,
+                    backendIdRef);
         }
 
         List<TScanRangeLocation> localLocations = new ArrayList<>();
@@ -1615,35 +1641,38 @@ public class Coordinator {
         }
 
         try {
-            return selectBackendsByRoundRobin(localLocations, assignedBytesPerHost, backendIdRef);
+            return selectBackendsByRoundRobin(localLocations, assignedBytesPerHost, replicaNumPerHost, backendIdRef);
         } catch (UserException ue) {
             if (!Config.enable_local_replica_selection_fallback) {
                 throw ue;
             }
-            return selectBackendsByRoundRobin(nonlocalLocations, assignedBytesPerHost, backendIdRef);
+            return selectBackendsByRoundRobin(nonlocalLocations, assignedBytesPerHost, replicaNumPerHost, backendIdRef);
         }
     }
 
     public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> locations,
-            HashMap<TNetworkAddress, Long> assignedBytesPerHost, Reference<Long> backendIdRef) throws UserException {
+            Map<TNetworkAddress, Long> assignedBytesPerHost, Map<TNetworkAddress, Long> replicaNumPerHost,
+            Reference<Long> backendIdRef) throws UserException {
         Long minAssignedBytes = Long.MAX_VALUE;
+        Long minReplicaNum = Long.MAX_VALUE;
         TScanRangeLocation minLocation = null;
         Long step = 1L;
         for (final TScanRangeLocation location : locations) {
             Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
-            if (assignedBytes < minAssignedBytes) {
+            if (assignedBytes < minAssignedBytes || (assignedBytes.equals(minAssignedBytes)
+                    && replicaNumPerHost.get(location.server) < minReplicaNum)) {
                 minAssignedBytes = assignedBytes;
+                minReplicaNum = replicaNumPerHost.get(location.server);
                 minLocation = location;
             }
         }
+        for (TScanRangeLocation location : locations) {
+            replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1);
+        }
         TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, locations,
                 this.idToBackend, backendIdRef);
-        if (assignedBytesPerHost.containsKey(location.server)) {
-            assignedBytesPerHost.put(location.server,
-                    assignedBytesPerHost.get(location.server) + step);
-        } else {
-            assignedBytesPerHost.put(location.server, step);
-        }
+        assignedBytesPerHost.put(location.server, assignedBytesPerHost.get(location.server) + step);
+
         return location;
     }
 
@@ -1651,11 +1680,12 @@ public class Coordinator {
             final ScanNode scanNode,
             final List<TScanRangeLocations> locations,
             FragmentScanRangeAssignment assignment,
-            HashMap<TNetworkAddress, Long> assignedBytesPerHost) throws Exception {
+            Map<TNetworkAddress, Long> assignedBytesPerHost,
+            Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
        for (TScanRangeLocations scanRangeLocations : locations) {
             Reference<Long> backendIdRef = new Reference<Long>();
             TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,
-                    assignedBytesPerHost, backendIdRef);
+                    assignedBytesPerHost, replicaNumPerHost, backendIdRef);
             Backend backend = this.idToBackend.get(backendIdRef.getRef());
             TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
             this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
@@ -1892,40 +1922,35 @@ public class Coordinator {
         // make sure each host have average bucket to scan
         private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation,
                 PlanFragmentId fragmentId, Integer bucketSeq, ImmutableMap<Long, Backend> idToBackend,
-                Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
+                Map<TNetworkAddress, Long> addressToBackendID,
+                Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
             Map<Long, Integer> buckendIdToBucketCountMap = fragmentIdToBuckendIdBucketCountMap.get(fragmentId);
             int maxBucketNum = Integer.MAX_VALUE;
             long buckendId = Long.MAX_VALUE;
+            Long minReplicaNum = Long.MAX_VALUE;
             for (TScanRangeLocation location : seqLocation.locations) {
-                if (buckendIdToBucketCountMap.containsKey(location.backend_id)) {
-                    if (buckendIdToBucketCountMap.get(location.backend_id) < maxBucketNum) {
-                        maxBucketNum = buckendIdToBucketCountMap.get(location.backend_id);
-                        buckendId = location.backend_id;
-                    }
-                } else {
-                    maxBucketNum = 0;
+                if (buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) < maxBucketNum) {
+                    maxBucketNum = buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0);
                     buckendId = location.backend_id;
-                    buckendIdToBucketCountMap.put(buckendId, 0);
-                    break;
+                    minReplicaNum = replicaNumPerHost.get(location.server);
+                } else if (buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) == maxBucketNum
+                        && replicaNumPerHost.get(location.server) < minReplicaNum) {
+                    buckendId = location.backend_id;
+                    minReplicaNum = replicaNumPerHost.get(location.server);
                 }
             }
-            Reference<Long> backendIdRef = new Reference<Long>();
+            Reference<Long> backendIdRef = new Reference<>();
             TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId,
                     seqLocation.locations, idToBackend, backendIdRef);
-            if (execHostPort == null) {
-                throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
-            }
             //the backend with buckendId is not alive, chose another new backend
             if (backendIdRef.getRef() != buckendId) {
-                //buckendIdToBucketCountMap does not contain the new backend, insert into it
-                if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
-                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
-                } else { //buckendIdToBucketCountMap contains the new backend, update it
-                    buckendIdToBucketCountMap.put(backendIdRef.getRef(),
-                            buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
-                }
+                buckendIdToBucketCountMap.put(backendIdRef.getRef(),
+                        buckendIdToBucketCountMap.getOrDefault(backendIdRef.getRef(), 0) + 1);
             } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly
-                buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
+                buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.getOrDefault(buckendId, 0) + 1);
+            }
+            for (TScanRangeLocation location : seqLocation.locations) {
+                replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1);
             }
             addressToBackendID.put(execHostPort, backendIdRef.getRef());
             this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
@@ -1934,7 +1959,8 @@ public class Coordinator {
         // to ensure the same bucketSeq tablet to the same execHostPort
         private void computeScanRangeAssignmentByBucket(
                 final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
-                Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
+                Map<TNetworkAddress, Long> addressToBackendID,
+                Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
             if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
                 // In bucket shuffle join, we have 2 situation.
                 // 1. Only one partition: in this case, we use scanNode.getTotalTabletsNum() to get the right bucket num
@@ -1962,7 +1988,7 @@ public class Coordinator {
                 List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
                 if (!bucketSeqToAddress.containsKey(bucketSeq)) {
                     getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(),
-                            bucketSeq, idToBackend, addressToBackendID);
+                            bucketSeq, idToBackend, addressToBackendID, replicaNumPerHost);
                 }
 
                 for (TScanRangeLocations location : locations) {
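The bucket shuffle path above applies the same two-level rule to whole bucket sequences: choose the candidate backend that owns the fewest bucket sequences so far, and break ties toward the host with fewer remaining replicas. A condensed, hypothetical rendering of that choice (invented names; candidates map a backend id to its host's remaining-replica count):

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class BucketBalancerSketch {
        // Choose a backend for one bucket sequence: fewest buckets assigned so
        // far, ties broken by the smaller remaining-replica count.
        static long chooseBackend(Map<Long, Long> candidates,
                                  Map<Long, Integer> bucketCountPerBackend) {
            int minBuckets = Integer.MAX_VALUE;
            long minReplicas = Long.MAX_VALUE;
            long chosen = -1L;
            for (Map.Entry<Long, Long> c : candidates.entrySet()) {
                int buckets = bucketCountPerBackend.getOrDefault(c.getKey(), 0);
                if (buckets < minBuckets
                        || (buckets == minBuckets && c.getValue() < minReplicas)) {
                    minBuckets = buckets;
                    minReplicas = c.getValue();
                    chosen = c.getKey();
                }
            }
            bucketCountPerBackend.merge(chosen, 1, Integer::sum);
            return chosen;
        }

        public static void main(String[] args) {
            Map<Long, Long> candidates = new LinkedHashMap<>();
            candidates.put(0L, 10L); // backend 0's host still holds 10 pending replicas
            candidates.put(1L, 3L);  // backend 1's host holds only 3
            Map<Long, Integer> bucketCount = new HashMap<>();
            // Both backends are tied at zero buckets, so the smaller
            // remaining-replica count decides: prints 1.
            System.out.println(chooseBackend(candidates, bucketCount));
        }
    }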
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index 69f4408848..b0c4d70ca7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -64,7 +64,7 @@ public class SimpleScheduler {
             Reference<Long> backendIdRef) throws UserException {
         if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) {
-            throw new UserException("scan range location or candidate backends is empty");
+            throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
         }
         LOG.debug("getHost backendID={}, backendSize={}", backendId, backends.size());
         Backend backend = backends.get(backendId);
@@ -134,8 +134,8 @@ public class SimpleScheduler {
         Map.Entry<Long, Backend> backendEntry = backends.entrySet().stream().skip(id).filter(
                 e -> isAvailable(e.getValue())).findFirst().orElse(null);
         if (backendEntry == null && id > 0) {
-            backendEntry = backends.entrySet().stream().filter(
-                    e -> isAvailable(e.getValue())).limit(id).findFirst().orElse(null);
+            backendEntry = backends.entrySet().stream().limit(id).filter(
+                    e -> isAvailable(e.getValue())).findFirst().orElse(null);
         }
         if (backendEntry != null) {
             Backend backend = backendEntry.getValue();
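The second SimpleScheduler hunk swaps the order of limit() and filter() in the wrap-around branch of the round robin. The two pipelines differ: filtering first scans the entire backend list and merely caps how many matches may pass, while limiting first truncates the search to the first id entries, which is the window the wrap-around is meant to revisit (it also avoids re-testing the tail the first stream already covered). A toy comparison, with plain integers standing in for backend entries and a made-up liveness test:

    import java.util.Arrays;
    import java.util.List;

    public class WrapAroundDemo {
        public static void main(String[] args) {
            // Pretend only backend 4 is alive.
            List<Integer> backends = Arrays.asList(1, 2, 3, 4, 5);
            int id = 2; // round-robin offset: entries 0..1 form the wrap-around window

            // Old order: filter first, then limit. The search can reach entry 4,
            // which lies outside the window (indexes 0..1).
            Integer old = backends.stream().filter(b -> b == 4).limit(id)
                    .findFirst().orElse(null);

            // New order: limit first, then filter. The search correctly stays
            // inside the window and reports nothing available there.
            Integer fixed = backends.stream().limit(id).filter(b -> b == 4)
                    .findFirst().orElse(null);

            System.out.println("old=" + old + ", fixed=" + fixed); // old=4, fixed=null
        }
    }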
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index c6cc28bcb7..42fa87e28a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -50,6 +50,7 @@ import org.apache.doris.thrift.TUniqueId;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import mockit.Mocked;
 import org.apache.commons.collections.map.HashedMap;
 import org.junit.Assert;
@@ -201,6 +202,11 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testComputeScanRangeAssignmentByBucketq() {
+        // init all be network address
+        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
         PlanFragmentId planFragmentId = new PlanFragmentId(1);
         int scanNodeId = 1;
         Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
@@ -223,10 +229,13 @@ public class CoordinatorTest extends Coordinator {
         TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
         TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
         tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = be0;
         TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
         tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = be1;
         TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
         tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = be2;
 
         tScanRangeLocations.locations = new ArrayList<>();
         tScanRangeLocations.locations.add(tScanRangeLocation0);
@@ -249,11 +258,6 @@ public class CoordinatorTest extends Coordinator {
         Backend backend2 = new Backend();
         backend2.setAlive(true);
 
-        // init all be network address
-        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
-        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
-        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
-
         HashMap<Long, Backend> idToBackend = new HashMap<>();
         idToBackend.put(0L, backend0);
         idToBackend.put(1L, backend1);
@@ -264,8 +268,13 @@ public class CoordinatorTest extends Coordinator {
         addressToBackendID.put(be1, 1L);
         addressToBackendID.put(be2, 2L);
 
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        replicaNumPerHost.put(be0, 66L);
+        replicaNumPerHost.put(be1, 66L);
+        replicaNumPerHost.put(be2, 66L);
+
         Deencapsulation.invoke(bucketShuffleJoinController, "computeScanRangeAssignmentByBucket",
-                olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID);
+                olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID, replicaNumPerHost);
         Assert.assertEquals(java.util.Optional.of(66).get(),
                 Deencapsulation.invoke(bucketShuffleJoinController, "getFragmentBucketNum", new PlanFragmentId(1)));
 
@@ -324,6 +333,19 @@ public class CoordinatorTest extends Coordinator {
 
     @Test
     public void testComputeScanRangeAssignmentByBucket() {
+        // init all backend
+        Backend backend0 = new Backend();
+        backend0.setAlive(true);
+        Backend backend1 = new Backend();
+        backend1.setAlive(true);
+        Backend backend2 = new Backend();
+        backend2.setAlive(true);
+
+        // init all be network address
+        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
         PlanFragmentId planFragmentId = new PlanFragmentId(1);
         int scanNodeId = 1;
         Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
@@ -346,10 +368,13 @@ public class CoordinatorTest extends Coordinator {
         TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
         TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
         tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = be0;
         TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
         tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = be1;
         TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
         tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = be2;
 
         tScanRangeLocations.locations = new ArrayList<>();
         tScanRangeLocations.locations.add(tScanRangeLocation0);
@@ -364,20 +389,6 @@ public class CoordinatorTest extends Coordinator {
         olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
                 new DataPartition(TPartitionType.UNPARTITIONED)));
-
-        // init all backend
-        Backend backend0 = new Backend();
-        backend0.setAlive(true);
-        Backend backend1 = new Backend();
-        backend1.setAlive(true);
-        Backend backend2 = new Backend();
-        backend2.setAlive(true);
-
-        // init all be network address
-        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
-        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
-        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
-
         HashMap<Long, Backend> idToBackend = new HashMap<>();
         idToBackend.put(0L, backend0);
         idToBackend.put(1L, backend1);
@@ -388,9 +399,13 @@ public class CoordinatorTest extends Coordinator {
         addressToBackendID.put(be1, 1L);
         addressToBackendID.put(be2, 2L);
 
-        Deencapsulation.invoke(bucketShuffleJoinController, "computeScanRangeAssignmentByBucket",
-                olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID);
+        Map<TNetworkAddress, Long> replicaNumPerHost = new HashMap<>();
+        replicaNumPerHost.put(be0, 66L);
+        replicaNumPerHost.put(be1, 66L);
+        replicaNumPerHost.put(be2, 66L);
+
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeScanRangeAssignmentByBucket",
+                olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID, replicaNumPerHost);
         Assert.assertEquals(java.util.Optional.of(66).get(),
                 Deencapsulation.invoke(bucketShuffleJoinController, "getFragmentBucketNum", new PlanFragmentId(1)));
 
@@ -440,13 +455,7 @@ public class CoordinatorTest extends Coordinator {
         FragmentExecParams params = new FragmentExecParams(null);
         Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
         Assert.assertEquals(1, params.instanceExecParams.size());
-        try {
-            StringBuilder sb = new StringBuilder();
-            params.appendTo(sb);
-            System.out.println(sb);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+
         params = new FragmentExecParams(null);
         Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params);
         Assert.assertEquals(2, params.instanceExecParams.size());
@@ -575,11 +584,18 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
         FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
         List<TScanRangeLocations> locations = new ArrayList<>();
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> replicaNumPerHost = new HashMap<>();
+        replicaNumPerHost.put(tScanRangeLocation0.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation1.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation3.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation4.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation5.server, 1L);
         locations.add(tScanRangeLocations);
         locations.add(tScanRangeLocations1);
         Deencapsulation.invoke(coordinator, "computeScanRangeAssignmentByScheduler",
-                olapScanNode, locations, assignment, assignedBytesPerHost);
+                olapScanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost);
         for (Map.Entry entry : assignment.entrySet()) {
             Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, List<TScanRangeParams>>) entry.getValue();
             for (Map.Entry item : addr.entrySet()) {
@@ -632,13 +648,18 @@ public class CoordinatorTest extends Coordinator {
 
         List<TScanRangeLocations> locations = new ArrayList<>();
         locations.add(tScanRangeLocations);
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        replicaNumPerHost.put(tScanRangeLocation0.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation1.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
+
         Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 1, assignedBytesPerHost);
+                planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost);
         Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 2, assignedBytesPerHost);
+                planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost);
         Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 3, assignedBytesPerHost);
+                planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost);
         List<String> hosts = new ArrayList<>();
         for (Map.Entry item : assignedBytesPerHost.entrySet()) {
             Assert.assertTrue((Long) item.getValue() == 1);
@@ -695,21 +716,23 @@ public class CoordinatorTest extends Coordinator {
         BucketShuffleJoinController controller = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
         Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
         fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer, TNetworkAddress>());
+        Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
+        replicaNumPerHost.put(tScanRangeLocation0.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation1.server, 1L);
+        replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
         Deencapsulation.setField(controller, "fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap);
         Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap);
         Deencapsulation.invoke(controller, "getExecHostPortForFragmentIDAndBucketSeq",
-                tScanRangeLocations, planFragmentId, 1, idToBackend, addressToBackendID);
-        Assert.assertTrue(backendIdBucketCountMap.size() == 2);
+                tScanRangeLocations, planFragmentId, 1, idToBackend, addressToBackendID, replicaNumPerHost);
+        Assert.assertTrue(backendIdBucketCountMap.size() == 1);
         List<Long> backendIds = new ArrayList<Long>();
         List<Integer> counts = new ArrayList<Integer>();
         for (Map.Entry<Long, Integer> item : backendIdBucketCountMap.entrySet()) {
             backendIds.add(item.getKey());
             counts.add(item.getValue());
         }
-        Assert.assertTrue(backendIds.get(0) == 0);
-        Assert.assertTrue(counts.get(0) == 0);
-        Assert.assertTrue(backendIds.get(1) == 1);
-        Assert.assertTrue(counts.get(1) == 1);
+        Assert.assertTrue(backendIds.get(0) == 1);
+        Assert.assertTrue(counts.get(0) == 1);
     }
 
     @Test
@@ -824,25 +847,32 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
         Deencapsulation.invoke(coordinator, "computeScanRangeAssignment");
 
+        Set<String> hostNames = Sets.newHashSet();
+        hostNames.add("0.0.0.0");
+        hostNames.add("0.0.0.1");
+        hostNames.add("0.0.0.2");
         FragmentScanRangeAssignment assignment
                 = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment;
         Assert.assertTrue(assignment.size() == 1);
         for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : assignment.entrySet()) {
             TNetworkAddress host = entry.getKey();
-            Assert.assertTrue(host.hostname.equals("0.0.0.0"));
+            Assert.assertTrue(hostNames.contains(host.hostname));
+            hostNames.remove(host.hostname);
         }
 
         FragmentScanRangeAssignment assignment2
                 = fragmentExecParamsMap.get(fragment2.getFragmentId()).scanRangeAssignment;
         Assert.assertTrue(assignment2.size() == 1);
         for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : assignment2.entrySet()) {
             TNetworkAddress host = entry.getKey();
-            Assert.assertTrue(host.hostname.equals("0.0.0.1"));
+            Assert.assertTrue(hostNames.contains(host.hostname));
+            hostNames.remove(host.hostname);
         }
 
         FragmentScanRangeAssignment assignment3
                 = fragmentExecParamsMap.get(fragment3.getFragmentId()).scanRangeAssignment;
         Assert.assertTrue(assignment3.size() == 1);
         for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : assignment3.entrySet()) {
             TNetworkAddress host = entry.getKey();
-            Assert.assertTrue(host.hostname.equals("0.0.0.2"));
+            Assert.assertTrue(hostNames.contains(host.hostname));
+            hostNames.remove(host.hostname);
         }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
