This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f3c6ee5961 [Enhance](ComputeNode) ES Scan node support to be scheduled 
to compute node (#16533)
f3c6ee5961 is described below

commit f3c6ee5961dbabb65c8e2e47c159fde9ac87e59c
Author: huangzhaowei <[email protected]>
AuthorDate: Tue Mar 14 00:13:24 2023 +0800

    [Enhance](ComputeNode) ES Scan node support to be scheduled to compute node 
(#16533)
    
    Allow ES scan nodes to be scheduled to compute nodes.
---
 .../java/org/apache/doris/planner/EsScanNode.java  | 50 +++-------------------
 .../planner/external/ExternalFileScanNode.java     |  2 +-
 ...endPolicy.java => FederationBackendPolicy.java} | 10 ++++-
 .../doris/planner/external/FileGroupInfo.java      |  6 +--
 .../doris/planner/external/FileScanProviderIf.java |  2 +-
 .../doris/planner/external/LoadScanProvider.java   |  2 +-
 .../doris/planner/external/MetadataScanNode.java   |  2 +-
 .../doris/planner/external/QueryScanProvider.java  |  4 +-
 .../org/apache/doris/system/BeSelectionPolicy.java | 14 ++++++
 .../apache/doris/system/SystemInfoServiceTest.java | 43 +++++++++++++++++++
 10 files changed, 81 insertions(+), 54 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index 63a60f5024..a2349e7e57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -21,7 +21,6 @@ import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EsResource;
 import org.apache.doris.catalog.EsTable;
 import org.apache.doris.catalog.PartitionInfo;
@@ -37,6 +36,7 @@ import org.apache.doris.external.elasticsearch.QueryBuilders;
 import org.apache.doris.external.elasticsearch.QueryBuilders.BoolQueryBuilder;
 import org.apache.doris.external.elasticsearch.QueryBuilders.BuilderOptions;
 import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
+import org.apache.doris.planner.external.FederationBackendPolicy;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TEsScanNode;
@@ -49,18 +49,15 @@ import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 import lombok.SneakyThrows;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -76,7 +73,6 @@ public class EsScanNode extends ScanNode {
 
     private final Random random = new Random(System.currentTimeMillis());
     private Multimap<String, Backend> backendMap;
-    private List<Backend> backendList;
     private EsTablePartitions esTablePartitions;
     private List<TScanRangeLocations> shardScanRanges = Lists.newArrayList();
     private EsTable table;
@@ -105,14 +101,12 @@ public class EsScanNode extends ScanNode {
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
         computeColumnFilter();
-        assignBackends();
         computeStats(analyzer);
         buildQuery();
     }
 
     public void init() throws UserException {
         computeColumnFilter();
-        assignBackends();
         buildQuery();
     }
 
@@ -208,20 +202,6 @@ public class EsScanNode extends ScanNode {
         msg.es_scan_node = esScanNode;
     }
 
-    private void assignBackends() throws UserException {
-        backendMap = HashMultimap.create();
-        backendList = Lists.newArrayList();
-        for (Backend be : 
Env.getCurrentSystemInfo().getIdToBackend().values()) {
-            if (be.isAlive()) {
-                backendMap.put(be.getIp(), be);
-                backendList.add(be);
-            }
-        }
-        if (backendMap.isEmpty()) {
-            throw new UserException("No Alive backends");
-        }
-    }
-
     // only do partition(es index level) prune
     private List<TScanRangeLocations> getShardLocations() throws UserException 
{
         // has to get partition info from es state not from table because the 
partition
@@ -252,39 +232,23 @@ public class EsScanNode extends ScanNode {
             LOG.debug("partition prune finished, unpartitioned index [{}], " + 
"partitioned index [{}]",
                     String.join(",", unPartitionedIndices), String.join(",", 
partitionedIndices));
         }
-        int size = backendList.size();
-        int beIndex = random.nextInt(size);
         List<TScanRangeLocations> result = Lists.newArrayList();
         for (EsShardPartitions indexState : selectedIndex) {
             for (List<EsShardRouting> shardRouting : 
indexState.getShardRoutings().values()) {
                 // get backends
-                Set<Backend> colocatedBes = Sets.newHashSet();
-                int numBe = Math.min(3, size);
                 List<TNetworkAddress> shardAllocations = new ArrayList<>();
+                List<String> preLocations = new ArrayList<>();
                 for (EsShardRouting item : shardRouting) {
                     shardAllocations.add(item.getHttpAddress());
+                    preLocations.add(item.getHttpAddress().getHostname());
                 }
 
-                Collections.shuffle(shardAllocations, random);
-                for (TNetworkAddress address : shardAllocations) {
-                    colocatedBes.addAll(backendMap.get(address.getHostname()));
-                }
-                boolean usingRandomBackend = colocatedBes.size() == 0;
-                List<Backend> candidateBeList = Lists.newArrayList();
-                if (usingRandomBackend) {
-                    for (int i = 0; i < numBe; ++i) {
-                        candidateBeList.add(backendList.get(beIndex++ % size));
-                    }
-                } else {
-                    candidateBeList.addAll(colocatedBes);
-                    Collections.shuffle(candidateBeList);
-                }
-
-                // Locations
+                FederationBackendPolicy backendPolicy = new 
FederationBackendPolicy();
+                backendPolicy.init(preLocations);
                 TScanRangeLocations locations = new TScanRangeLocations();
-                for (int i = 0; i < numBe && i < candidateBeList.size(); ++i) {
+                for (int i = 0; i < backendPolicy.numBackends(); ++i) {
                     TScanRangeLocation location = new TScanRangeLocation();
-                    Backend be = candidateBeList.get(i);
+                    Backend be = backendPolicy.getNextBe();
                     location.setBackendId(be.getId());
                     location.setServer(new TNetworkAddress(be.getIp(), 
be.getBePort()));
                     locations.addToLocations(location);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 6251045b2e..f61615a497 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -114,7 +114,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
     }
 
     private Type type = Type.QUERY;
-    private final BackendPolicy backendPolicy = new BackendPolicy();
+    private final FederationBackendPolicy backendPolicy = new 
FederationBackendPolicy();
 
     // Only for load job.
     // Save all info about load attributes and files.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
similarity index 90%
rename from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index 3291b31741..c67f7a50bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -31,16 +31,21 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
-public class BackendPolicy {
-    private static final Logger LOG = 
LogManager.getLogger(BackendPolicy.class);
+public class FederationBackendPolicy {
+    private static final Logger LOG = 
LogManager.getLogger(FederationBackendPolicy.class);
     private final List<Backend> backends = Lists.newArrayList();
 
     private int nextBe = 0;
 
     public void init() throws UserException {
+        init(Collections.emptyList());
+    }
+
+    public void init(List<String> preLocations) throws UserException {
         Set<Tag> tags = Sets.newHashSet();
         if (ConnectContext.get() != null && 
ConnectContext.get().getCurrentUserIdentity() != null) {
             String qualifiedUser = 
ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
@@ -59,6 +64,7 @@ public class BackendPolicy {
                 .addTags(tags)
                 
.preferComputeNode(Config.prefer_compute_node_for_external_table)
                 .assignExpectBeNum(Config.min_backend_num_for_external_table)
+                .addPreLocations(preLocations)
                 .build();
         
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
         if (backends.isEmpty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index d7d90dbec6..770ebd9a7a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -159,7 +159,7 @@ public class FileGroupInfo {
         return hiddenColumns;
     }
 
-    public void getFileStatusAndCalcInstance(BackendPolicy backendPolicy) 
throws UserException {
+    public void getFileStatusAndCalcInstance(FederationBackendPolicy 
backendPolicy) throws UserException {
         if (filesAdded == 0) {
             throw new UserException("No source file in this table(" + 
targetTable.getName() + ").");
         }
@@ -188,7 +188,7 @@ public class FileGroupInfo {
         LOG.info("number instance of file scan node is: {}, bytes per 
instance: {}", numInstances, bytesPerInstance);
     }
 
-    public void createScanRangeLocations(ParamCreateContext context, 
BackendPolicy backendPolicy,
+    public void createScanRangeLocations(ParamCreateContext context, 
FederationBackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException 
{
         TScanRangeLocations curLocations = newLocations(context.params, 
brokerDesc, backendPolicy);
         long curInstanceBytes = 0;
@@ -242,7 +242,7 @@ public class FileGroupInfo {
     }
 
     protected TScanRangeLocations newLocations(TFileScanRangeParams params, 
BrokerDesc brokerDesc,
-            BackendPolicy backendPolicy) throws UserException {
+            FederationBackendPolicy backendPolicy) throws UserException {
 
         Backend selectedBackend = backendPolicy.getNextBe();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index f962f4d827..d85b0eb073 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -44,7 +44,7 @@ public interface FileScanProviderIf {
 
     ParamCreateContext createContext(Analyzer analyzer) throws UserException;
 
-    void createScanRangeLocations(ParamCreateContext context, BackendPolicy 
backendPolicy,
+    void createScanRangeLocations(ParamCreateContext context, 
FederationBackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException;
 
     int getInputSplitNum();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 086191e94d..17051061a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -138,7 +138,7 @@ public class LoadScanProvider implements FileScanProviderIf 
{
     }
 
     @Override
-    public void createScanRangeLocations(ParamCreateContext context, 
BackendPolicy backendPolicy,
+    public void createScanRangeLocations(ParamCreateContext context, 
FederationBackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException 
{
         Preconditions.checkNotNull(fileGroupInfo);
         fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
index de4f0cdb56..aa9c840197 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
@@ -47,7 +47,7 @@ public class MetadataScanNode extends ScanNode {
 
     private List<TScanRangeLocations> scanRangeLocations = 
Lists.newArrayList();
 
-    private final BackendPolicy backendPolicy = new BackendPolicy();
+    private final FederationBackendPolicy backendPolicy = new 
FederationBackendPolicy();
 
     public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, 
MetadataTableValuedFunction tvf) {
         super(id, desc, "METADATA_SCAN_NODE", 
StatisticalType.METADATA_SCAN_NODE);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 45e48c4ff5..6636642ed5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -59,7 +59,7 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
     public abstract TFileAttributes getFileAttributes() throws UserException;
 
     @Override
-    public void createScanRangeLocations(ParamCreateContext context, 
BackendPolicy backendPolicy,
+    public void createScanRangeLocations(ParamCreateContext context, 
FederationBackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException 
{
         long start = System.currentTimeMillis();
         List<Split> inputSplits = splitter.getSplits(context.conjuncts);
@@ -151,7 +151,7 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
         return this.inputFileSize;
     }
 
-    private TScanRangeLocations newLocations(TFileScanRangeParams params, 
BackendPolicy backendPolicy) {
+    private TScanRangeLocations newLocations(TFileScanRangeParams params, 
FederationBackendPolicy backendPolicy) {
         // Generate on file scan range
         TFileScanRange fileScanRange = new TFileScanRange();
         fileScanRange.setParams(params);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index 3f3db6d78c..9fbd3c6104 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -49,6 +49,8 @@ public class BeSelectionPolicy {
     public boolean preferComputeNode = false;
     public int expectBeNum = 0;
 
+    public List<String> preferredLocations = new ArrayList<>();
+
     private BeSelectionPolicy() {
 
     }
@@ -110,6 +112,11 @@ public class BeSelectionPolicy {
             return this;
         }
 
+        public Builder addPreLocations(List<String> preferredLocations) {
+            policy.preferredLocations.addAll(preferredLocations);
+            return this;
+        }
+
         public BeSelectionPolicy build() {
             return policy;
         }
@@ -141,6 +148,13 @@ public class BeSelectionPolicy {
 
     public List<Backend> getCandidateBackends(ImmutableCollection<Backend> 
backends) {
         List<Backend> filterBackends = 
backends.stream().filter(this::isMatch).collect(Collectors.toList());
+        List<Backend> preLocationFilterBackends = filterBackends.stream()
+                .filter(iterm -> 
preferredLocations.contains(iterm.getHostName())).collect(Collectors.toList());
+        // If preLocations were chosen, use the preLocation backends. 
Otherwise we just ignore this filter.
+        if (!preLocationFilterBackends.isEmpty()) {
+            filterBackends = preLocationFilterBackends;
+        }
+        Collections.shuffle(filterBackends);
         List<Backend> candidates = new ArrayList<>();
         if (preferComputeNode) {
             int num = 0;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index ec9ce6301d..1ac62abfa9 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -34,6 +34,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -272,6 +273,48 @@ public class SystemInfoServiceTest {
         Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07, 
3).size());
     }
 
+    @Test
+    public void testPreferLocationsSelect() throws Exception {
+        Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
+
+        // add more backends
+        addBackend(10002, "192.168.1.2", 9050);
+        Backend be2 = infoService.getBackend(10002);
+        be2.setAlive(true);
+        addBackend(10003, "192.168.1.3", 9050);
+        Backend be3 = infoService.getBackend(10003);
+        be3.setAlive(true);
+        addBackend(10004, "192.168.1.4", 9050);
+        Backend be4 = infoService.getBackend(10004);
+        be4.setAlive(true);
+        addBackend(10005, "192.168.1.5", 9050);
+        Backend be5 = infoService.getBackend(10005);
+        be5.setAlive(true);
+
+        setComputeNode(be5, taga);
+
+        List<String> preferLocations = new ArrayList<>();
+        preferLocations.add("192.168.1.2");
+        BeSelectionPolicy policy1 = new 
BeSelectionPolicy.Builder().addPreLocations(preferLocations).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy1, 
1).size());
+        preferLocations.add("192.168.1.3");
+        BeSelectionPolicy policy2 = new 
BeSelectionPolicy.Builder().addPreLocations(preferLocations).build();
+
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy2, 
2).size());
+
+        // only one preferLocations
+        preferLocations.clear();
+        preferLocations.add("192.168.1.4");
+        BeSelectionPolicy policy3 = new 
BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                
.addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(3).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 
1).size());
+
+        preferLocations.add("192.168.1.5");
+        BeSelectionPolicy policy4 = new 
BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                
.addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(1).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 
1).size());
+    }
+
     @Test
     public void testSelectBackendIdsForReplicaCreation() throws Exception {
         addBackend(10001, "192.168.1.1", 9050);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to