This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new f62e9226bcb branch-4.1: [fix](load) fix load_to_single_tablet routing for auto partition #64356 (#64455)
f62e9226bcb is described below

commit f62e9226bcb2012fe75a31ea776ef90900d3d9c0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 15 12:04:49 2026 +0800

    branch-4.1: [fix](load) fix load_to_single_tablet routing for auto partition #64356 (#64455)
    
    Cherry-picked from #64356
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/exec/sink/vrow_distribution.cpp             |  2 +
 .../apache/doris/service/FrontendServiceImpl.java  | 62 +++++++++++++++++++---
 .../transaction/AutoPartitionCacheManager.java     | 32 +++++++++--
 .../transaction/AutoPartitionCacheManagerTest.java | 61 +++++++++++++++++++++
 gensrc/thrift/FrontendService.thrift               |  4 ++
 5 files changed, 151 insertions(+), 10 deletions(-)

diff --git a/be/src/exec/sink/vrow_distribution.cpp 
b/be/src/exec/sink/vrow_distribution.cpp
index ff866e8c0be..02c59987158 100644
--- a/be/src/exec/sink/vrow_distribution.cpp
+++ b/be/src/exec/sink/vrow_distribution.cpp
@@ -111,6 +111,7 @@ Status VRowDistribution::automatic_create_partition() {
     request.__set_partitionValues(_partitions_need_create);
     request.__set_be_endpoint(be_endpoint);
     request.__set_write_single_replica(_write_single_replica);
+    
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
     if (_state && _state->get_query_ctx()) {
         // Pass query_id to FE so it can determine if this is a multi-instance 
load by checking Coordinator
         request.__set_query_id(_state->get_query_ctx()->query_id());
@@ -207,6 +208,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
 
     std::string be_endpoint = BackendOptions::get_be_endpoint();
     request.__set_be_endpoint(be_endpoint);
+    
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
     if (_state && _state->get_query_ctx()) {
         // Pass query_id to FE so it can determine if this is a multi-instance 
load by checking Coordinator
         request.__set_query_id(_state->get_query_ctx()->query_id());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f749cc2b0f8..7441113ad56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
@@ -340,6 +341,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
@@ -3889,6 +3891,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         List<TTabletLocation> tablets = new ArrayList<>();
         List<TTabletLocation> slaveTablets = new ArrayList<>();
         List<TOlapTablePartition> partitions = Lists.newArrayList();
+        boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && 
request.isLoadToSingleTablet();
         final boolean hasBeEndpoint = request.isSetBeEndpoint();
         // Lazy: resolved on the first CloudTablet that needs it (skipped on 
cache-hit).
         String cachedClusterId = null;
@@ -3915,17 +3918,36 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                 tPartition.setNumBuckets(index.getTablets().size());
             }
             
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
+            boolean randomDistribution =
+                    partition.getDistributionInfo().getType() == 
DistributionInfo.DistributionInfoType.RANDOM;
+            boolean cacheLoadTabletIdx = loadToSingleTablet && 
randomDistribution;
             partitions.add(tPartition);
             // tablet
+            AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
             if (needUseCache
                     && 
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
                             .getAutoPartitionInfo(txnId, partition.getId(), 
partitionTablets,
-                                    partitionSlaveTablets)) {
+                                    partitionSlaveTablets, 
cachedLoadTabletIdx)) {
+                if (cacheLoadTabletIdx) {
+                    tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
+                }
                 // fast path, if cached
                 tablets.addAll(partitionTablets);
                 slaveTablets.addAll(partitionSlaveTablets);
                 continue;
             }
+            if (cacheLoadTabletIdx) {
+                try {
+                    int tabletIndex = 
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+                            .getCurrentTabletLoadIndex(dbId, 
olapTable.getId(), partition);
+                    tPartition.setLoadTabletIdx(tabletIndex);
+                } catch (UserException ex) {
+                    
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+                    result.setStatus(errorStatus);
+                    LOG.warn("send create partition error status: {}", result);
+                    return result;
+                }
+            }
             int quorum = 
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2
                     + 1;
             for (MaterializedIndex index : 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -3997,9 +4019,13 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             }
 
             if (needUseCache) {
-                Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+                long loadTabletIdx = cacheLoadTabletIdx ? 
tPartition.getLoadTabletIdx() : -1;
+                long cachedTabletIdx = 
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
                         .getOrSetAutoPartitionInfo(txnId, partition.getId(), 
partitionTablets,
-                                partitionSlaveTablets);
+                                partitionSlaveTablets, loadTabletIdx);
+                if (cacheLoadTabletIdx) {
+                    tPartition.setLoadTabletIdx(cachedTabletIdx);
+                }
             }
 
             tablets.addAll(partitionTablets);
