github-actions[bot] commented on code in PR #64167:
URL: https://github.com/apache/doris/pull/64167#discussion_r3371778514
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -3261,6 +3374,35 @@ private boolean createOlapTable(Database db,
CreateTableInfo createTableInfo) th
ColocatePersistInfo info =
ColocatePersistInfo.createForAddTable(groupId, tableId,
backendsPerBucketSeq);
Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
+ } else {
+ if
(Env.getCurrentTenantLevelColocateIndex().isColocateMasterTable(tableId)) {
+ Map<Tag, GroupV2Id> groups =
Env.getCurrentTenantLevelColocateIndex()
+ .getMasterGroupByTable(tableId);
+ Map<GroupV2Id, List<List<Long>>> map = new
HashMap<>();
+ for (Map.Entry<Tag, GroupV2Id> entry :
groups.entrySet()) {
+ GroupV2Id groupId = entry.getValue();
+ List<List<Long>> backendsPerBucketSeq =
Env.getCurrentTenantLevelColocateIndex()
+
.getBackendsPerBucketSeqByGroup(groupId);
+ map.put(groupId, backendsPerBucketSeq);
+ }
+ TenantLevelColocateTableInfo info = new
TenantLevelColocateTableInfo(db.getId(), tableId,
+ map);
+
Env.getCurrentEnv().getEditLog().logColocateAddTableV2(info);
+ }
Review Comment:
Tenant-level colocate membership is journaled only after
`db.createTableWithoutLock()` has already written `OP_CREATE_TABLE`. If the
master fails over after the create-table edit log is durable but before this
colocate log (and the same issue applies to the slave log below), replay only
runs `replayCreateTableInternal()`, which registers the table/tablets and does
not rebuild `TenantLevelColocateTableIndex`; the create properties were already
consumed/removed, and `OlapTable` has no persisted tenant-level colocate field.
The table then permanently comes back as a non-tenant-colocate table, so
planning and repair ignore the requested colocation. Please make the
tenant-level colocate metadata part of the create-table replay atomically, or
reconstruct it during create-table replay before exposing a successful create.
##########
fe/fe-core/src/main/java/org/apache/doris/clone/TenantLevelColocateTableCheckerAndBalancer.java:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.catalog.TenantLevelColocateGroupSchema;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex.GroupV2Id;
+import org.apache.doris.clone.TabletChecker.CheckerCounter;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletScheduler.AddResult;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Reference;
+import org.apache.doris.persist.ModifyTenantLevelColocateMapInfo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TenantLevelColocateTableCheckerAndBalancer is responsible for tablets'
repair and balance of colocated tables.
+ */
+public class TenantLevelColocateTableCheckerAndBalancer extends
ColocateTableCheckerAndBalancer {
+ private static final Logger LOG =
LogManager.getLogger(TenantLevelColocateTableCheckerAndBalancer.class);
+
+ private TenantLevelColocateTableCheckerAndBalancer(long intervalMs) {
+ super("colocate group clone checker v2", intervalMs);
+ }
+
+ private static volatile TenantLevelColocateTableCheckerAndBalancer
INSTANCE = null;
+
+ public static TenantLevelColocateTableCheckerAndBalancer getInstance() {
+ if (INSTANCE == null) {
+ synchronized (TenantLevelColocateTableCheckerAndBalancer.class) {
+ if (INSTANCE == null) {
+ INSTANCE = new
TenantLevelColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ @Override
+ public void runAfterCatalogReady() {
+ relocateAndBalanceGroups();
+ matchGroups();
+ }
+
+ private void relocateAndBalanceGroups() {
+ Set<GroupV2Id> groupIds =
Env.getCurrentEnv().getTenantLevelColocateTableIndex().getAllGroupIds();
+
+ // balance only inside each group, excluded balance between all groups
+ Set<GroupV2Id> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+ if (!Config.disable_colocate_balance_between_groups
+ && !changeGroups.isEmpty()) {
+ // balance both inside each group and between all groups
+ relocateAndBalanceGroup(changeGroups, true);
+ }
+ }
+
+ private Set<GroupV2Id> relocateAndBalanceGroup(Set<GroupV2Id> groupIds,
boolean balanceBetweenGroups) {
+ Set<GroupV2Id> changeGroups = Sets.newHashSet();
+ if (Config.disable_colocate_balance) {
+ return changeGroups;
+ }
+
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+ GlobalColocateStatistic globalColocateStatistic =
buildGlobalColocateStatistic();
+
+ // get all groups
+ for (GroupV2Id groupId : groupIds) {
+ Map<Tag, LoadStatisticForTag> statisticMap =
env.getTabletScheduler().getStatisticMap();
+ if (statisticMap == null) {
+ continue;
+ }
+
+ TenantLevelColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.info("Not found colocate group {}, maybe delete", groupId);
+ continue;
+ }
+ ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+ try {
+
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+ } catch (DdlException e) {
+ colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
+ continue;
+ }
+
+ Tag tag = groupSchema.getTag();
+ LoadStatisticForTag statistic = statisticMap.get(tag);
+ if (statistic == null) {
+ continue;
+ }
+ List<List<Long>> backendsPerBucketSeq =
colocateIndex.getBackendsPerBucketSeqByGroup(groupId);
+ if (backendsPerBucketSeq.isEmpty()) {
+ continue;
+ }
+
+ // get all unavailable backends in the backend bucket sequence of
this group
+ Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(
+ infoService, colocateIndex, groupId, tag);
+ // get all available backends for this group
+ List<Long> availableBeIds = getAvailableBeIds(tag,
Collections.emptySet(),
+ infoService);
+ // try relocate or balance this group for specified tag
+ List<List<Long>> balancedBackendsPerBucketSeq =
Lists.newArrayList();
+ if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup,
availableBeIds, colocateIndex,
+ infoService, statistic, globalColocateStatistic,
balancedBackendsPerBucketSeq,
+ balanceBetweenGroups)) {
+ if (!colocateIndex.addBackendsPerBucketSeq(groupId,
balancedBackendsPerBucketSeq, replicaAlloc)) {
+ LOG.warn("relocate group {} succ, but replica allocation
has change, old replica alloc {}",
+ groupId, replicaAlloc);
+ continue;
+ }
+ colocateIndex.markMasterGroupUnstable(groupId, "relocated",
true);
+ colocateIndex.markSlaveGroupUnstable(groupId, "master is
unstable", true);
+ changeGroups.add(groupId);
+ Map<GroupV2Id, List<List<Long>>>
balancedBackendsPerBucketSeqMap = Maps.newHashMap();
+ balancedBackendsPerBucketSeqMap.put(groupId,
balancedBackendsPerBucketSeq);
+ ModifyTenantLevelColocateMapInfo info = new
ModifyTenantLevelColocateMapInfo(
+ balancedBackendsPerBucketSeqMap);
+ env.getEditLog().logColocateBackendsPerBucketSeqV2(info);
+ LOG.info("balance group {}. now backends per bucket sequence
for tag {} is: {}",
+ groupId, tag, balancedBackendsPerBucketSeq);
+ }
+ }
+
+ return changeGroups;
+ }
+
+ /*
+ * Check every tablet of a group, if replica's location does not match
backends in group, relocating those
+ * replicas, and mark that group as unstable.
+ * If every replicas match the backends in group, mark that group as
stable.
+ */
+ private void matchGroups() {
+ long start = System.currentTimeMillis();
+ CheckerCounter counter = new CheckerCounter();
+
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+
+ // check each group
+ Set<GroupV2Id> groupIds = colocateIndex.getAllGroupIds();
+ for (GroupV2Id groupId : groupIds) {
+ TenantLevelColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.info("Not found colocate group {}, maybe delete", groupId);
+ continue;
+ }
+
+ List<Set<Long>> backendBucketsSeq =
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+ if (backendBucketsSeq.isEmpty()) {
+ continue;
+ }
+
+ String unstableReason = matchMasterGroup(env, counter, groupId,
backendBucketsSeq);
+ // mark group as stable or unstable
+ if (Strings.isNullOrEmpty(unstableReason)) {
+ colocateIndex.markMasterGroupStable(groupId, true);
+ unstableReason = matchSlaveGroup(env, counter, groupId,
backendBucketsSeq);
+ if (Strings.isNullOrEmpty(unstableReason)) {
+ colocateIndex.markSlaveGroupStable(groupId, true);
+ } else {
+ colocateIndex.markSlaveGroupUnstable(groupId,
unstableReason, true);
+ }
+ } else {
+ colocateIndex.markMasterGroupUnstable(groupId, unstableReason,
true);
+ colocateIndex.markSlaveGroupUnstable(groupId, "mater is
unstable", true);
+ }
+ } // end for groups
+
+ long cost = System.currentTimeMillis() - start;
+ LOG.info("finished to check tablets.
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+ + "cost: {} ms",
+ counter.unhealthyTabletNum, counter.totalTabletNum,
counter.addToSchedulerTabletNum,
+ counter.tabletInScheduler, counter.tabletNotReady,
counter.tabletExceedLimit, cost);
+ }
+
+ private String matchMasterGroup(Env env, CheckerCounter counter, GroupV2Id
groupId,
+ List<Set<Long>> backendBucketsSeq) {
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ List<Long> tableIds = colocateIndex.getAllMasterTableIds(groupId);
+ Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+ Reference<String> unstableReason = new Reference<>();
+ for (Long tableId : tableIds) {
+ Database db =
env.getInternalCatalog().getDbNullableByTable(tableId);
+ if (db == null) {
+ continue;
+ }
+ counter.totalTabletNum++;
+ OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+ if (olapTable == null ||
!colocateIndex.isColocateMasterTable(olapTable.getId())) {
+ continue;
+ }
+ try {
+ if (matchTable(env, counter, db, olapTable, colocateTags,
backendBucketsSeq, unstableReason)) {
+ break;
+ }
+ } catch (Throwable e) {
+ LOG.warn("something wrong on colocate checker, dbName={},
tableName={}, errMsg={}",
+ db.getFullName(), olapTable.getName(), e.getMessage());
+ }
+ } // end for tables
+ return unstableReason.getRef();
+ }
+
+ private String matchSlaveGroup(Env env, CheckerCounter counter, GroupV2Id
groupId,
+ List<Set<Long>> masterBackendBucketsSeq) {
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ List<Long> tableIds = colocateIndex.getAllSlaveTableIds(groupId);
+ Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+ Reference<String> unstableReason = new Reference<>();
+ for (Long tableId : tableIds) {
+ Database db =
env.getInternalCatalog().getDbNullableByTable(tableId);
+ if (db == null) {
+ continue;
+ }
+ counter.totalTabletNum++;
+ OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+ if (olapTable == null ||
!colocateIndex.isColocateSlaveTable(olapTable.getId())) {
+ continue;
+ }
+ List<Set<Long>> backendBucketsSeq =
TenantLevelColocateTableIndex.getSlaveBackendsPerBucketSeqSet(
+ masterBackendBucketsSeq,
olapTable.getDefaultDistributionInfo().getBucketNum());
Review Comment:
This expands the master bucket map using the table default bucket count for
every slave partition, but `checkSlaveDistribution()` explicitly allows a slave
partition bucket count to be any multiple of the master bucket count. A valid
slave table can therefore have a later partition with, for example, 8 buckets
while the default is 4; `matchPartition()` then hits the
`backendBucketsSeq.size() == index.getTablets().size()` precondition, the catch
only logs, `unstableReason` remains null, and the slave group can be marked
stable without checking or repairing that partition. Expand the master bucket
sequence per partition, using each partition/index tablet count (or
consistently disallow these per-partition bucket multiples).
##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -2263,23 +2282,32 @@ private void computeScanRangeAssignmentByColocate(
fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(),
new BucketSeqToScanRange());
// Same as bucket shuffle.
- int bucketNum = scanNode.getBucketNum();
- scanNode.getFragment().setBucketNum(bucketNum);
+ PlanFragment fragment = scanNode.getFragment();
+ if (!fragment.getColocateData().isEmpty()) {
+ fragment.setBucketNum(fragment.getColocateData().size());
+ } else {
Review Comment:
`getColocateData()` is a `Map<Tag, List<List<Long>>>`, so `size()` is the
number of tags, not the number of colocate buckets. For the common case with
one location tag and 32 buckets this sets `fragmentBucketNum` to 1, and the new
`bucketSeq % fragmentBucketNum` below collapses every tablet bucket into
colocate bucket 0 and one selected execution host. This breaks colocate
parallelism and can overload a single BE. Use the bucket-list size instead,
e.g. `fragment.getColocateData().values().iterator().next().size()`, which is
well-defined once the planner has normalized all retained tags to the same
bucket count.
##########
fe/fe-core/src/main/java/org/apache/doris/clone/TenantLevelColocateTableCheckerAndBalancer.java:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.catalog.TenantLevelColocateGroupSchema;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex.GroupV2Id;
+import org.apache.doris.clone.TabletChecker.CheckerCounter;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletScheduler.AddResult;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Reference;
+import org.apache.doris.persist.ModifyTenantLevelColocateMapInfo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TenantLevelColocateTableCheckerAndBalancer is responsible for tablets'
repair and balance of colocated tables.
+ */
+public class TenantLevelColocateTableCheckerAndBalancer extends
ColocateTableCheckerAndBalancer {
+ private static final Logger LOG =
LogManager.getLogger(TenantLevelColocateTableCheckerAndBalancer.class);
+
+ private TenantLevelColocateTableCheckerAndBalancer(long intervalMs) {
+ super("colocate group clone checker v2", intervalMs);
+ }
+
+ private static volatile TenantLevelColocateTableCheckerAndBalancer
INSTANCE = null;
+
+ public static TenantLevelColocateTableCheckerAndBalancer getInstance() {
+ if (INSTANCE == null) {
+ synchronized (TenantLevelColocateTableCheckerAndBalancer.class) {
+ if (INSTANCE == null) {
+ INSTANCE = new
TenantLevelColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ @Override
+ public void runAfterCatalogReady() {
+ relocateAndBalanceGroups();
+ matchGroups();
+ }
+
+ private void relocateAndBalanceGroups() {
+ Set<GroupV2Id> groupIds =
Env.getCurrentEnv().getTenantLevelColocateTableIndex().getAllGroupIds();
+
+ // balance only inside each group, excluded balance between all groups
+ Set<GroupV2Id> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+ if (!Config.disable_colocate_balance_between_groups
+ && !changeGroups.isEmpty()) {
+ // balance both inside each group and between all groups
+ relocateAndBalanceGroup(changeGroups, true);
+ }
+ }
+
+ private Set<GroupV2Id> relocateAndBalanceGroup(Set<GroupV2Id> groupIds,
boolean balanceBetweenGroups) {
+ Set<GroupV2Id> changeGroups = Sets.newHashSet();
+ if (Config.disable_colocate_balance) {
+ return changeGroups;
+ }
+
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+ GlobalColocateStatistic globalColocateStatistic =
buildGlobalColocateStatistic();
+
+ // get all groups
+ for (GroupV2Id groupId : groupIds) {
+ Map<Tag, LoadStatisticForTag> statisticMap =
env.getTabletScheduler().getStatisticMap();
+ if (statisticMap == null) {
+ continue;
+ }
+
+ TenantLevelColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.info("Not found colocate group {}, maybe delete", groupId);
+ continue;
+ }
+ ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+ try {
+
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+ } catch (DdlException e) {
+ colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
+ continue;
+ }
+
+ Tag tag = groupSchema.getTag();
+ LoadStatisticForTag statistic = statisticMap.get(tag);
+ if (statistic == null) {
+ continue;
+ }
+ List<List<Long>> backendsPerBucketSeq =
colocateIndex.getBackendsPerBucketSeqByGroup(groupId);
+ if (backendsPerBucketSeq.isEmpty()) {
+ continue;
+ }
+
+ // get all unavailable backends in the backend bucket sequence of
this group
+ Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(
+ infoService, colocateIndex, groupId, tag);
+ // get all available backends for this group
+ List<Long> availableBeIds = getAvailableBeIds(tag,
Collections.emptySet(),
+ infoService);
+ // try relocate or balance this group for specified tag
+ List<List<Long>> balancedBackendsPerBucketSeq =
Lists.newArrayList();
+ if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup,
availableBeIds, colocateIndex,
+ infoService, statistic, globalColocateStatistic,
balancedBackendsPerBucketSeq,
+ balanceBetweenGroups)) {
+ if (!colocateIndex.addBackendsPerBucketSeq(groupId,
balancedBackendsPerBucketSeq, replicaAlloc)) {
+ LOG.warn("relocate group {} succ, but replica allocation
has change, old replica alloc {}",
+ groupId, replicaAlloc);
+ continue;
+ }
+ colocateIndex.markMasterGroupUnstable(groupId, "relocated",
true);
+ colocateIndex.markSlaveGroupUnstable(groupId, "master is
unstable", true);
+ changeGroups.add(groupId);
+ Map<GroupV2Id, List<List<Long>>>
balancedBackendsPerBucketSeqMap = Maps.newHashMap();
+ balancedBackendsPerBucketSeqMap.put(groupId,
balancedBackendsPerBucketSeq);
+ ModifyTenantLevelColocateMapInfo info = new
ModifyTenantLevelColocateMapInfo(
+ balancedBackendsPerBucketSeqMap);
+ env.getEditLog().logColocateBackendsPerBucketSeqV2(info);
+ LOG.info("balance group {}. now backends per bucket sequence
for tag {} is: {}",
+ groupId, tag, balancedBackendsPerBucketSeq);
+ }
+ }
+
+ return changeGroups;
+ }
+
+ /*
+ * Check every tablet of a group, if replica's location does not match
backends in group, relocating those
+ * replicas, and mark that group as unstable.
+ * If every replicas match the backends in group, mark that group as
stable.
+ */
+ private void matchGroups() {
+ long start = System.currentTimeMillis();
+ CheckerCounter counter = new CheckerCounter();
+
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+
+ // check each group
+ Set<GroupV2Id> groupIds = colocateIndex.getAllGroupIds();
+ for (GroupV2Id groupId : groupIds) {
+ TenantLevelColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.info("Not found colocate group {}, maybe delete", groupId);
+ continue;
+ }
+
+ List<Set<Long>> backendBucketsSeq =
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+ if (backendBucketsSeq.isEmpty()) {
+ continue;
+ }
+
+ String unstableReason = matchMasterGroup(env, counter, groupId,
backendBucketsSeq);
+ // mark group as stable or unstable
+ if (Strings.isNullOrEmpty(unstableReason)) {
+ colocateIndex.markMasterGroupStable(groupId, true);
+ unstableReason = matchSlaveGroup(env, counter, groupId,
backendBucketsSeq);
+ if (Strings.isNullOrEmpty(unstableReason)) {
+ colocateIndex.markSlaveGroupStable(groupId, true);
+ } else {
+ colocateIndex.markSlaveGroupUnstable(groupId,
unstableReason, true);
+ }
+ } else {
+ colocateIndex.markMasterGroupUnstable(groupId, unstableReason,
true);
+ colocateIndex.markSlaveGroupUnstable(groupId, "mater is
unstable", true);
+ }
+ } // end for groups
+
+ long cost = System.currentTimeMillis() - start;
+ LOG.info("finished to check tablets.
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+ + "cost: {} ms",
+ counter.unhealthyTabletNum, counter.totalTabletNum,
counter.addToSchedulerTabletNum,
+ counter.tabletInScheduler, counter.tabletNotReady,
counter.tabletExceedLimit, cost);
+ }
+
+ private String matchMasterGroup(Env env, CheckerCounter counter, GroupV2Id
groupId,
+ List<Set<Long>> backendBucketsSeq) {
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ List<Long> tableIds = colocateIndex.getAllMasterTableIds(groupId);
+ Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+ Reference<String> unstableReason = new Reference<>();
+ for (Long tableId : tableIds) {
+ Database db =
env.getInternalCatalog().getDbNullableByTable(tableId);
+ if (db == null) {
+ continue;
+ }
+ counter.totalTabletNum++;
+ OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+ if (olapTable == null ||
!colocateIndex.isColocateMasterTable(olapTable.getId())) {
+ continue;
+ }
+ try {
+ if (matchTable(env, counter, db, olapTable, colocateTags,
backendBucketsSeq, unstableReason)) {
+ break;
+ }
+ } catch (Throwable e) {
+ LOG.warn("something wrong on colocate checker, dbName={},
tableName={}, errMsg={}",
+ db.getFullName(), olapTable.getName(), e.getMessage());
+ }
+ } // end for tables
+ return unstableReason.getRef();
+ }
+
+ private String matchSlaveGroup(Env env, CheckerCounter counter, GroupV2Id
groupId,
+ List<Set<Long>> masterBackendBucketsSeq) {
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ List<Long> tableIds = colocateIndex.getAllSlaveTableIds(groupId);
+ Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+ Reference<String> unstableReason = new Reference<>();
+ for (Long tableId : tableIds) {
+ Database db =
env.getInternalCatalog().getDbNullableByTable(tableId);
+ if (db == null) {
+ continue;
+ }
+ counter.totalTabletNum++;
+ OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+ if (olapTable == null ||
!colocateIndex.isColocateSlaveTable(olapTable.getId())) {
+ continue;
+ }
+ List<Set<Long>> backendBucketsSeq =
TenantLevelColocateTableIndex.getSlaveBackendsPerBucketSeqSet(
+ masterBackendBucketsSeq,
olapTable.getDefaultDistributionInfo().getBucketNum());
+ if (backendBucketsSeq.isEmpty()) {
+ continue;
+ }
+ try {
+ if (matchTable(env, counter, db, olapTable, colocateTags,
backendBucketsSeq, unstableReason)) {
+ break;
+ }
+ } catch (Throwable e) {
+ LOG.warn("something wrong on colocate checker, dbName={},
tableName={}, errMsg={}",
+ db.getFullName(), olapTable.getName(), e.getMessage());
+ }
+ } // end for tables
+ return unstableReason.getRef();
+ }
+
+ private boolean matchTable(Env env, CheckerCounter counter, Database db,
+ OlapTable olapTable, Set<Tag> colocateTags,
+ List<Set<Long>> backendBucketsSeq, Reference<String>
unstableReason) {
+ olapTable.readLock();
+ try {
+ for (Partition partition : olapTable.getPartitions()) {
+ if (matchPartition(env, counter, db, olapTable, partition,
colocateTags,
+ backendBucketsSeq, unstableReason)) {
+ return true;
+ }
+ }
+ } finally {
+ olapTable.readUnlock();
+ }
+ return false;
+ }
+
+ private boolean matchPartition(Env env, CheckerCounter counter, Database
db,
+ OlapTable olapTable, Partition partition, Set<Tag> colocateTags,
+ List<Set<Long>> backendBucketsSeq, Reference<String>
unstableReason) {
+ TabletScheduler tabletScheduler = env.getTabletScheduler();
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+ ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo()
+
.getReplicaAllocation(partition.getId()).getSubMap(colocateTags);
+
+ short replicationNum = replicaAlloc.getTotalReplicaNum();
+ long visibleVersion = partition.getVisibleVersion();
+ // Here we only get VISIBLE indexes. All other indexes are not
queryable.
+ // So it does not matter if tablets of other indexes are not matched.
+ for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+ Preconditions.checkState(backendBucketsSeq.size() ==
index.getTablets().size(),
+ backendBucketsSeq.size() + " vs. " +
index.getTablets().size());
+ List<Long> tabletIdsInOrder = index.getTabletIdsInOrder();
+ for (int idx = 0; idx < tabletIdsInOrder.size(); idx++) {
+ Long tabletId = tabletIdsInOrder.get(idx);
+ counter.totalTabletNum++;
+ Set<Long> bucketsSeq = backendBucketsSeq.get(idx);
+ Preconditions.checkState(bucketsSeq.size() == replicationNum,
+ bucketsSeq.size() + " vs. " + replicationNum);
+ Tablet tablet = index.getTablet(tabletId);
+ TabletHealth tabletHealth =
tablet.getColocateHealthV2(visibleVersion,
+ replicaAlloc, bucketsSeq, colocateTags);
+ if (tabletHealth.status != TabletStatus.HEALTHY) {
+ counter.unhealthyTabletNum++;
+ unstableReason.setRef(String.format("get unhealthy tablet
%d in colocate table."
+ + " status: %s", tablet.getId(),
tabletHealth.status));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(unstableReason);
+ }
+
+ if (tabletHealth.status == TabletStatus.UNRECOVERABLE) {
+ continue;
+ }
+
+ if (!tablet.readyToBeRepaired(infoService,
Priority.NORMAL)) {
+ counter.tabletNotReady++;
+ continue;
+ }
+
+ TabletSchedCtx tabletCtx = new TabletSchedCtx(
+ TabletSchedCtx.Type.REPAIR,
+ db.getId(), olapTable.getId(), partition.getId(),
index.getId(), tablet.getId(),
+
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()),
+ System.currentTimeMillis());
+ // the tablet status will be set again when being scheduled
+ tabletCtx.setTabletHealth(tabletHealth);
+ tabletCtx.setTabletOrderIdx(idx);
+
tabletCtx.setIsUniqKeyMergeOnWrite(olapTable.isUniqKeyMergeOnWrite());
+
+ AddResult res = tabletScheduler.addTablet(tabletCtx, false
/* not force */);
+ if (res == AddResult.DISABLED) {
+ // tablet in scheduler exceed limit, or scheduler is
disabled,
+ // skip this group and check next one.
+ LOG.info("tablet scheduler return: {}. stop colocate
table check", res.name());
+ return true;
+ } else if (res == AddResult.ADDED) {
+ counter.addToSchedulerTabletNum++;
+ } else if (res == AddResult.ALREADY_IN) {
+ counter.tabletInScheduler++;
+ } else if (res == AddResult.REPLACE_ADDED || res ==
AddResult.LIMIT_EXCEED) {
+ counter.tabletExceedLimit++;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private GlobalColocateStatistic buildGlobalColocateStatistic() {
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ GlobalColocateStatistic globalColocateStatistic = new
GlobalColocateStatistic();
+
+ Set<GroupV2Id> groupIds = colocateIndex.getAllGroupIds();
+ for (GroupV2Id groupId : groupIds) {
+ TenantLevelColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.info("Not found colocate group {}, maybe delete", groupId);
+ continue;
+ }
+ ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+ List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
+ List<Set<Long>> backendBucketsSeq =
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+ if (backendBucketsSeq.isEmpty()) {
+ continue;
+ }
+
+ int totalReplicaNumPerBucket = 0;
+ ArrayList<Long> totalReplicaDataSizes = Lists.newArrayList();
+ for (int i = 0; i < backendBucketsSeq.size(); i++) {
+ totalReplicaDataSizes.add(0L);
+ }
+
+ for (Long tableId : tableIds) {
+ Database db =
env.getInternalCatalog().getDbNullableByTable(tableId);
+ if (db == null) {
+ continue;
+ }
+ OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+ if (olapTable == null ||
!colocateIndex.isColocateMasterTable(olapTable.getId())) {
+ continue;
Review Comment:
`getAllTableIds(groupId)` includes both master and slave tables, but this
condition skips every slave table when building the balance statistics. Slave
tables use the same bucket sequence and can be much larger than the master, so
relocation/balance decisions are made from only master-table sizes and can move
buckets as if the colocated slave data did not exist. Please include slave
table tablet sizes as well, mapping each slave bucket index to a master bucket
by taking it modulo the master bucket-sequence length, as is done for slave
matching.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]