This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3287f350de [feature](table) implement the round robin selection be
when create tablet (#19167)
3287f350de is described below
commit 3287f350de5a1946d4150957997c3ad8c2e9e587
Author: Luwei <[email protected]>
AuthorDate: Sat May 6 14:46:48 2023 +0800
[feature](table) implement the round robin selection be when create tablet
(#19167)
---
.../main/java/org/apache/doris/common/Config.java | 3 +
.../apache/doris/datasource/InternalCatalog.java | 45 +++++--
.../org/apache/doris/system/SystemInfoService.java | 116 ++++++++++++++++++
.../datasource/RoundRobinCreateTabletTest.java | 129 +++++++++++++++++++++
4 files changed, 285 insertions(+), 8 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0cb93bef9b..bd6f538c82 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1895,6 +1895,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = false)
public static boolean skip_localhost_auth_check = false;
+ @ConfField(mutable = true)
+ public static boolean enable_round_robin_create_tablet = false;
+
/**
* If set false, user couldn't submit analyze SQL and FE won't allocate
any related resources.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 1947d17a66..c317ade75f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -176,6 +176,7 @@ import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletType;
import org.apache.doris.thrift.TTaskType;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -204,6 +205,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
/**
* The Internal catalog will manage all self-managed meta object in a Doris
cluster.
* Such as Database, tables, etc.
@@ -2516,7 +2518,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
LOG.info("successfully create table[{}-{}]", tableName, tableId);
}
- private void createTablets(String clusterName, MaterializedIndex index,
ReplicaState replicaState,
+ @VisibleForTesting
+ public void createTablets(String clusterName, MaterializedIndex index,
ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation
replicaAlloc, TabletMeta tabletMeta,
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws
DdlException {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
@@ -2538,6 +2541,20 @@ public class InternalCatalog implements
CatalogIf<Database> {
if (chooseBackendsArbitrary) {
backendsPerBucketSeq = Maps.newHashMap();
}
+
+ Map<Tag, Integer> nextIndexs = new HashMap<>();
+
+ if (Config.enable_round_robin_create_tablet) {
+ for (Map.Entry<Tag, Short> entry :
replicaAlloc.getAllocMap().entrySet()) {
+ int startPos =
Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(), clusterName,
+ tabletMeta.getStorageMedium());
+ if (startPos == -1) {
+ throw new DdlException("The number of BEs that match the
policy is insufficient");
+ }
+ nextIndexs.put(entry.getKey(), startPos);
+ }
+ }
+
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
// create a new tablet with random chosen backends
Tablet tablet = new Tablet(idGeneratorBuffer.getNextId());
@@ -2550,14 +2567,26 @@ public class InternalCatalog implements
CatalogIf<Database> {
Map<Tag, List<Long>> chosenBackendIds;
if (chooseBackendsArbitrary) {
// This is the first colocate table in the group, or just a
normal table,
- // randomly choose backends
- if (!Config.disable_storage_medium_check) {
- chosenBackendIds = Env.getCurrentSystemInfo()
- .selectBackendIdsForReplicaCreation(replicaAlloc,
clusterName,
- tabletMeta.getStorageMedium());
+ // choose backends
+ if (Config.enable_round_robin_create_tablet) {
+ if (!Config.disable_storage_medium_check) {
+ chosenBackendIds = Env.getCurrentSystemInfo()
+
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName,
+ tabletMeta.getStorageMedium(),
nextIndexs);
+ } else {
+ chosenBackendIds = Env.getCurrentSystemInfo()
+
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName, null,
+ nextIndexs);
+ }
} else {
- chosenBackendIds = Env.getCurrentSystemInfo()
- .selectBackendIdsForReplicaCreation(replicaAlloc,
clusterName, null);
+ if (!Config.disable_storage_medium_check) {
+ chosenBackendIds = Env.getCurrentSystemInfo()
+
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
+ tabletMeta.getStorageMedium());
+ } else {
+ chosenBackendIds = Env.getCurrentSystemInfo()
+
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
+ }
}
for (Map.Entry<Tag, List<Long>> entry :
chosenBackendIds.entrySet()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 74824a395c..e710665640 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -861,6 +861,122 @@ public class SystemInfoService {
return classMap;
}
+ /**
+  * Orders backends by ascending backend id.
+  */
+ class BeComparator implements Comparator<Backend> {
+     @Override
+     public int compare(Backend a, Backend b) {
+         // Compare the long ids directly. Narrowing their difference to int overflows when
+         // the ids are more than Integer.MAX_VALUE apart and yields a wrong sign (or 0),
+         // which breaks the comparator contract and the resulting sort order.
+         return Long.compare(a.getId(), b.getId());
+     }
+ }
+
+ /**
+  * Selects backend ids matching {@code policy} in round robin order.
+  *
+  * <p>The candidates returned by {@link #getCandidates} are rotated so that the backend at
+  * position {@code nextIndex % candidates.size()} comes first, then the first
+  * {@code number} ids of the rotated sequence are returned.
+  *
+  * @param policy the selection policy the backends must match
+  * @param number how many backend ids to return, or -1 to return every candidate
+  * @param nextIndex the round robin cursor; it is reduced modulo the number of candidates
+  * @return the selected backend ids, or an empty list when there is no candidate or fewer
+  *         candidates than {@code number}
+  * @throws IllegalArgumentException if {@code number} is less than -1
+  */
+ public List<Long> selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number,
+         int nextIndex) {
+     Preconditions.checkArgument(number >= -1);
+     List<Backend> candidates = getCandidates(policy);
+     if (number != -1 && candidates.size() < number) {
+         LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
+         return Lists.newArrayList();
+     }
+
+     // With number == -1 or number == 0 the check above lets an empty candidate list
+     // through; return early, otherwise the modulo below divides by zero.
+     if (candidates.isEmpty()) {
+         return Lists.newArrayList();
+     }
+
+     int candidateNum = candidates.size();
+     int realIndex = nextIndex % candidateNum;
+     int resultNum = number == -1 ? candidateNum : number;
+
+     // Walk the candidates starting at realIndex and wrap around at the end.
+     List<Long> partialOrderList = new ArrayList<Long>(resultNum);
+     for (int offset = 0; offset < resultNum; ++offset) {
+         partialOrderList.add(candidates.get((realIndex + offset) % candidateNum).getId());
+     }
+     return partialOrderList;
+ }
+
+ /**
+  * Collects the backends that satisfy {@code policy}, sorted with {@link BeComparator}.
+  *
+  * <p>Unless the policy allows several replicas on the same host, only the first matching
+  * backend of each IP is kept.
+  *
+  * @param policy the selection policy the backends must match
+  * @return the sorted candidates, or a new empty list when no backend matches
+  */
+ public List<Backend> getCandidates(BeSelectionPolicy policy) {
+     List<Backend> matched = policy.getCandidateBackends(idToBackendRef.values());
+     if (matched.isEmpty()) {
+         LOG.info("Not match policy: {}. candidates num: {}", policy, matched.size());
+         return Lists.newArrayList();
+     }
+
+     if (!policy.allowOnSameHost) {
+         // Bucket the backends by IP, then keep a single backend per host.
+         Map<String, List<Backend>> backendsByIp = Maps.newHashMap();
+         for (Backend backend : matched) {
+             String ip = backend.getIp();
+             List<Backend> sameHostBackends = backendsByIp.get(ip);
+             if (sameHostBackends == null) {
+                 sameHostBackends = Lists.newArrayList();
+                 backendsByIp.put(ip, sameHostBackends);
+             }
+             sameHostBackends.add(backend);
+         }
+         matched.clear();
+         for (List<Backend> sameHostBackends : backendsByIp.values()) {
+             matched.add(sameHostBackends.get(0));
+         }
+     }
+
+     if (matched.isEmpty()) {
+         LOG.info("Not match policy: {}. candidates num: {}", policy, matched.size());
+         return Lists.newArrayList();
+     }
+
+     matched.sort(new BeComparator());
+     return matched;
+ }
+
+ /**
+  * Picks the round robin starting position for {@code tag}: the index, within the
+  * candidates returned by {@link #getCandidates}, of the backend that currently holds the
+  * fewest tablets.
+  *
+  * @param tag the tag the backends must carry
+  * @param clusterName the cluster the backends must belong to
+  * @param storageMedium the storage medium handed to the selection policy
+  * @return the index of the candidate with the fewest tablets (the first one on ties), or
+  *         -1 when no backend matches the policy
+  */
+ public int getStartPosOfRoundRobin(Tag tag, String clusterName, TStorageMedium storageMedium) {
+     BeSelectionPolicy.Builder policyBuilder = new BeSelectionPolicy.Builder().setCluster(clusterName)
+             .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag))
+             .setStorageMedium(storageMedium);
+     if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
+         policyBuilder.allowOnSameHost();
+     }
+
+     List<Backend> sortedCandidates = getCandidates(policyBuilder.build());
+
+     int startPos = -1;
+     long fewestTablets = Long.MAX_VALUE;
+     int pos = 0;
+     for (Backend candidate : sortedCandidates) {
+         long tabletCount = Env.getCurrentInvertedIndex()
+                 .getTabletIdsByBackendId(candidate.getId()).size();
+         // Strict comparison: the earliest candidate wins when several share the minimum.
+         if (tabletCount < fewestTablets) {
+             fewestTablets = tabletCount;
+             startPos = pos;
+         }
+         ++pos;
+     }
+     return startPos;
+ }
+
+ /**
+  * Chooses, for every tag of {@code replicaAlloc}, the backends that will host the replicas
+  * of one tablet, walking the matching backends in round robin order.
+  *
+  * @param replicaAlloc the replica allocation giving the number of replicas per tag
+  * @param clusterName the cluster the backends must belong to
+  * @param storageMedium the storage medium handed to the selection policy
+  * @param nextIndexs the per-tag round robin cursor; it is advanced in place by the number
+  *        of backends chosen for the tag. A tag without an entry is initialised from
+  *        {@link #getStartPosOfRoundRobin}
+  * @return the chosen backend ids, keyed by tag
+  * @throws DdlException if no backend list could be selected for one of the tags
+  */
+ public Map<Tag, List<Long>> getBeIdRoundRobinForReplicaCreation(
+         ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium,
+         Map<Tag, Integer> nextIndexs) throws DdlException {
+     Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
+     Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
+     short totalReplicaNum = 0;
+     for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
+         Tag tag = entry.getKey();
+         BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
+                 .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag))
+                 .setStorageMedium(storageMedium);
+         if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
+             builder.allowOnSameHost();
+         }
+
+         BeSelectionPolicy policy = builder.build();
+
+         // The cursor may be missing, e.g. when the mutable round robin switch was turned
+         // on after the caller prepared the map. Unboxing null would throw a
+         // NullPointerException, so compute the starting position here instead. A result
+         // of -1 (no candidate) is mapped to 0; the selection below then returns an empty
+         // list and the DdlException is raised as usual.
+         Integer cursor = nextIndexs.get(tag);
+         if (cursor == null) {
+             cursor = Math.max(getStartPosOfRoundRobin(tag, clusterName, storageMedium), 0);
+         }
+         int nextIndex = cursor;
+
+         List<Long> beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex);
+         nextIndexs.put(tag, nextIndex + beIds.size());
+
+         if (beIds.isEmpty()) {
+             throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy);
+         }
+         chosenBackendIds.put(tag, beIds);
+         totalReplicaNum += beIds.size();
+     }
+     Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum());
+     return chosenBackendIds;
+ }
/**
* Select a set of backends for replica creation.
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
new file mode 100644
index 0000000000..029ce462dd
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class RoundRobinCreateTabletTest {
+ private Backend backend1;
+ private Backend backend2;
+ private Backend backend3;
+ private Backend backend4;
+
+ @Before
+ public void setUp() {
+ backend1 = new Backend(1L, "192.168.1.1", 9050);
+ backend2 = new Backend(2L, "192.168.1.2", 9050);
+ backend3 = new Backend(3L, "192.168.1.3", 9050);
+ backend4 = new Backend(4L, "192.168.1.4", 9050);
+
+ DiskInfo diskInfo1 = new DiskInfo("/disk1");
+ ImmutableMap<String, DiskInfo> diskRefs = ImmutableMap.of("disk1",
diskInfo1);
+ backend1.setDisks(diskRefs);
+ backend2.setDisks(diskRefs);
+ backend3.setDisks(diskRefs);
+ backend4.setDisks(diskRefs);
+
+ backend1.setAlive(true);
+ backend2.setAlive(true);
+ backend3.setAlive(true);
+ backend4.setAlive(true);
+
+ Map<String, String> tagMap = new HashMap<>();
+ tagMap.put(Tag.TYPE_LOCATION, Tag.VALUE_DEFAULT_TAG);
+
+ backend1.setTagMap(tagMap);
+ backend2.setTagMap(tagMap);
+ backend3.setTagMap(tagMap);
+ backend4.setTagMap(tagMap);
+
+ Env.getCurrentSystemInfo().addBackend(backend1);
+ Env.getCurrentSystemInfo().addBackend(backend2);
+ Env.getCurrentSystemInfo().addBackend(backend3);
+ Env.getCurrentSystemInfo().addBackend(backend4);
+ }
+
+ @After
+ public void tearDown() {
+ Config.enable_round_robin_create_tablet = true;
+ Config.disable_storage_medium_check = true;
+
+ try {
+ Env.getCurrentSystemInfo().dropBackend(1L);
+ Env.getCurrentSystemInfo().dropBackend(2L);
+ Env.getCurrentSystemInfo().dropBackend(3L);
+ Env.getCurrentSystemInfo().dropBackend(4L);
+ } catch (Exception e) {
+ System.out.println("failed to drop backend " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreateTablets() {
+ MaterializedIndex index = new MaterializedIndex();
+ String clusterName = "default_cluster";
+ HashDistributionInfo distributionInfo = new HashDistributionInfo(48,
null);
+ ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 3);
+ TabletMeta tabletMeta = new TabletMeta(1L, 2L, 3L, 4L, 5,
TStorageMedium.HDD);
+ IdGeneratorBuffer idGeneratorBuffer =
Env.getCurrentEnv().getIdGeneratorBuffer(1000);
+ Set<Long> tabletIdSet = new HashSet<>();
+
+ Config.enable_round_robin_create_tablet = true;
+ Config.disable_storage_medium_check = true;
+
+ try {
+
Env.getCurrentEnv().getInternalCatalog().createTablets(clusterName, index,
ReplicaState.NORMAL,
+ distributionInfo, 0, replicaAlloc, tabletMeta,
+ tabletIdSet, idGeneratorBuffer);
+ } catch (Exception e) {
+ System.out.println("failed to create tablets " + e.getMessage());
+ }
+
+ int i = 0;
+ int beNum = 4;
+ for (Tablet tablet : index.getTablets()) {
+ for (Replica replica : tablet.getReplicas()) {
+ Assert.assertEquals((i++ % beNum) + 1, replica.getBackendId());
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]