@@ -4217,6 +4243,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         List<TTabletLocation> tablets = new ArrayList<>();
         List<TTabletLocation> slaveTablets = new ArrayList<>();
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+        boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && 
request.isLoadToSingleTablet();
         final boolean replaceHasBeEndpoint = request.isSetBeEndpoint();
         // Lazy: resolved on the first CloudTablet that needs it.
         String replaceCachedClusterId = null;
@@ -4245,17 +4272,36 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                 tPartition.setNumBuckets(index.getTablets().size());
             }
             
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
+            boolean randomDistribution =
+                    partition.getDistributionInfo().getType() == 
DistributionInfo.DistributionInfoType.RANDOM;
+            boolean cacheLoadTabletIdx = loadToSingleTablet && 
randomDistribution;
             partitions.add(tPartition);
             // tablet
+            AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
             if (needUseCache && txnId != 0
                     && 
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
                             .getAutoPartitionInfo(txnId, partition.getId(), 
partitionTablets,
-                                    partitionSlaveTablets)) {
+                                    partitionSlaveTablets, 
cachedLoadTabletIdx)) {
+                if (cacheLoadTabletIdx) {
+                    tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
+                }
                 // fast path, if cached
                 tablets.addAll(partitionTablets);
                 slaveTablets.addAll(partitionSlaveTablets);
                 continue;
             }
+            if (cacheLoadTabletIdx) {
+                try {
+                    int tabletIndex = 
Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
+                            .getCurrentTabletLoadIndex(dbId, 
olapTable.getId(), partition);
+                    tPartition.setLoadTabletIdx(tabletIndex);
+                } catch (UserException ex) {
+                    
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+                    result.setStatus(errorStatus);
+                    LOG.warn("send replace partition error status: {}", 
result);
+                    return result;
+                }
+            }
             int quorum = 
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2
                     + 1;
             for (MaterializedIndex index : 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -4332,9 +4378,13 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             }
 
             if (needUseCache) {
-                Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
+                long loadTabletIdx = cacheLoadTabletIdx ? 
tPartition.getLoadTabletIdx() : -1;
+                long cachedTabletIdx = 
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
                         .getOrSetAutoPartitionInfo(txnId, partition.getId(), 
partitionTablets,
-                                partitionSlaveTablets);
+                                partitionSlaveTablets, loadTabletIdx);
+                if (cacheLoadTabletIdx) {
+                    tPartition.setLoadTabletIdx(cachedTabletIdx);
+                }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Cache auto partition info, txnId: {}, 
partitionId: {}, "
                             + "tablets: {}, slaveTablets: {}", txnId, 
