This is an automated email from the ASF dual-hosted git repository.
kangkaisen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1035e86 [Enhancement] Optimize the algorithm of selecting host for a
bucket scan task when a backend is not alive (#5133)
1035e86 is described below
commit 1035e86e0bc568b9c54e823919aa247baf40faae
Author: xinghuayu007 <[email protected]>
AuthorDate: Wed Jan 6 10:20:16 2021 +0800
[Enhancement] Optimize the algorithm of selecting host for a bucket scan
task when a backend is not alive (#5133)
---
.../main/java/org/apache/doris/qe/Coordinator.java | 14 ++++-
.../java/org/apache/doris/qe/CoordinatorTest.java | 65 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ff05413..f5dd310 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1560,14 +1560,22 @@ public class Coordinator {
break;
}
}
-
- buckendIdToBucketCountMap.put(buckendId,
buckendIdToBucketCountMap.get(buckendId) + 1);
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId,
seqLocation.locations, idToBackend, backendIdRef);
if (execHostPort == null) {
throw new UserException("there is no scanNode Backend");
}
-
+ //the backend with buckendId is not alive, chose another new
backend
+ if (backendIdRef.getRef() != buckendId) {
+ //buckendIdToBucketCountMap does not contain the new backend,
insert into it
+ if
(!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
+ buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
+ } else { //buckendIdToBucketCountMap contains the new backend,
update it
+ buckendIdToBucketCountMap.put(backendIdRef.getRef(),
buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
+ }
+ } else { //the backend with buckendId is alive, update
buckendIdToBucketCountMap directly
+ buckendIdToBucketCountMap.put(buckendId,
buckendIdToBucketCountMap.get(buckendId) + 1);
+ }
addressToBackendID.put(execHostPort, backendIdRef.getRef());
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq,
execHostPort);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 35efae2..b56fc72 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -49,6 +49,7 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
@@ -530,5 +531,69 @@ public class CoordinatorTest extends Coordinator {
}
Assert.assertTrue(hosts.size() == 3);
}
+
+ @Test
+ public void testBucketShuffleWithUnaliveBackend() {
+ Coordinator coordinator = new Coordinator(context, analyzer, planner);
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ // each olaptable bucket have the same TScanRangeLocations, be id is
{0, 1, 2}
+ TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+ TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+ tScanRangeLocation0.backend_id = 0;
+ tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+ TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+ tScanRangeLocation1.backend_id = 1;
+ tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+ TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+ tScanRangeLocation2.backend_id = 2;
+ tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+ tScanRangeLocations.locations = new ArrayList<>();
+ tScanRangeLocations.locations.add(tScanRangeLocation0);
+ tScanRangeLocations.locations.add(tScanRangeLocation1);
+ tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+ // init all backend
+ Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+ backend0.setAlive(false);
+ backend0.setBePort(9050);
+ Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+ backend1.setAlive(true);
+ backend1.setBePort(9050);
+ Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+ backend2.setAlive(true);
+ backend2.setBePort(9050);
+ Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap();
+ addressToBackendID.put(tScanRangeLocation0.server,
tScanRangeLocation0.backend_id);
+ addressToBackendID.put(tScanRangeLocation1.server,
tScanRangeLocation1.backend_id);
+ addressToBackendID.put(tScanRangeLocation2.server,
tScanRangeLocation2.backend_id);
+
+ ImmutableMap<Long, Backend> idToBackend =
+ new ImmutableMap.Builder<Long, Backend>().
+ put(0l, backend0).
+ put(1l, backend1).
+ put(2l, backend2).build();
+ Map<PlanFragmentId, Map<Long, Integer>>
fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
+ Map<Long, Integer> backendIdBucketCountMap = new HashMap<Long,
Integer>();
+ fragmentIdToBuckendIdBucketCountMap.put(planFragmentId,
backendIdBucketCountMap);
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new
HashMap<>();
+ BucketShuffleJoinController controller = new
BucketShuffleJoinController(fragmentIdToScanNodeIds);
+ Map<PlanFragmentId, Map<Integer, TNetworkAddress>>
fragmentIdToSeqToAddressMap = Maps.newHashMap();
+ fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer,
TNetworkAddress>());
+ Deencapsulation.setField(controller,
"fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap);
+ Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap",
fragmentIdToSeqToAddressMap);
+ Deencapsulation.invoke(controller,
"getExecHostPortForFragmentIDAndBucketSeq",
+ tScanRangeLocations, planFragmentId, 1, idToBackend,
addressToBackendID);
+ Assert.assertTrue(backendIdBucketCountMap.size() == 2);
+ List<Long> backendIds = new ArrayList<Long>();
+ List<Integer> counts = new ArrayList<Integer>();
+ for (Map.Entry<Long, Integer> item:backendIdBucketCountMap.entrySet())
{
+ backendIds.add(item.getKey());
+ counts.add(item.getValue());
+ }
+ Assert.assertTrue(backendIds.get(0) == 0);
+ Assert.assertTrue(counts.get(0) == 0);
+ Assert.assertTrue(backendIds.get(1) == 1);
+ Assert.assertTrue(counts.get(1) == 1);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]