This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f3c6ee5961 [Enhance](ComputeNode) ES Scan node support to be scheduled
to compute node (#16533)
f3c6ee5961 is described below
commit f3c6ee5961dbabb65c8e2e47c159fde9ac87e59c
Author: huangzhaowei <[email protected]>
AuthorDate: Tue Mar 14 00:13:24 2023 +0800
[Enhance](ComputeNode) ES Scan node support to be scheduled to compute node
(#16533)
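Allow ES scan nodes to be scheduled onto compute nodes: EsScanNode now selects its backends through FederationBackendPolicy (renamed from BackendPolicy), passing the ES shard hosts as preferred locations.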
---
.../java/org/apache/doris/planner/EsScanNode.java | 50 +++-------------------
.../planner/external/ExternalFileScanNode.java | 2 +-
...endPolicy.java => FederationBackendPolicy.java} | 10 ++++-
.../doris/planner/external/FileGroupInfo.java | 6 +--
.../doris/planner/external/FileScanProviderIf.java | 2 +-
.../doris/planner/external/LoadScanProvider.java | 2 +-
.../doris/planner/external/MetadataScanNode.java | 2 +-
.../doris/planner/external/QueryScanProvider.java | 4 +-
.../org/apache/doris/system/BeSelectionPolicy.java | 14 ++++++
.../apache/doris/system/SystemInfoServiceTest.java | 43 +++++++++++++++++++
10 files changed, 81 insertions(+), 54 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index 63a60f5024..a2349e7e57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -21,7 +21,6 @@ import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PartitionInfo;
@@ -37,6 +36,7 @@ import org.apache.doris.external.elasticsearch.QueryBuilders;
import org.apache.doris.external.elasticsearch.QueryBuilders.BoolQueryBuilder;
import org.apache.doris.external.elasticsearch.QueryBuilders.BuilderOptions;
import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
+import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TEsScanNode;
@@ -49,18 +49,15 @@ import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -76,7 +73,6 @@ public class EsScanNode extends ScanNode {
private final Random random = new Random(System.currentTimeMillis());
private Multimap<String, Backend> backendMap;
- private List<Backend> backendList;
private EsTablePartitions esTablePartitions;
private List<TScanRangeLocations> shardScanRanges = Lists.newArrayList();
private EsTable table;
@@ -105,14 +101,12 @@ public class EsScanNode extends ScanNode {
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
computeColumnFilter();
- assignBackends();
computeStats(analyzer);
buildQuery();
}
public void init() throws UserException {
computeColumnFilter();
- assignBackends();
buildQuery();
}
@@ -208,20 +202,6 @@ public class EsScanNode extends ScanNode {
msg.es_scan_node = esScanNode;
}
- private void assignBackends() throws UserException {
- backendMap = HashMultimap.create();
- backendList = Lists.newArrayList();
- for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
- if (be.isAlive()) {
- backendMap.put(be.getIp(), be);
- backendList.add(be);
- }
- }
- if (backendMap.isEmpty()) {
- throw new UserException("No Alive backends");
- }
- }
-
// only do partition(es index level) prune
private List<TScanRangeLocations> getShardLocations() throws UserException
{
// has to get partition info from es state not from table because the
partition
@@ -252,39 +232,23 @@ public class EsScanNode extends ScanNode {
LOG.debug("partition prune finished, unpartitioned index [{}], " +
"partitioned index [{}]",
String.join(",", unPartitionedIndices), String.join(",",
partitionedIndices));
}
- int size = backendList.size();
- int beIndex = random.nextInt(size);
List<TScanRangeLocations> result = Lists.newArrayList();
for (EsShardPartitions indexState : selectedIndex) {
for (List<EsShardRouting> shardRouting :
indexState.getShardRoutings().values()) {
// get backends
- Set<Backend> colocatedBes = Sets.newHashSet();
- int numBe = Math.min(3, size);
List<TNetworkAddress> shardAllocations = new ArrayList<>();
+ List<String> preLocations = new ArrayList<>();
for (EsShardRouting item : shardRouting) {
shardAllocations.add(item.getHttpAddress());
+ preLocations.add(item.getHttpAddress().getHostname());
}
- Collections.shuffle(shardAllocations, random);
- for (TNetworkAddress address : shardAllocations) {
- colocatedBes.addAll(backendMap.get(address.getHostname()));
- }
- boolean usingRandomBackend = colocatedBes.size() == 0;
- List<Backend> candidateBeList = Lists.newArrayList();
- if (usingRandomBackend) {
- for (int i = 0; i < numBe; ++i) {
- candidateBeList.add(backendList.get(beIndex++ % size));
- }
- } else {
- candidateBeList.addAll(colocatedBes);
- Collections.shuffle(candidateBeList);
- }
-
- // Locations
+ FederationBackendPolicy backendPolicy = new
FederationBackendPolicy();
+ backendPolicy.init(preLocations);
TScanRangeLocations locations = new TScanRangeLocations();
- for (int i = 0; i < numBe && i < candidateBeList.size(); ++i) {
+ for (int i = 0; i < backendPolicy.numBackends(); ++i) {
TScanRangeLocation location = new TScanRangeLocation();
- Backend be = candidateBeList.get(i);
+ Backend be = backendPolicy.getNextBe();
location.setBackendId(be.getId());
location.setServer(new TNetworkAddress(be.getIp(),
be.getBePort()));
locations.addToLocations(location);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 6251045b2e..f61615a497 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -114,7 +114,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
}
private Type type = Type.QUERY;
- private final BackendPolicy backendPolicy = new BackendPolicy();
+ private final FederationBackendPolicy backendPolicy = new
FederationBackendPolicy();
// Only for load job.
// Save all info about load attributes and files.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
similarity index 90%
rename from
fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
rename to
fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index 3291b31741..c67f7a50bf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -31,16 +31,21 @@ import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
-public class BackendPolicy {
- private static final Logger LOG =
LogManager.getLogger(BackendPolicy.class);
+public class FederationBackendPolicy {
+ private static final Logger LOG =
LogManager.getLogger(FederationBackendPolicy.class);
private final List<Backend> backends = Lists.newArrayList();
private int nextBe = 0;
public void init() throws UserException {
+ init(Collections.emptyList());
+ }
+
+ public void init(List<String> preLocations) throws UserException {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get() != null &&
ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser =
ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
@@ -59,6 +64,7 @@ public class BackendPolicy {
.addTags(tags)
.preferComputeNode(Config.prefer_compute_node_for_external_table)
.assignExpectBeNum(Config.min_backend_num_for_external_table)
+ .addPreLocations(preLocations)
.build();
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
if (backends.isEmpty()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index d7d90dbec6..770ebd9a7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -159,7 +159,7 @@ public class FileGroupInfo {
return hiddenColumns;
}
- public void getFileStatusAndCalcInstance(BackendPolicy backendPolicy)
throws UserException {
+ public void getFileStatusAndCalcInstance(FederationBackendPolicy
backendPolicy) throws UserException {
if (filesAdded == 0) {
throw new UserException("No source file in this table(" +
targetTable.getName() + ").");
}
@@ -188,7 +188,7 @@ public class FileGroupInfo {
LOG.info("number instance of file scan node is: {}, bytes per
instance: {}", numInstances, bytesPerInstance);
}
- public void createScanRangeLocations(ParamCreateContext context,
BackendPolicy backendPolicy,
+ public void createScanRangeLocations(ParamCreateContext context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException
{
TScanRangeLocations curLocations = newLocations(context.params,
brokerDesc, backendPolicy);
long curInstanceBytes = 0;
@@ -242,7 +242,7 @@ public class FileGroupInfo {
}
protected TScanRangeLocations newLocations(TFileScanRangeParams params,
BrokerDesc brokerDesc,
- BackendPolicy backendPolicy) throws UserException {
+ FederationBackendPolicy backendPolicy) throws UserException {
Backend selectedBackend = backendPolicy.getNextBe();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index f962f4d827..d85b0eb073 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -44,7 +44,7 @@ public interface FileScanProviderIf {
ParamCreateContext createContext(Analyzer analyzer) throws UserException;
- void createScanRangeLocations(ParamCreateContext context, BackendPolicy
backendPolicy,
+ void createScanRangeLocations(ParamCreateContext context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException;
int getInputSplitNum();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 086191e94d..17051061a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -138,7 +138,7 @@ public class LoadScanProvider implements FileScanProviderIf
{
}
@Override
- public void createScanRangeLocations(ParamCreateContext context,
BackendPolicy backendPolicy,
+ public void createScanRangeLocations(ParamCreateContext context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException
{
Preconditions.checkNotNull(fileGroupInfo);
fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
index de4f0cdb56..aa9c840197 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
@@ -47,7 +47,7 @@ public class MetadataScanNode extends ScanNode {
private List<TScanRangeLocations> scanRangeLocations =
Lists.newArrayList();
- private final BackendPolicy backendPolicy = new BackendPolicy();
+ private final FederationBackendPolicy backendPolicy = new
FederationBackendPolicy();
public MetadataScanNode(PlanNodeId id, TupleDescriptor desc,
MetadataTableValuedFunction tvf) {
super(id, desc, "METADATA_SCAN_NODE",
StatisticalType.METADATA_SCAN_NODE);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 45e48c4ff5..6636642ed5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -59,7 +59,7 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
public abstract TFileAttributes getFileAttributes() throws UserException;
@Override
- public void createScanRangeLocations(ParamCreateContext context,
BackendPolicy backendPolicy,
+ public void createScanRangeLocations(ParamCreateContext context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException
{
long start = System.currentTimeMillis();
List<Split> inputSplits = splitter.getSplits(context.conjuncts);
@@ -151,7 +151,7 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
return this.inputFileSize;
}
- private TScanRangeLocations newLocations(TFileScanRangeParams params,
BackendPolicy backendPolicy) {
+ private TScanRangeLocations newLocations(TFileScanRangeParams params,
FederationBackendPolicy backendPolicy) {
// Generate on file scan range
TFileScanRange fileScanRange = new TFileScanRange();
fileScanRange.setParams(params);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index 3f3db6d78c..9fbd3c6104 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -49,6 +49,8 @@ public class BeSelectionPolicy {
public boolean preferComputeNode = false;
public int expectBeNum = 0;
+ public List<String> preferredLocations = new ArrayList<>();
+
private BeSelectionPolicy() {
}
@@ -110,6 +112,11 @@ public class BeSelectionPolicy {
return this;
}
+ public Builder addPreLocations(List<String> preferredLocations) {
+ policy.preferredLocations.addAll(preferredLocations);
+ return this;
+ }
+
public BeSelectionPolicy build() {
return policy;
}
@@ -141,6 +148,13 @@ public class BeSelectionPolicy {
public List<Backend> getCandidateBackends(ImmutableCollection<Backend>
backends) {
List<Backend> filterBackends =
backends.stream().filter(this::isMatch).collect(Collectors.toList());
+ List<Backend> preLocationFilterBackends = filterBackends.stream()
+ .filter(iterm ->
preferredLocations.contains(iterm.getHostName())).collect(Collectors.toList());
+ // If preLocations were chosen, use the preLocation backends.
Otherwise we just ignore this filter.
+ if (!preLocationFilterBackends.isEmpty()) {
+ filterBackends = preLocationFilterBackends;
+ }
+ Collections.shuffle(filterBackends);
List<Backend> candidates = new ArrayList<>();
if (preferComputeNode) {
int num = 0;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index ec9ce6301d..1ac62abfa9 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -272,6 +273,48 @@ public class SystemInfoServiceTest {
Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07,
3).size());
}
+ @Test
+ public void testPreferLocationsSelect() throws Exception {
+ Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
+
+ // add more backends
+ addBackend(10002, "192.168.1.2", 9050);
+ Backend be2 = infoService.getBackend(10002);
+ be2.setAlive(true);
+ addBackend(10003, "192.168.1.3", 9050);
+ Backend be3 = infoService.getBackend(10003);
+ be3.setAlive(true);
+ addBackend(10004, "192.168.1.4", 9050);
+ Backend be4 = infoService.getBackend(10004);
+ be4.setAlive(true);
+ addBackend(10005, "192.168.1.5", 9050);
+ Backend be5 = infoService.getBackend(10005);
+ be5.setAlive(true);
+
+ setComputeNode(be5, taga);
+
+ List<String> preferLocations = new ArrayList<>();
+ preferLocations.add("192.168.1.2");
+ BeSelectionPolicy policy1 = new
BeSelectionPolicy.Builder().addPreLocations(preferLocations).build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy1,
1).size());
+ preferLocations.add("192.168.1.3");
+ BeSelectionPolicy policy2 = new
BeSelectionPolicy.Builder().addPreLocations(preferLocations).build();
+
+ Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy2,
2).size());
+
+ // only one preferLocations
+ preferLocations.clear();
+ preferLocations.add("192.168.1.4");
+ BeSelectionPolicy policy3 = new
BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+
.addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(3).build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3,
1).size());
+
+ preferLocations.add("192.168.1.5");
+ BeSelectionPolicy policy4 = new
BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+
.addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(1).build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4,
1).size());
+ }
+
@Test
public void testSelectBackendIdsForReplicaCreation() throws Exception {
addBackend(10001, "192.168.1.1", 9050);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]