partition.getId(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
index 96d6033deb2..23824ef268f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /*
     ** this class AutoPartitionCacheManager is used for solve the follow 
question :
@@ -59,10 +60,17 @@ public class AutoPartitionCacheManager {
     public static class PartitionTabletCache {
         public final List<TTabletLocation> tablets;
         public final List<TTabletLocation> slaveTablets;
+        public final long loadTabletIdx;
 
         public PartitionTabletCache(List<TTabletLocation> tablets, 
List<TTabletLocation> slaveTablets) {
+            this(tablets, slaveTablets, -1);
+        }
+
+        public PartitionTabletCache(List<TTabletLocation> tablets, 
List<TTabletLocation> slaveTablets,
+                long loadTabletIdx) {
             this.tablets = tablets;
             this.slaveTablets = slaveTablets;
+            this.loadTabletIdx = loadTabletIdx;
         }
     }
 
@@ -73,6 +81,14 @@ public class AutoPartitionCacheManager {
     // return true if cached, else false, this function only read cache
     public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
             List<TTabletLocation> partitionTablets, List<TTabletLocation> 
partitionSlaveTablets) {
+        return getAutoPartitionInfo(txnId, partitionId, partitionTablets, 
partitionSlaveTablets,
+                new AtomicLong(-1));
+    }
+
+    // return true if cached, else false, this function only read cache
+    public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+            List<TTabletLocation> partitionTablets, List<TTabletLocation> 
partitionSlaveTablets,
+            AtomicLong loadTabletIdx) {
         ConcurrentHashMap<Long, PartitionTabletCache> partitionMap = 
autoPartitionInfo.get(txnId);
         if (partitionMap == null) {
             return false;
@@ -87,11 +103,18 @@ public class AutoPartitionCacheManager {
         partitionTablets.addAll(cached.tablets);
         partitionSlaveTablets.clear();
         partitionSlaveTablets.addAll(cached.slaveTablets);
+        loadTabletIdx.set(cached.loadTabletIdx);
         return true;
     }
 
     public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
             List<TTabletLocation> partitionTablets, List<TTabletLocation> 
partitionSlaveTablets) {
+        getOrSetAutoPartitionInfo(txnId, partitionId, partitionTablets, 
partitionSlaveTablets, -1);
+    }
+
+    public long getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
+            List<TTabletLocation> partitionTablets, List<TTabletLocation> 
partitionSlaveTablets,
+            long loadTabletIdx) {
         ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
                 autoPartitionInfo.computeIfAbsent(txnId, k -> new 
ConcurrentHashMap<>());
 
@@ -100,7 +123,8 @@ public class AutoPartitionCacheManager {
             needUpdate.set(true);
             return new PartitionTabletCache(
                     new ArrayList<>(partitionTablets),
-                    new ArrayList<>(partitionSlaveTablets)
+                    new ArrayList<>(partitionSlaveTablets),
+                    loadTabletIdx
             );
         });
 
@@ -110,13 +134,13 @@ public class AutoPartitionCacheManager {
             partitionTablets.addAll(cached.tablets);
             partitionSlaveTablets.addAll(cached.slaveTablets);
             LOG.debug("Get cached auto partition info from cache, txnId: {}, 
partitionId: {}, "
-                    + "tablets: {}, slaveTablets: {}", txnId, partitionId,
-                    cached.tablets.size(), cached.slaveTablets.size());
+                    + "tablets: {}, slaveTablets: {}, loadTabletIdx: {}", 
txnId, partitionId,
+                    cached.tablets.size(), cached.slaveTablets.size(), 
cached.loadTabletIdx);
         }
+        return cached.loadTabletIdx;
     }
 
     public void clearAutoPartitionInfo(Long txnId) {
         autoPartitionInfo.remove(txnId);
     }
 }
-
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
new file mode 100644
index 00000000000..6e56cd5551c
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoPartitionCacheManagerTest {
+    @Test
+    public void testGetOrSetAutoPartitionInfoReturnsCachedLoadTabletIdx() {
+        AutoPartitionCacheManager cacheManager = new 
AutoPartitionCacheManager();
+        List<TTabletLocation> firstTablets = new ArrayList<>();
+        firstTablets.add(new TTabletLocation(10001L, Arrays.asList(1L)));
+        List<TTabletLocation> firstSlaveTablets = new ArrayList<>();
+
+        long storedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
+                10L, 20L, firstTablets, firstSlaveTablets, 3);
+        Assert.assertEquals(3, storedLoadTabletIdx);
+
+        List<TTabletLocation> secondTablets = new ArrayList<>();
+        secondTablets.add(new TTabletLocation(20001L, Arrays.asList(2L)));
+        List<TTabletLocation> secondSlaveTablets = new ArrayList<>();
+
+        long cachedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
+                10L, 20L, secondTablets, secondSlaveTablets, 5);
+        Assert.assertEquals(3, cachedLoadTabletIdx);
+        Assert.assertEquals(1, secondTablets.size());
+        Assert.assertEquals(10001L, secondTablets.get(0).getTabletId());
+
+        List<TTabletLocation> cachedTablets = new ArrayList<>();
+        List<TTabletLocation> cachedSlaveTablets = new ArrayList<>();
+        AtomicLong readLoadTabletIdx = new AtomicLong(-1);
+        Assert.assertTrue(cacheManager.getAutoPartitionInfo(
+                10L, 20L, cachedTablets, cachedSlaveTablets, 
readLoadTabletIdx));
+        Assert.assertEquals(3, readLoadTabletIdx.get());
+        Assert.assertEquals(1, cachedTablets.size());
+        Assert.assertEquals(10001L, cachedTablets.get(0).getTabletId());
+    }
+}
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index d1f7639ba8a..d37fc59a341 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1439,6 +1439,8 @@ struct TCreatePartitionRequest {
     6: optional bool write_single_replica = false
     // query_id to identify the coordinator, if coordinator exists, it means 
this is a multi-instance load
     7: optional Types.TUniqueId query_id
+    // Whether the caller's table sink is using load_to_single_tablet mode.
+    8: optional bool load_to_single_tablet = false
 }
 
 struct TCreatePartitionResult {
@@ -1459,6 +1461,8 @@ struct TReplacePartitionRequest {
     5: optional string be_endpoint
     6: optional bool write_single_replica = false
     7: optional Types.TUniqueId query_id
+    // Whether the caller's table sink is using load_to_single_tablet mode.
+    8: optional bool load_to_single_tablet = false
 }
 
 struct TReplacePartitionResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to