924060929 commented on code in PR #64167:
URL: https://github.com/apache/doris/pull/64167#discussion_r3372298740
##########
fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java:
##########
@@ -119,13 +120,15 @@ protected boolean canBalanceTablet(TabletMeta tabletMeta) {
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+ TenantLevelColocateTableIndex colocateTableTenantLevelIndex = Env.getCurrentTenantLevelColocateIndex();
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
return tabletMeta != null
&& !alterTableIds.contains(tabletMeta.getTableId())
- && (canBalanceColocateTable || !colocateTableIndex.isColocateTable(tabletMeta.getTableId()))
+ && (canBalanceColocateTable || !colocateTableIndex.isColocateTable(tabletMeta.getTableId())
+ || !colocateTableTenantLevelIndex.isColocateTable(tabletMeta.getTableId()))
Review Comment:
**Blocking — this looks like an inverted De Morgan that regresses existing
V1 colocation.**
The intent is "skip load-balancing if the tablet is V1 **or** V2 colocate",
i.e. balance only when it is neither, which requires `&&` between the two
negations. With `||`, a pure V1 colocate tablet has `!isV1 == false` and
`!isV2 == true`, so `!isV1 || !isV2` is `true` and `canBalanceTablet` returns
`true`. Since `canBalanceColocateTable` is `false` for `BeLoadRebalancer` (only
`DiskRebalancer` sets it `true`) and this is the sole colocate gate in
`BeLoadRebalancer.selectAlternativeTabletsForCluster`, the BE load balancer
will now move V1 colocate tablets to load-chosen backends, breaking
colocation. The colocate checker then relocates them back, and the two
thrash. This breaks already-GA V1 colocation on every upgraded cluster,
independent of the new feature.
```java
&& (canBalanceColocateTable
        || (!colocateTableIndex.isColocateTable(tabletMeta.getTableId())
        && !colocateTableTenantLevelIndex.isColocateTable(tabletMeta.getTableId())))
```
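To make the difference concrete, here is a minimal standalone sketch of the two gate expressions. `ColocateGateDemo` and its method names are hypothetical stand-ins, not Doris code; `isV1` / `isV2` stand for the results of the two `isColocateTable` lookups.

```java
// Hypothetical stand-in for the colocate gate in Rebalancer.canBalanceTablet.
// isV1 / isV2 model the results of the two isColocateTable(...) lookups.
final class ColocateGateDemo {
    // The gate as written in the PR: the two negations are joined with ||.
    static boolean gateAsWritten(boolean canBalanceColocateTable, boolean isV1, boolean isV2) {
        return canBalanceColocateTable || !isV1 || !isV2;
    }

    // The suggested gate: balance only when the table is in neither index.
    static boolean gateAsSuggested(boolean canBalanceColocateTable, boolean isV1, boolean isV2) {
        return canBalanceColocateTable || (!isV1 && !isV2);
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        // canBalanceColocateTable is fixed to false, as for BeLoadRebalancer.
        for (boolean isV1 : values) {
            for (boolean isV2 : values) {
                System.out.printf("isV1=%b isV2=%b written=%b suggested=%b%n",
                        isV1, isV2,
                        gateAsWritten(false, isV1, isV2),
                        gateAsSuggested(false, isV1, isV2));
            }
        }
    }
}
```

With `canBalanceColocateTable == false`, the gate as written blocks only a table that is in both indexes at once, while the suggested gate blocks a table that is in either.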
##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -2263,23 +2282,32 @@ private void computeScanRangeAssignmentByColocate(
fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
// Same as bucket shuffle.
- int bucketNum = scanNode.getBucketNum();
- scanNode.getFragment().setBucketNum(bucketNum);
+ PlanFragment fragment = scanNode.getFragment();
+ if (!fragment.getColocateData().isEmpty()) {
+ fragment.setBucketNum(fragment.getColocateData().size());
Review Comment:
**Blocking — `getColocateData()` is a `Map<Tag, List<List<Long>>>`, so
`.size()` is the number of tenant tags (normally 1), not the bucket count.**
With `fragmentBucketNum == 1`, the `colocateBucketSeq = bucketSeq %
fragmentBucketNum` computed a few lines below is always 0, so **every** bucket
of both the master and the slave collapses onto bucketSeq 0 and therefore onto
a single backend. A tenant-colocate join that takes the legacy Coordinator path
(e.g. `set enable_nereids_distribute_planner=false`, or any shape that falls
back here) then fails or degenerates to single-instance. The Nereids distribute
path (`UnassignedScanBucketOlapTableJob`) already uses the correct value:
```java
fragment.setBucketNum(fragment.getColocateData().values().iterator().next().size());
```
Worth a regression test that runs the master/slave colocate join with the
distribute planner disabled — the current test only exercises the default path,
so this is uncaught.
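
A small standalone illustration of the two `size()` calls, as a sketch only: `BucketNumDemo` is hypothetical, `String` stands in for `Tag`, and the backend ids are made up. The map has the same shape as `getColocateData()`, with one tag and four buckets.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo; String stands in for Doris's Tag and the ids are invented.
final class BucketNumDemo {
    // tag -> one backend-id list per bucket sequence (here: 1 tag, 4 buckets).
    static Map<String, List<List<Long>>> sampleColocateData() {
        Map<String, List<List<Long>>> data = new HashMap<>();
        data.put("tenant_a", Arrays.asList(
                Arrays.asList(10001L), Arrays.asList(10002L),
                Arrays.asList(10003L), Arrays.asList(10004L)));
        return data;
    }

    // What the PR passes to setBucketNum: the number of tags.
    static int tagCount(Map<String, List<List<Long>>> data) {
        return data.size();
    }

    // What the suggested fix passes: the number of buckets under the first tag.
    static int bucketCount(Map<String, List<List<Long>>> data) {
        return data.values().iterator().next().size();
    }

    public static void main(String[] args) {
        Map<String, List<List<Long>>> data = sampleColocateData();
        for (int bucketSeq = 0; bucketSeq < bucketCount(data); bucketSeq++) {
            System.out.println("bucketSeq=" + bucketSeq
                    + " modTagCount=" + (bucketSeq % tagCount(data))
                    + " modBucketCount=" + (bucketSeq % bucketCount(data)));
        }
    }
}
```

Taking `bucketSeq` modulo the tag count (1) maps every bucket to 0, whereas modulo the bucket count keeps the four buckets distinct.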
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -3369,23 +3514,43 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic
throws DdlException {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
- Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
- GroupId groupId = null;
+ TenantLevelColocateTableIndex tenantLevelColocateIndex = Env.getCurrentTenantLevelColocateIndex();
+ Map<Tag, List<List<Long>>> backendsPerBucketSeq = new HashMap<>();
+ boolean isColocateV1 = false;
+ boolean isColocateV2Master = false;
+ Set<Tag> colocateTags = new HashSet<>();
if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
throw new DdlException("Random distribution for colocate table is unsupported");
}
// if this is a colocate table, try to get backend seqs from colocation index.
- groupId = colocateIndex.getGroup(tabletMeta.getTableId());
- backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
+ GroupId groupId = colocateIndex.getGroup(tabletMeta.getTableId());
+ backendsPerBucketSeq.putAll(colocateIndex.getBackendsPerBucketSeq(groupId));
+ isColocateV1 = true;
+ colocateTags.addAll(replicaAlloc.getAllocMap().keySet());
+ } else if ((isColocateV2Master = tenantLevelColocateIndex.isColocateMasterTable(tabletMeta.getTableId()))
+ || tenantLevelColocateIndex.isColocateSlaveTable(tabletMeta.getTableId())) {
+ if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
+ throw new DdlException("Random distribution for colocate table is unsupported");
+ }
+ // if this is a colocate table, try to get backend seqs from colocation index.
+ backendsPerBucketSeq.putAll(tenantLevelColocateIndex.getBackendsPerBucketSeqByTable(tabletMeta.getTableId(),
+ distributionInfo.getBucketNum()));
+ colocateTags.addAll(tenantLevelColocateIndex.getAllSlaveTagByTable(tabletMeta.getTableId()));
+ colocateTags.forEach(tag -> Preconditions.checkState(backendsPerBucketSeq.containsKey(tag)));
Review Comment:
**Blocking — this `checkState` aborts CREATE TABLE with an opaque
`IllegalStateException` when the master group's backend sequence hasn't been
materialized yet.**
`getBackendsPerBucketSeqByTable` only puts a tag into the result when the
master group's `group2BackendsPerBucketSeq` is non-null, and the code elsewhere
documents that creating a colocate table with empty / dynamic partitions leaves
`backendsPerBucketSeq == null`. So creating a slave that references such a
master drops the tag here and the assertion fails, hard-failing the CREATE with
a confusing error. Please handle the "master not materialized yet" case
explicitly (defer the slave's bucket-seq, or throw a clear `DdlException`)
instead of asserting.
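
One possible shape for the "throw a clear `DdlException`" option, as a sketch only. The `DdlException` below is a local stand-in for `org.apache.doris.common.DdlException`, `String` stands in for `Tag`, and the class name, method name and message text are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the Doris implementation.
final class MissingTagCheckDemo {
    // Local stand-in for org.apache.doris.common.DdlException.
    static final class DdlException extends Exception {
        DdlException(String message) {
            super(message);
        }
    }

    // Collects every tag that has no backend sequence and reports them in one
    // descriptive checked exception instead of failing a bare checkState.
    static void requireBackendSeqForTags(Set<String> colocateTags,
            Map<String, List<List<Long>>> backendsPerBucketSeq) throws DdlException {
        List<String> missing = new ArrayList<>();
        for (String tag : colocateTags) {
            if (!backendsPerBucketSeq.containsKey(tag)) {
                missing.add(tag);
            }
        }
        if (!missing.isEmpty()) {
            Collections.sort(missing); // stable message regardless of set order
            throw new DdlException("backends-per-bucket sequence of the colocate master group"
                    + " is not available yet for tags " + missing);
        }
    }
}
```

A checked exception with the offending tags in the message lets the CREATE fail with something the user can act on, which the bare `IllegalStateException` does not.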
##########
fe/fe-core/src/main/java/org/apache/doris/clone/TenantLevelColocateTableCheckerAndBalancer.java:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.catalog.TenantLevelColocateGroupSchema;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex.GroupV2Id;
+import org.apache.doris.clone.TabletChecker.CheckerCounter;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletScheduler.AddResult;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Reference;
+import org.apache.doris.persist.ModifyTenantLevelColocateMapInfo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TenantLevelColocateTableCheckerAndBalancer is responsible for tablets' repair and balance of colocated tables.
+ */
+public class TenantLevelColocateTableCheckerAndBalancer extends ColocateTableCheckerAndBalancer {
+ private static final Logger LOG = LogManager.getLogger(TenantLevelColocateTableCheckerAndBalancer.class);
+
+ private TenantLevelColocateTableCheckerAndBalancer(long intervalMs) {
+ super("colocate group clone checker v2", intervalMs);
+ }
+
+ private static volatile TenantLevelColocateTableCheckerAndBalancer INSTANCE = null;
+
+ public static TenantLevelColocateTableCheckerAndBalancer getInstance() {
+ if (INSTANCE == null) {
+ synchronized (TenantLevelColocateTableCheckerAndBalancer.class) {
+ if (INSTANCE == null) {
+ INSTANCE = new TenantLevelColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ @Override
+ public void runAfterCatalogReady() {
+ relocateAndBalanceGroups();
+ matchGroups();
+ }
+
+ private void relocateAndBalanceGroups() {
+ Set<GroupV2Id> groupIds = Env.getCurrentEnv().getTenantLevelColocateTableIndex().getAllGroupIds();
+
+ // balance only inside each group, excluded balance between all groups
+ Set<GroupV2Id> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+ if (!Config.disable_colocate_balance_between_groups
+ && !changeGroups.isEmpty()) {
+ // balance both inside each group and between all groups
+ relocateAndBalanceGroup(changeGroups, true);
+ }
+ }
+
+ private Set<GroupV2Id> relocateAndBalanceGroup(Set<GroupV2Id> groupIds, boolean balanceBetweenGroups) {
+ Set<GroupV2Id> changeGroups = Sets.newHashSet();
+ if (Config.disable_colocate_balance) {
+ return changeGroups;
+ }
+
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex = env.getTenantLevelColocateTableIndex();
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+ GlobalColocateStatistic globalColocateStatistic = buildGlobalColocateStatistic();
+
+ // get all groups
+ for (GroupV2Id groupId : groupIds) {
Review Comment:
This per-group loop has no per-group try/catch. `relocateAndBalance(...)`
(the shared super-class algorithm) contains several
`Preconditions.checkState(...)` calls. If one tenant's group is in an
inconsistent state, the exception propagates out of the loop and aborts the
entire balance round, so every group after it (other tenants) is skipped that
round. If the bad state persists, those tenants are never balanced or repaired.
That undercuts the "a single tenant's colocate failure doesn't affect other
tenants" goal of this PR. Suggest wrapping the loop body in `try { ... } catch
(Throwable t) { LOG.warn(...); }`, as `matchMasterGroup` / `matchSlaveGroup`
already do per-table. (The `addGroup` call in `buildGlobalColocateStatistic`
has the same exposure.)
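
The isolation pattern, reduced to a standalone sketch. `PerGroupIsolationDemo` and `balanceAll` are hypothetical, `String` stands in for `GroupV2Id`, and the sketch catches `RuntimeException` (enough for the `IllegalStateException` that `checkState` throws) rather than `Throwable`; which one to catch in the real loop is a judgment call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of per-group failure isolation; not Doris code.
final class PerGroupIsolationDemo {
    // Runs balanceOne for every group. A failure in one group is recorded and
    // the loop moves on, so later groups are still processed in this round.
    static List<String> balanceAll(List<String> groupIds, Consumer<String> balanceOne) {
        List<String> failedGroups = new ArrayList<>();
        for (String groupId : groupIds) {
            try {
                balanceOne.accept(groupId);
            } catch (RuntimeException e) {
                // Real code would LOG.warn(...) with the group id and exception.
                failedGroups.add(groupId);
            }
        }
        return failedGroups;
    }
}
```

For example, if the callback throws for the second of three groups, the first and third are still balanced and only the second is reported as failed.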