This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3287f350de [feature](table) implement the round robin selection be 
when create tablet (#19167)
3287f350de is described below

commit 3287f350de5a1946d4150957997c3ad8c2e9e587
Author: Luwei <[email protected]>
AuthorDate: Sat May 6 14:46:48 2023 +0800

    [feature](table) implement the round robin selection be when create tablet 
(#19167)
---
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../apache/doris/datasource/InternalCatalog.java   |  45 +++++--
 .../org/apache/doris/system/SystemInfoService.java | 116 ++++++++++++++++++
 .../datasource/RoundRobinCreateTabletTest.java     | 129 +++++++++++++++++++++
 4 files changed, 285 insertions(+), 8 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0cb93bef9b..bd6f538c82 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1895,6 +1895,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false)
     public static boolean skip_localhost_auth_check  = false;
 
+    @ConfField(mutable = true)
+    public static boolean enable_round_robin_create_tablet = false;
+
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate 
any related resources.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 1947d17a66..c317ade75f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -176,6 +176,7 @@ import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTabletType;
 import org.apache.doris.thrift.TTaskType;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -204,6 +205,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+
 /**
  * The Internal catalog will manage all self-managed meta object in a Doris 
cluster.
  * Such as Database, tables, etc.
@@ -2516,7 +2518,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         LOG.info("successfully create table[{}-{}]", tableName, tableId);
     }
 
-    private void createTablets(String clusterName, MaterializedIndex index, 
ReplicaState replicaState,
+    @VisibleForTesting
+    public void createTablets(String clusterName, MaterializedIndex index, 
ReplicaState replicaState,
             DistributionInfo distributionInfo, long version, ReplicaAllocation 
replicaAlloc, TabletMeta tabletMeta,
             Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws 
DdlException {
         ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
@@ -2538,6 +2541,20 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         if (chooseBackendsArbitrary) {
             backendsPerBucketSeq = Maps.newHashMap();
         }
+
+        Map<Tag, Integer> nextIndexs = new HashMap<>();
+
+        if (Config.enable_round_robin_create_tablet) {
+            for (Map.Entry<Tag, Short> entry : 
replicaAlloc.getAllocMap().entrySet()) {
+                int startPos = 
Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(), clusterName,
+                        tabletMeta.getStorageMedium());
+                if (startPos == -1) {
+                    throw new DdlException("The number of BEs that match the 
policy is insufficient");
+                }
+                nextIndexs.put(entry.getKey(), startPos);
+            }
+        }
+
         for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
             // create a new tablet with random chosen backends
             Tablet tablet = new Tablet(idGeneratorBuffer.getNextId());
@@ -2550,14 +2567,26 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             Map<Tag, List<Long>> chosenBackendIds;
             if (chooseBackendsArbitrary) {
                 // This is the first colocate table in the group, or just a 
normal table,
-                // randomly choose backends
-                if (!Config.disable_storage_medium_check) {
-                    chosenBackendIds = Env.getCurrentSystemInfo()
-                            .selectBackendIdsForReplicaCreation(replicaAlloc, 
clusterName,
-                                    tabletMeta.getStorageMedium());
+                // choose backends
+                if (Config.enable_round_robin_create_tablet) {
+                    if (!Config.disable_storage_medium_check) {
+                        chosenBackendIds = Env.getCurrentSystemInfo()
+                                
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName,
+                                        tabletMeta.getStorageMedium(), 
nextIndexs);
+                    } else {
+                        chosenBackendIds = Env.getCurrentSystemInfo()
+                                
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName, null,
+                                        nextIndexs);
+                    }
                 } else {
-                    chosenBackendIds = Env.getCurrentSystemInfo()
-                            .selectBackendIdsForReplicaCreation(replicaAlloc, 
clusterName, null);
+                    if (!Config.disable_storage_medium_check) {
+                        chosenBackendIds = Env.getCurrentSystemInfo()
+                                
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
+                                        tabletMeta.getStorageMedium());
+                    } else {
+                        chosenBackendIds = Env.getCurrentSystemInfo()
+                                
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
+                    }
                 }
 
                 for (Map.Entry<Tag, List<Long>> entry : 
chosenBackendIds.entrySet()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 74824a395c..e710665640 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -861,6 +861,122 @@ public class SystemInfoService {
         return classMap;
     }
 
+    /**
+     * Orders backends by ascending backend id.
+     */
+    class BeComparator implements Comparator<Backend> {
+        @Override
+        public int compare(Backend a, Backend b) {
+            // Long.compare instead of casting the id difference to int: the cast truncates
+            // the high bits, so large id gaps could yield the wrong sign (or a false 0) and
+            // break the comparator contract.
+            return Long.compare(a.getId(), b.getId());
+        }
+    }
+
+    /**
+     * Selects backend ids in round-robin order among the backends that match {@code policy}.
+     * The candidates (as returned by {@link #getCandidates}) are treated as a ring and read
+     * starting at {@code nextIndex}.
+     *
+     * @param policy    selection policy used to compute the candidate backends
+     * @param number    how many backend ids to return, or -1 to return all candidates
+     * @param nextIndex round-robin cursor; reduced modulo the number of candidates
+     * @return a new list of backend ids, or an empty list when there is no candidate or
+     *         fewer candidates than {@code number}
+     */
+    public List<Long> selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number,
+            int nextIndex) {
+        Preconditions.checkArgument(number >= -1);
+        List<Backend> candidates = getCandidates(policy);
+        // The emptiness check also protects the modulo below from dividing by zero
+        // (previously hit when number was -1 or 0 and nothing matched the policy).
+        if (candidates.isEmpty() || (number != -1 && candidates.size() < number)) {
+            LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
+            return Lists.newArrayList();
+        }
+
+        int candidateNum = candidates.size();
+        // floorMod keeps the start position in [0, candidateNum) even for a negative cursor.
+        int realIndex = Math.floorMod(nextIndex, candidateNum);
+        int resultNum = number == -1 ? candidateNum : number;
+
+        // Build an independent list rather than returning a subList view.
+        List<Long> beIds = new ArrayList<>(resultNum);
+        for (int offset = 0; offset < resultNum; ++offset) {
+            beIds.add(candidates.get((realIndex + offset) % candidateNum).getId());
+        }
+        return beIds;
+    }
+
+    /**
+     * Returns the backends matching {@code policy}, sorted with {@link BeComparator}.
+     * When the policy does not allow several replicas on the same host, only the first
+     * encountered backend of each IP is kept.
+     *
+     * @param policy selection policy used to filter the known backends
+     * @return the sorted candidates, or a new empty list when nothing matches
+     */
+    public List<Backend> getCandidates(BeSelectionPolicy policy) {
+        List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values());
+
+        if (!candidates.isEmpty() && !policy.allowOnSameHost) {
+            // Remember the first backend seen for every host ip and drop the others.
+            Map<String, Backend> firstBackendOfHost = Maps.newHashMap();
+            for (Backend backend : candidates) {
+                firstBackendOfHost.putIfAbsent(backend.getIp(), backend);
+            }
+            candidates.clear();
+            candidates.addAll(firstBackendOfHost.values());
+        }
+
+        if (candidates.isEmpty()) {
+            LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size());
+            return Lists.newArrayList();
+        }
+
+        candidates.sort(new BeComparator());
+        return candidates;
+    }
+
+    /**
+     * Computes the round-robin start position for {@code tag}: the index, within the list
+     * returned by {@link #getCandidates}, of the first backend holding the fewest tablets.
+     *
+     * @param tag           resource tag the candidate backends must carry
+     * @param clusterName   cluster the candidate backends must belong to
+     * @param storageMedium storage medium passed to the selection policy
+     * @return the index of the least loaded candidate, or -1 when there is no candidate
+     */
+    public int getStartPosOfRoundRobin(Tag tag, String clusterName, TStorageMedium storageMedium) {
+        BeSelectionPolicy.Builder policyBuilder = new BeSelectionPolicy.Builder()
+                .setCluster(clusterName)
+                .needScheduleAvailable()
+                .needCheckDiskUsage()
+                .addTags(Sets.newHashSet(tag))
+                .setStorageMedium(storageMedium);
+        if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
+            policyBuilder.allowOnSameHost();
+        }
+
+        List<Backend> backends = getCandidates(policyBuilder.build());
+
+        int bestPos = -1;
+        long fewestTablets = Long.MAX_VALUE;
+        int pos = 0;
+        for (Backend backend : backends) {
+            long tabletCount = Env.getCurrentInvertedIndex()
+                    .getTabletIdsByBackendId(backend.getId()).size();
+            // Strict comparison: on a tie the earliest position wins.
+            if (tabletCount < fewestTablets) {
+                fewestTablets = tabletCount;
+                bestPos = pos;
+            }
+            ++pos;
+        }
+        return bestPos;
+    }
+
+    /**
+     * Chooses, for every tag of {@code replicaAlloc}, the backends that will host the replicas
+     * of one tablet, reading the candidates of each tag in round-robin order.
+     *
+     * @param replicaAlloc  number of replicas required per tag
+     * @param clusterName   cluster the backends must belong to
+     * @param storageMedium storage medium passed to the selection policy
+     * @param nextIndexs    per-tag round-robin cursor; read as the start position and advanced
+     *                      by the number of backends chosen. A tag without a cursor starts at 0.
+     * @return the chosen backend ids, keyed by tag
+     * @throws DdlException if no backend can be selected for one of the tags
+     */
+    public Map<Tag, List<Long>> getBeIdRoundRobinForReplicaCreation(
+            ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium,
+            Map<Tag, Integer> nextIndexs) throws DdlException {
+        Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
+        Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
+        short totalReplicaNum = 0;
+        for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
+            Tag tag = entry.getKey();
+            BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
+                    .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag))
+                    .setStorageMedium(storageMedium);
+            if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
+                builder.allowOnSameHost();
+            }
+
+            BeSelectionPolicy policy = builder.build();
+            // The cursor may be missing for this tag (e.g. the caller did not seed it);
+            // fall back to 0 instead of failing with a NullPointerException on unboxing.
+            Integer cursor = nextIndexs.get(tag);
+            int nextIndex = cursor == null ? 0 : cursor;
+            List<Long> beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex);
+
+            if (beIds.isEmpty()) {
+                throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy);
+            }
+            // Only advance the cursor once the selection for this tag has succeeded.
+            nextIndexs.put(tag, nextIndex + beIds.size());
+            chosenBackendIds.put(tag, beIds);
+            totalReplicaNum += beIds.size();
+        }
+        Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum());
+        return chosenBackendIds;
+    }
 
     /**
      * Select a set of backends for replica creation.
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
new file mode 100644
index 0000000000..029ce462dd
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Checks that, with {@code Config.enable_round_robin_create_tablet} turned on,
+ * {@code InternalCatalog.createTablets} spreads replicas over the backends in round-robin order.
+ */
+public class RoundRobinCreateTabletTest {
+    private Backend backend1;
+    private Backend backend2;
+    private Backend backend3;
+    private Backend backend4;
+
+    // Config values found before the test ran; restored in tearDown so that the
+    // flags flipped by this test do not leak into other tests.
+    private boolean savedEnableRoundRobinCreateTablet;
+    private boolean savedDisableStorageMediumCheck;
+
+    @Before
+    public void setUp() {
+        savedEnableRoundRobinCreateTablet = Config.enable_round_robin_create_tablet;
+        savedDisableStorageMediumCheck = Config.disable_storage_medium_check;
+
+        backend1 = new Backend(1L, "192.168.1.1", 9050);
+        backend2 = new Backend(2L, "192.168.1.2", 9050);
+        backend3 = new Backend(3L, "192.168.1.3", 9050);
+        backend4 = new Backend(4L, "192.168.1.4", 9050);
+
+        DiskInfo diskInfo1 = new DiskInfo("/disk1");
+        ImmutableMap<String, DiskInfo> diskRefs = ImmutableMap.of("disk1", diskInfo1);
+        backend1.setDisks(diskRefs);
+        backend2.setDisks(diskRefs);
+        backend3.setDisks(diskRefs);
+        backend4.setDisks(diskRefs);
+
+        backend1.setAlive(true);
+        backend2.setAlive(true);
+        backend3.setAlive(true);
+        backend4.setAlive(true);
+
+        Map<String, String> tagMap = new HashMap<>();
+        tagMap.put(Tag.TYPE_LOCATION, Tag.VALUE_DEFAULT_TAG);
+
+        backend1.setTagMap(tagMap);
+        backend2.setTagMap(tagMap);
+        backend3.setTagMap(tagMap);
+        backend4.setTagMap(tagMap);
+
+        Env.getCurrentSystemInfo().addBackend(backend1);
+        Env.getCurrentSystemInfo().addBackend(backend2);
+        Env.getCurrentSystemInfo().addBackend(backend3);
+        Env.getCurrentSystemInfo().addBackend(backend4);
+    }
+
+    @After
+    public void tearDown() {
+        // Restore the previous values instead of forcing the flags to true.
+        Config.enable_round_robin_create_tablet = savedEnableRoundRobinCreateTablet;
+        Config.disable_storage_medium_check = savedDisableStorageMediumCheck;
+
+        // Best-effort cleanup: a failure to drop a backend must not hide the test result.
+        try {
+            Env.getCurrentSystemInfo().dropBackend(1L);
+            Env.getCurrentSystemInfo().dropBackend(2L);
+            Env.getCurrentSystemInfo().dropBackend(3L);
+            Env.getCurrentSystemInfo().dropBackend(4L);
+        } catch (Exception e) {
+            System.out.println("failed to drop backend " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCreateTablets() throws Exception {
+        MaterializedIndex index = new MaterializedIndex();
+        String clusterName = "default_cluster";
+        HashDistributionInfo distributionInfo = new HashDistributionInfo(48, null);
+        ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 3);
+        TabletMeta tabletMeta = new TabletMeta(1L, 2L, 3L, 4L, 5, TStorageMedium.HDD);
+        IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(1000);
+        Set<Long> tabletIdSet = new HashSet<>();
+
+        Config.enable_round_robin_create_tablet = true;
+        Config.disable_storage_medium_check = true;
+
+        // Let any exception propagate: swallowing it would leave the index empty and
+        // make the assertions below pass vacuously.
+        Env.getCurrentEnv().getInternalCatalog().createTablets(clusterName, index, ReplicaState.NORMAL,
+                distributionInfo, 0, replicaAlloc, tabletMeta,
+                tabletIdSet, idGeneratorBuffer);
+
+        Assert.assertFalse(index.getTablets().isEmpty());
+
+        int i = 0;
+        int beNum = 4;
+        for (Tablet tablet : index.getTablets()) {
+            for (Replica replica : tablet.getReplicas()) {
+                // Backend ids are 1..4, so consecutive replicas must cycle through them.
+                Assert.assertEquals((i++ % beNum) + 1, replica.getBackendId());
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to