This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2e95b1c [Enhancement] Make colocate table join more load balanced (#5104)
2e95b1c is described below
commit 2e95b1c38935f53ed5a16720771d9d45efd91d9e
Author: xinghuayu007 <[email protected]>
AuthorDate: Thu Dec 31 09:47:06 2020 +0800
[Enhancement] Make colocate table join more load balanced (#5104)
When two colocate tables are joined, the tablets that belong to the same
bucket sequence must be placed on the same host so that the join can be
executed locally. Previously the host for a bucket sequence was chosen at
random, and a random strategy cannot guarantee a balanced load across
hosts within a single query. This patch switches to a round-robin
strategy so that bucket sequences are distributed evenly. For example,
with 6 bucket sequences and 3 hosts, it is better to assign 2 bucket
sequences to each host.
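To illustrate the idea, here is a minimal standalone sketch (hypothetical
names, not the Doris code itself): each bucket sequence goes to the
candidate host that currently holds the fewest assignments, so 6 bucket
sequences over 3 hosts end up as 2 per host.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class BucketBalancerSketch {
        // Pick the candidate host with the fewest buckets assigned so far,
        // then record the new assignment.
        static String pickHost(List<String> candidates, Map<String, Long> assigned) {
            String best = null;
            long min = Long.MAX_VALUE;
            for (String host : candidates) {
                long count = assigned.getOrDefault(host, 0L);
                if (count < min) {
                    min = count;
                    best = host;
                }
            }
            assigned.merge(best, 1L, Long::sum);
            return best;
        }

        public static void main(String[] args) {
            List<String> hosts = Arrays.asList("be0", "be1", "be2");
            Map<String, Long> assigned = new HashMap<>();
            for (int bucketSeq = 0; bucketSeq < 6; bucketSeq++) {
                System.out.println("bucket " + bucketSeq + " -> " + pickHost(hosts, assigned));
            }
            // prints be0 be1 be2 be0 be1 be2: two bucket sequences per host
        }
    }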
---
.../main/java/org/apache/doris/qe/Coordinator.java | 67 +++++++++++-----------
.../java/org/apache/doris/qe/SimpleScheduler.java | 32 +++++++++++
.../java/org/apache/doris/qe/CoordinatorTest.java | 61 ++++++++++++++++++++
3 files changed, 127 insertions(+), 33 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 0f75ffd..ff05413 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1251,12 +1251,12 @@ public class Coordinator {
             fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
         }
         Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
-
+        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
             //fill scanRangeParamsList
             List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
             if (!bucketSeqToAddress.containsKey(bucketSeq)) {
-                getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq);
+                getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost);
             }
             for(TScanRangeLocations location: locations) {
@@ -1274,50 +1274,51 @@ public class Coordinator {
         }
     }
 
-    // randomly choose a backend from the TScanRangeLocations for a certain bucket sequence.
-    private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception {
-        int randomLocation = new Random().nextInt(seqLocation.locations.size());
+    // ensure bucket sequences are distributed to every host evenly
+    private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq,
+                                                          HashMap<TNetworkAddress, Long> assignedBytesPerHost) throws Exception {
         Reference<Long> backendIdRef = new Reference<Long>();
-        TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef);
+        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, backendIdRef);
+        Backend backend = this.idToBackend.get(backendIdRef.getRef());
+        TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
         this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
         this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
     }
+    public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
+                                                         HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+                                                         Reference<Long> backendIdRef) throws UserException {
+        Long minAssignedBytes = Long.MAX_VALUE;
+        TScanRangeLocation minLocation = null;
+        Long step = 1L;
+        for (final TScanRangeLocation location : seqLocation.getLocations()) {
+            Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
+            if (assignedBytes < minAssignedBytes) {
+                minAssignedBytes = assignedBytes;
+                minLocation = location;
+            }
+        }
+        TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, seqLocation.locations, this.idToBackend, backendIdRef);
+        if (assignedBytesPerHost.containsKey(location.server)) {
+            assignedBytesPerHost.put(location.server,
+                    assignedBytesPerHost.get(location.server) + step);
+        } else {
+            assignedBytesPerHost.put(location.server, step);
+        }
+        return location;
+    }
+
     private void computeScanRangeAssignmentByScheduler(
             final ScanNode scanNode,
             final List<TScanRangeLocations> locations,
             FragmentScanRangeAssignment assignment) throws Exception {
         HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
-        Long step = 1L;
         for (TScanRangeLocations scanRangeLocations : locations) {
-            // assign this scan range to the host w/ the fewest assigned bytes
-            Long minAssignedBytes = Long.MAX_VALUE;
-            TScanRangeLocation minLocation = null;
-            for (final TScanRangeLocation location : scanRangeLocations.getLocations()) {
-                Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
-                if (assignedBytes < minAssignedBytes) {
-                    minAssignedBytes = assignedBytes;
-                    minLocation = location;
-                }
-            }
-            assignedBytesPerHost.put(minLocation.server,
-                    assignedBytesPerHost.get(minLocation.server) + step);
-
             Reference<Long> backendIdRef = new Reference<Long>();
-            TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id,
-                    scanRangeLocations.getLocations(), this.idToBackend, backendIdRef);
-            if (!execHostPort.hostname.equals(minLocation.server.hostname) ||
-                    execHostPort.port != minLocation.server.port) {
-                assignedBytesPerHost.put(minLocation.server,
-                        assignedBytesPerHost.get(minLocation.server) - step);
-                Long id = assignedBytesPerHost.get(execHostPort);
-                if (id == null) {
-                    assignedBytesPerHost.put(execHostPort, 0L);
-                } else {
-                    assignedBytesPerHost.put(execHostPort, id + step);
-                }
-            }
+            TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, assignedBytesPerHost, backendIdRef);
+            Backend backend = this.idToBackend.get(backendIdRef.getRef());
+            TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
             this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
             Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index d2c88fe..3943a13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -92,6 +92,38 @@ public class SimpleScheduler {
                 backends, locations.size()));
     }
 
+    public static TScanRangeLocation getLocation(TScanRangeLocation minLocation,
+                                                 List<TScanRangeLocation> locations,
+                                                 ImmutableMap<Long, Backend> backends,
+                                                 Reference<Long> backendIdRef)
+            throws UserException {
+        if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) {
+            throw new UserException("scan range location or candidate backends is empty");
+        }
+        Backend backend = backends.get(minLocation.backend_id);
+        if (isAvailable(backend)) {
+            backendIdRef.setRef(minLocation.backend_id);
+            return minLocation;
+        } else {
+            for (TScanRangeLocation location : locations) {
+                if (location.backend_id == minLocation.backend_id) {
+                    continue;
+                }
+                // choose the first alive backend (in the analysis stage, the locations are random)
+                Backend candidateBackend = backends.get(location.backend_id);
+                if (isAvailable(candidateBackend)) {
+                    backendIdRef.setRef(location.backend_id);
+                    return location;
+                }
+            }
+        }
+
+        // no backend returned
+        throw new UserException("there is no scanNode Backend. " +
+                getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()),
+                        backends, locations.size()));
+    }
+
public static TNetworkAddress getHost(ImmutableMap<Long, Backend> backends,
Reference<Long> backendIdRef)
throws UserException {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index b2f780f..35efae2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -59,6 +59,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.HashedMap;
public class CoordinatorTest extends Coordinator {
static Planner planner = new Planner();
@@ -469,5 +471,64 @@ public class CoordinatorTest extends Coordinator {
}
}
}
+
+ @Test
+ public void testGetExecHostPortForFragmentIDAndBucketSeq() {
+ Coordinator coordinator = new Coordinator(context, analyzer, planner);
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        // each OlapTable bucket has the same TScanRangeLocations; backend ids are {0, 1, 2}
+ TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+ TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+ tScanRangeLocation0.backend_id = 0;
+ tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+ TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+ tScanRangeLocation1.backend_id = 1;
+ tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+ TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+ tScanRangeLocation2.backend_id = 2;
+ tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+ tScanRangeLocations.locations = new ArrayList<>();
+ tScanRangeLocations.locations.add(tScanRangeLocation0);
+ tScanRangeLocations.locations.add(tScanRangeLocation1);
+ tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+        // init all backends
+ Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+ backend0.setAlive(true);
+ backend0.setBePort(9050);
+ Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+ backend1.setAlive(true);
+ backend1.setBePort(9050);
+ Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+ backend2.setAlive(true);
+ backend2.setBePort(9050);
+
+        ImmutableMap<Long, Backend> idToBackend =
+                new ImmutableMap.Builder<Long, Backend>()
+                        .put(0L, backend0)
+                        .put(1L, backend1)
+                        .put(2L, backend2)
+                        .build();
+ Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
+        Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+ fragmentIdToSeqToAddressMap.put(planFragmentId, new HashedMap());
+        Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap);
+ List<TScanRangeLocations> locations = new ArrayList<>();
+ locations.add(tScanRangeLocations);
+
+        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
+                planFragmentId, 1, assignedBytesPerHost);
+        Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
+                planFragmentId, 2, assignedBytesPerHost);
+        Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
+                planFragmentId, 3, assignedBytesPerHost);
+        List<String> hosts = new ArrayList<>();
+ List<String> hosts = new ArrayList<>();
+        for (Map.Entry item : assignedBytesPerHost.entrySet()) {
+ Assert.assertTrue((Long)item.getValue() == 1);
+ TNetworkAddress addr = (TNetworkAddress)item.getKey();
+ hosts.add(addr.hostname);
+ }
+ Assert.assertTrue(hosts.size() == 3);
+ }
}
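A note on the fallback in the new SimpleScheduler.getLocation() above:
when the least-loaded replica's backend is not alive, the first alive
replica among the remaining candidates is chosen instead, and if none is
alive a UserException is raised. A minimal standalone sketch of that
fallback (hypothetical names and types, not the Doris API):

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    class FallbackSketch {
        // Prefer the least-loaded replica; if its backend is down, fall back
        // to the first alive replica among the remaining candidates.
        static Optional<Long> chooseBackend(long preferredId, List<Long> replicaIds,
                                            Map<Long, Boolean> alive) {
            if (alive.getOrDefault(preferredId, false)) {
                return Optional.of(preferredId);
            }
            for (long id : replicaIds) {
                if (id != preferredId && alive.getOrDefault(id, false)) {
                    return Optional.of(id);
                }
            }
            return Optional.empty(); // caller reports "there is no scanNode Backend"
        }
    }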
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]