This is an automated email from the ASF dual-hosted git repository.
sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7e524a100c5 [fix](load) fix load_to_single_tablet routing for auto partition (#64356)
7e524a100c5 is described below
commit 7e524a100c5eaa845002049f6523d5fbbc27242f
Author: hui lai <[email protected]>
AuthorDate: Fri Jun 12 16:46:12 2026 +0800
[fix](load) fix load_to_single_tablet routing for auto partition (#64356)
### What problem does this PR solve?
`load_tablet_idx` is used by random-distribution routing. For normal random
loads the index can advance across batches, but with
`load_to_single_tablet=true` it must stay fixed for the partition in the sink
so that all rows are routed to a single tablet.
For normally planned partitions this is stable, because FE builds the sink
plan once and every BE sink instance receives the same `load_tablet_idx`.
For runtime auto partitions, however, each BE can independently call
`createPartition()` or `replacePartition()` for the same transaction and
partition. Before this patch, the auto partition cache stored only tablet
locations, not `load_tablet_idx`. FE also did not know whether the caller was
using `load_to_single_tablet`. As a result, runtime auto partitions could lose
the single-tablet routing semantics, or different BE RPCs could receive
different tablet indexes for the same partition.
This patch passes `load_to_single_tablet` in the create/replace partition RPCs.
When that flag is set and the partition uses random distribution, FE stores the
selected `load_tablet_idx` in `AutoPartitionCacheManager`. Later BE RPCs for
the same transaction and partition reuse the cached tablet index together with
the cached tablet locations. Ordinary random loads are unchanged.
---
be/src/exec/sink/vrow_distribution.cpp | 2 +
.../apache/doris/service/FrontendServiceImpl.java | 62 +++++++++++++++++++---
.../transaction/AutoPartitionCacheManager.java | 32 +++++++++--
.../transaction/AutoPartitionCacheManagerTest.java | 61 +++++++++++++++++++++
gensrc/thrift/FrontendService.thrift | 4 ++
5 files changed, 151 insertions(+), 10 deletions(-)
diff --git a/be/src/exec/sink/vrow_distribution.cpp
b/be/src/exec/sink/vrow_distribution.cpp
index 4a9dd959ebb..222dc7f8bd2 100644
--- a/be/src/exec/sink/vrow_distribution.cpp
+++ b/be/src/exec/sink/vrow_distribution.cpp
@@ -111,6 +111,7 @@ Status VRowDistribution::automatic_create_partition() {
request.__set_partitionValues(_partitions_need_create);
request.__set_be_endpoint(be_endpoint);
request.__set_write_single_replica(_write_single_replica);
+
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
if (_state && _state->get_query_ctx()) {
// Pass query_id to FE so it can determine if this is a multi-instance
load by checking Coordinator
request.__set_query_id(_state->get_query_ctx()->query_id());
@@ -212,6 +213,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
std::string be_endpoint = BackendOptions::get_be_endpoint();
request.__set_be_endpoint(be_endpoint);
+
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
if (_state && _state->get_query_ctx()) {
// Pass query_id to FE so it can determine if this is a multi-instance
load by checking Coordinator
request.__set_query_id(_state->get_query_ctx()->query_id());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f01de48cf7d..fb74d7ca29a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
@@ -361,6 +362,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -4535,6 +4537,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
List<TOlapTablePartition> partitions = Lists.newArrayList();
+ boolean loadToSingleTablet = request.isSetLoadToSingleTablet() &&
request.isLoadToSingleTablet();
final boolean hasBeEndpoint = request.isSetBeEndpoint();
// Lazy: resolved on the first CloudTablet that needs it (skipped on
cache-hit).
String cachedClusterId = null;
@@ -4561,17 +4564,36 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
+ boolean randomDistribution =
+ partition.getDistributionInfo().getType() ==
DistributionInfo.DistributionInfoType.RANDOM;
+ boolean cacheLoadTabletIdx = loadToSingleTablet &&
randomDistribution;
partitions.add(tPartition);
// tablet
+ AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
if (needUseCache
&&
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets)) {
+ partitionSlaveTablets,
cachedLoadTabletIdx)) {
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
+ }
// fast path, if cached
tablets.addAll(partitionTablets);
slaveTablets.addAll(partitionSlaveTablets);
continue;
}
+ if (cacheLoadTabletIdx) {
+ try {
+ int tabletIndex =
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+ .getCurrentTabletLoadIndex(dbId,
olapTable.getId(), partition);
+ tPartition.setLoadTabletIdx(tabletIndex);
+ } catch (UserException ex) {
+
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+ result.setStatus(errorStatus);
+ LOG.warn("send create partition error status: {}", result);
+ return result;
+ }
+ }
int quorum =
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2
+ 1;
for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -4643,9 +4665,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
if (needUseCache) {
- Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+ long loadTabletIdx = cacheLoadTabletIdx ?
tPartition.getLoadTabletIdx() : -1;
+ long cachedTabletIdx =
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getOrSetAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets);
+ partitionSlaveTablets, loadTabletIdx);
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedTabletIdx);
+ }
}
tablets.addAll(partitionTablets);
@@ -4863,6 +4889,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ boolean loadToSingleTablet = request.isSetLoadToSingleTablet() &&
request.isLoadToSingleTablet();
final boolean replaceHasBeEndpoint = request.isSetBeEndpoint();
// Lazy: resolved on the first CloudTablet that needs it.
String replaceCachedClusterId = null;
@@ -4891,17 +4918,36 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
+ boolean randomDistribution =
+ partition.getDistributionInfo().getType() ==
DistributionInfo.DistributionInfoType.RANDOM;
+ boolean cacheLoadTabletIdx = loadToSingleTablet &&
randomDistribution;
partitions.add(tPartition);
// tablet
+ AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
if (needUseCache && txnId != 0
&&
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets)) {
+ partitionSlaveTablets,
cachedLoadTabletIdx)) {
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
+ }
// fast path, if cached
tablets.addAll(partitionTablets);
slaveTablets.addAll(partitionSlaveTablets);
continue;
}
+ if (cacheLoadTabletIdx) {
+ try {
+ int tabletIndex =
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+ .getCurrentTabletLoadIndex(dbId,
olapTable.getId(), partition);
+ tPartition.setLoadTabletIdx(tabletIndex);
+ } catch (UserException ex) {
+
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+ result.setStatus(errorStatus);
+ LOG.warn("send replace partition error status: {}",
result);
+ return result;
+ }
+ }
int quorum =
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2
+ 1;
for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -4978,9 +5024,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
if (needUseCache) {
- Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+ long loadTabletIdx = cacheLoadTabletIdx ?
tPartition.getLoadTabletIdx() : -1;
+ long cachedTabletIdx =
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getOrSetAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets);
+ partitionSlaveTablets, loadTabletIdx);
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedTabletIdx);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Cache auto partition info, txnId: {},
partitionId: {}, "
+ "tablets: {}, slaveTablets: {}", txnId,
partition.getId(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
index 96d6033deb2..23824ef268f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/*
** this class AutoPartitionCacheManager is used for solve the follow
question :
@@ -59,10 +60,17 @@ public class AutoPartitionCacheManager {
public static class PartitionTabletCache {
public final List<TTabletLocation> tablets;
public final List<TTabletLocation> slaveTablets;
+ public final long loadTabletIdx;
public PartitionTabletCache(List<TTabletLocation> tablets,
List<TTabletLocation> slaveTablets) {
+ this(tablets, slaveTablets, -1);
+ }
+
+ public PartitionTabletCache(List<TTabletLocation> tablets,
List<TTabletLocation> slaveTablets,
+ long loadTabletIdx) {
this.tablets = tablets;
this.slaveTablets = slaveTablets;
+ this.loadTabletIdx = loadTabletIdx;
}
}
@@ -73,6 +81,14 @@ public class AutoPartitionCacheManager {
// return true if cached, else false, this function only read cache
public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ return getAutoPartitionInfo(txnId, partitionId, partitionTablets,
partitionSlaveTablets,
+ new AtomicLong(-1));
+ }
+
+ // return true if cached, else false, this function only read cache
+ public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets,
+ AtomicLong loadTabletIdx) {
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.get(txnId);
if (partitionMap == null) {
return false;
@@ -87,11 +103,18 @@ public class AutoPartitionCacheManager {
partitionTablets.addAll(cached.tablets);
partitionSlaveTablets.clear();
partitionSlaveTablets.addAll(cached.slaveTablets);
+ loadTabletIdx.set(cached.loadTabletIdx);
return true;
}
public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ getOrSetAutoPartitionInfo(txnId, partitionId, partitionTablets,
partitionSlaveTablets, -1);
+ }
+
+ public long getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets,
+ long loadTabletIdx) {
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.computeIfAbsent(txnId, k -> new
ConcurrentHashMap<>());
@@ -100,7 +123,8 @@ public class AutoPartitionCacheManager {
needUpdate.set(true);
return new PartitionTabletCache(
new ArrayList<>(partitionTablets),
- new ArrayList<>(partitionSlaveTablets)
+ new ArrayList<>(partitionSlaveTablets),
+ loadTabletIdx
);
});
@@ -110,13 +134,13 @@ public class AutoPartitionCacheManager {
partitionTablets.addAll(cached.tablets);
partitionSlaveTablets.addAll(cached.slaveTablets);
LOG.debug("Get cached auto partition info from cache, txnId: {},
partitionId: {}, "
- + "tablets: {}, slaveTablets: {}", txnId, partitionId,
- cached.tablets.size(), cached.slaveTablets.size());
+ + "tablets: {}, slaveTablets: {}, loadTabletIdx: {}",
txnId, partitionId,
+ cached.tablets.size(), cached.slaveTablets.size(),
cached.loadTabletIdx);
}
+ return cached.loadTabletIdx;
}
public void clearAutoPartitionInfo(Long txnId) {
autoPartitionInfo.remove(txnId);
}
}
-
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
new file mode 100644
index 00000000000..6e56cd5551c
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoPartitionCacheManagerTest {
+ @Test
+ public void testGetOrSetAutoPartitionInfoReturnsCachedLoadTabletIdx() {
+ AutoPartitionCacheManager cacheManager = new
AutoPartitionCacheManager();
+ List<TTabletLocation> firstTablets = new ArrayList<>();
+ firstTablets.add(new TTabletLocation(10001L, Arrays.asList(1L)));
+ List<TTabletLocation> firstSlaveTablets = new ArrayList<>();
+
+ long storedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
+ 10L, 20L, firstTablets, firstSlaveTablets, 3);
+ Assert.assertEquals(3, storedLoadTabletIdx);
+
+ List<TTabletLocation> secondTablets = new ArrayList<>();
+ secondTablets.add(new TTabletLocation(20001L, Arrays.asList(2L)));
+ List<TTabletLocation> secondSlaveTablets = new ArrayList<>();
+
+ long cachedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
+ 10L, 20L, secondTablets, secondSlaveTablets, 5);
+ Assert.assertEquals(3, cachedLoadTabletIdx);
+ Assert.assertEquals(1, secondTablets.size());
+ Assert.assertEquals(10001L, secondTablets.get(0).getTabletId());
+
+ List<TTabletLocation> cachedTablets = new ArrayList<>();
+ List<TTabletLocation> cachedSlaveTablets = new ArrayList<>();
+ AtomicLong readLoadTabletIdx = new AtomicLong(-1);
+ Assert.assertTrue(cacheManager.getAutoPartitionInfo(
+ 10L, 20L, cachedTablets, cachedSlaveTablets,
readLoadTabletIdx));
+ Assert.assertEquals(3, readLoadTabletIdx.get());
+ Assert.assertEquals(1, cachedTablets.size());
+ Assert.assertEquals(10001L, cachedTablets.get(0).getTabletId());
+ }
+}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index cc74ffbe065..b7111df7066 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1405,6 +1405,8 @@ struct TCreatePartitionRequest {
6: optional bool write_single_replica = false
// query_id to identify the coordinator, if coordinator exists, it means
this is a multi-instance load
7: optional Types.TUniqueId query_id
+ // Whether the caller's table sink is using load_to_single_tablet mode.
+ 8: optional bool load_to_single_tablet = false
}
struct TCreatePartitionResult {
@@ -1425,6 +1427,8 @@ struct TReplacePartitionRequest {
5: optional string be_endpoint
6: optional bool write_single_replica = false
7: optional Types.TUniqueId query_id
+ // Whether the caller's table sink is using load_to_single_tablet mode.
+ 8: optional bool load_to_single_tablet = false
}
struct TReplacePartitionResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]