This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new f62e9226bcb branch-4.1: [fix](load) fix load_to_single_tablet routing for auto partition #64356 (#64455)
f62e9226bcb is described below
commit f62e9226bcb2012fe75a31ea776ef90900d3d9c0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 15 12:04:49 2026 +0800
branch-4.1: [fix](load) fix load_to_single_tablet routing for auto partition #64356 (#64455)
Cherry-picked from #64356
Co-authored-by: hui lai <[email protected]>
---
be/src/exec/sink/vrow_distribution.cpp | 2 +
.../apache/doris/service/FrontendServiceImpl.java | 62 +++++++++++++++++++---
.../transaction/AutoPartitionCacheManager.java | 32 +++++++++--
.../transaction/AutoPartitionCacheManagerTest.java | 61 +++++++++++++++++++++
gensrc/thrift/FrontendService.thrift | 4 ++
5 files changed, 151 insertions(+), 10 deletions(-)
diff --git a/be/src/exec/sink/vrow_distribution.cpp b/be/src/exec/sink/vrow_distribution.cpp
index ff866e8c0be..02c59987158 100644
--- a/be/src/exec/sink/vrow_distribution.cpp
+++ b/be/src/exec/sink/vrow_distribution.cpp
@@ -111,6 +111,7 @@ Status VRowDistribution::automatic_create_partition() {
request.__set_partitionValues(_partitions_need_create);
request.__set_be_endpoint(be_endpoint);
request.__set_write_single_replica(_write_single_replica);
+ request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
if (_state && _state->get_query_ctx()) {
// Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
request.__set_query_id(_state->get_query_ctx()->query_id());
@@ -207,6 +208,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
std::string be_endpoint = BackendOptions::get_be_endpoint();
request.__set_be_endpoint(be_endpoint);
+ request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
if (_state && _state->get_query_ctx()) {
// Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
request.__set_query_id(_state->get_query_ctx()->query_id());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f749cc2b0f8..7441113ad56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
@@ -340,6 +341,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -3889,6 +3891,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
List<TOlapTablePartition> partitions = Lists.newArrayList();
+ boolean loadToSingleTablet = request.isSetLoadToSingleTablet() &&
request.isLoadToSingleTablet();
final boolean hasBeEndpoint = request.isSetBeEndpoint();
// Lazy: resolved on the first CloudTablet that needs it (skipped on cache-hit).
String cachedClusterId = null;
@@ -3915,17 +3918,36 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
+ boolean randomDistribution =
+ partition.getDistributionInfo().getType() ==
DistributionInfo.DistributionInfoType.RANDOM;
+ boolean cacheLoadTabletIdx = loadToSingleTablet &&
randomDistribution;
partitions.add(tPartition);
// tablet
+ AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
if (needUseCache
&&
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets)) {
+ partitionSlaveTablets,
cachedLoadTabletIdx)) {
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
+ }
// fast path, if cached
tablets.addAll(partitionTablets);
slaveTablets.addAll(partitionSlaveTablets);
continue;
}
+ if (cacheLoadTabletIdx) {
+ try {
+ int tabletIndex =
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+ .getCurrentTabletLoadIndex(dbId,
olapTable.getId(), partition);
+ tPartition.setLoadTabletIdx(tabletIndex);
+ } catch (UserException ex) {
+ errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+ result.setStatus(errorStatus);
+ LOG.warn("send create partition error status: {}", result);
+ return result;
+ }
+ }
int quorum =
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2
+ 1;
for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -3997,9 +4019,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
if (needUseCache) {
- Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+ long loadTabletIdx = cacheLoadTabletIdx ?
tPartition.getLoadTabletIdx() : -1;
+ long cachedTabletIdx =
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getOrSetAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets);
+ partitionSlaveTablets, loadTabletIdx);
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedTabletIdx);
+ }
}
tablets.addAll(partitionTablets);
@@ -4217,6 +4243,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ boolean loadToSingleTablet = request.isSetLoadToSingleTablet() &&
request.isLoadToSingleTablet();
final boolean replaceHasBeEndpoint = request.isSetBeEndpoint();
// Lazy: resolved on the first CloudTablet that needs it.
String replaceCachedClusterId = null;
@@ -4245,17 +4272,36 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
+ boolean randomDistribution =
+ partition.getDistributionInfo().getType() ==
DistributionInfo.DistributionInfoType.RANDOM;
+ boolean cacheLoadTabletIdx = loadToSingleTablet &&
randomDistribution;
partitions.add(tPartition);
// tablet
+ AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
if (needUseCache && txnId != 0
&&
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets)) {
+ partitionSlaveTablets,
cachedLoadTabletIdx)) {
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
+ }
// fast path, if cached
tablets.addAll(partitionTablets);
slaveTablets.addAll(partitionSlaveTablets);
continue;
}
+ if (cacheLoadTabletIdx) {
+ try {
+ int tabletIndex =
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+ .getCurrentTabletLoadIndex(dbId,
olapTable.getId(), partition);
+ tPartition.setLoadTabletIdx(tabletIndex);
+ } catch (UserException ex) {
+ errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+ result.setStatus(errorStatus);
+ LOG.warn("send replace partition error status: {}", result);
+ return result;
+ }
+ }
int quorum =
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2
+ 1;
for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -4332,9 +4378,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
if (needUseCache) {
- Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+ long loadTabletIdx = cacheLoadTabletIdx ?
tPartition.getLoadTabletIdx() : -1;
+ long cachedTabletIdx =
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getOrSetAutoPartitionInfo(txnId, partition.getId(),
partitionTablets,
- partitionSlaveTablets);
+ partitionSlaveTablets, loadTabletIdx);
+ if (cacheLoadTabletIdx) {
+ tPartition.setLoadTabletIdx(cachedTabletIdx);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Cache auto partition info, txnId: {}, partitionId: {}, "
+ "tablets: {}, slaveTablets: {}", txnId, partition.getId(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
index 96d6033deb2..23824ef268f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/*
** this class AutoPartitionCacheManager is used for solve the follow
question :
@@ -59,10 +60,17 @@ public class AutoPartitionCacheManager {
public static class PartitionTabletCache {
public final List<TTabletLocation> tablets;
public final List<TTabletLocation> slaveTablets;
+ public final long loadTabletIdx;
public PartitionTabletCache(List<TTabletLocation> tablets,
List<TTabletLocation> slaveTablets) {
+ this(tablets, slaveTablets, -1);
+ }
+
+ public PartitionTabletCache(List<TTabletLocation> tablets,
List<TTabletLocation> slaveTablets,
+ long loadTabletIdx) {
this.tablets = tablets;
this.slaveTablets = slaveTablets;
+ this.loadTabletIdx = loadTabletIdx;
}
}
@@ -73,6 +81,14 @@ public class AutoPartitionCacheManager {
// return true if cached, else false, this function only read cache
public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ return getAutoPartitionInfo(txnId, partitionId, partitionTablets,
partitionSlaveTablets,
+ new AtomicLong(-1));
+ }
+
+ // return true if cached, else false, this function only read cache
+ public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets,
+ AtomicLong loadTabletIdx) {
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.get(txnId);
if (partitionMap == null) {
return false;
@@ -87,11 +103,18 @@ public class AutoPartitionCacheManager {
partitionTablets.addAll(cached.tablets);
partitionSlaveTablets.clear();
partitionSlaveTablets.addAll(cached.slaveTablets);
+ loadTabletIdx.set(cached.loadTabletIdx);
return true;
}
public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ getOrSetAutoPartitionInfo(txnId, partitionId, partitionTablets,
partitionSlaveTablets, -1);
+ }
+
+ public long getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets,
+ long loadTabletIdx) {
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.computeIfAbsent(txnId, k -> new
ConcurrentHashMap<>());
@@ -100,7 +123,8 @@ public class AutoPartitionCacheManager {
needUpdate.set(true);
return new PartitionTabletCache(
new ArrayList<>(partitionTablets),
- new ArrayList<>(partitionSlaveTablets)
+ new ArrayList<>(partitionSlaveTablets),
+ loadTabletIdx
);
});
@@ -110,13 +134,13 @@ public class AutoPartitionCacheManager {
partitionTablets.addAll(cached.tablets);
partitionSlaveTablets.addAll(cached.slaveTablets);
LOG.debug("Get cached auto partition info from cache, txnId: {}, partitionId: {}, "
- + "tablets: {}, slaveTablets: {}", txnId, partitionId,
- cached.tablets.size(), cached.slaveTablets.size());
+ + "tablets: {}, slaveTablets: {}, loadTabletIdx: {}", txnId, partitionId,
+ cached.tablets.size(), cached.slaveTablets.size(), cached.loadTabletIdx);
}
+ return cached.loadTabletIdx;
}
public void clearAutoPartitionInfo(Long txnId) {
autoPartitionInfo.remove(txnId);
}
}
-
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
new file mode 100644
index 00000000000..6e56cd5551c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoPartitionCacheManagerTest {
+ @Test
+ public void testGetOrSetAutoPartitionInfoReturnsCachedLoadTabletIdx() {
+ AutoPartitionCacheManager cacheManager = new
AutoPartitionCacheManager();
+ List<TTabletLocation> firstTablets = new ArrayList<>();
+ firstTablets.add(new TTabletLocation(10001L, Arrays.asList(1L)));
+ List<TTabletLocation> firstSlaveTablets = new ArrayList<>();
+
+ long storedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
+ 10L, 20L, firstTablets, firstSlaveTablets, 3);
+ Assert.assertEquals(3, storedLoadTabletIdx);
+
+ List<TTabletLocation> secondTablets = new ArrayList<>();
+ secondTablets.add(new TTabletLocation(20001L, Arrays.asList(2L)));
+ List<TTabletLocation> secondSlaveTablets = new ArrayList<>();
+
+ long cachedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
+ 10L, 20L, secondTablets, secondSlaveTablets, 5);
+ Assert.assertEquals(3, cachedLoadTabletIdx);
+ Assert.assertEquals(1, secondTablets.size());
+ Assert.assertEquals(10001L, secondTablets.get(0).getTabletId());
+
+ List<TTabletLocation> cachedTablets = new ArrayList<>();
+ List<TTabletLocation> cachedSlaveTablets = new ArrayList<>();
+ AtomicLong readLoadTabletIdx = new AtomicLong(-1);
+ Assert.assertTrue(cacheManager.getAutoPartitionInfo(
+ 10L, 20L, cachedTablets, cachedSlaveTablets,
readLoadTabletIdx));
+ Assert.assertEquals(3, readLoadTabletIdx.get());
+ Assert.assertEquals(1, cachedTablets.size());
+ Assert.assertEquals(10001L, cachedTablets.get(0).getTabletId());
+ }
+}
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index d1f7639ba8a..d37fc59a341 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1439,6 +1439,8 @@ struct TCreatePartitionRequest {
6: optional bool write_single_replica = false
// query_id to identify the coordinator, if coordinator exists, it means this is a multi-instance load
7: optional Types.TUniqueId query_id
+ // Whether the caller's table sink is using load_to_single_tablet mode.
+ 8: optional bool load_to_single_tablet = false
}
struct TCreatePartitionResult {
@@ -1459,6 +1461,8 @@ struct TReplacePartitionRequest {
5: optional string be_endpoint
6: optional bool write_single_replica = false
7: optional Types.TUniqueId query_id
+ // Whether the caller's table sink is using load_to_single_tablet mode.
+ 8: optional bool load_to_single_tablet = false
}
struct TReplacePartitionResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]