0AyanamiRei commented on code in PR #57198:
URL: https://github.com/apache/doris/pull/57198#discussion_r2558362463
##########
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java:
##########
@@ -87,13 +87,19 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
private Env env;
+ private final AutoPartitionCacheManager autoPartitionCacheManager = new
AutoPartitionCacheManager();
Review Comment:
Yes, done — I forgot to change it. :)
##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -1360,6 +1360,10 @@ struct TCreatePartitionRequest {
// be_endpoint = <ip>:<heartbeat_port> to distinguish a particular BE
5: optional string be_endpoint
6: optional bool write_single_replica = false
+ // indicate whether this request is from stream load
+ 7: optional bool is_stream_load = false
Review Comment:
done
##########
fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/*
+ ** this class AutoPartitionCacheManager is used for solve the follow
question :
+ **
+ * RPC [P1, P2] RPC [P2, P3]
+ * | |
+ * P1:t1, t2 |
+ * ↓ |
+ * P2:t3, t4 |
+ * ↓
+ * P2:exist
+ * ↓
+ * P3:t5,t6
+ * --------------------------------------
+ * tablet rebalance during ...
+ * t1 - be1 t3 - be1 <-
+ * t2 - be2 t4 - be1
+ * t3 - be2 <- t5 - be2
+ * t4 - be1 t6 - be2
+ * --------------------------------------
+ * We ensure that only one view of the replica distribution in P2:t3,t4
above takes effect for this txn
+ * to avoid tablets being written to multiple instances within the same
transaction (assuming single replica)
+*/
+
+// AutoPartitionCacheManager is used to manage the cache of auto partition
info.
+// To distinguish the idempotence of the createPartition RPC during
incremental partition creation
+// for automatic partitioned tables, cache tablet locations per partition.
+public class AutoPartitionCacheManager {
+
+ public static class PartitionTabletCache {
+ public final List<TTabletLocation> tablets;
+ public final List<TTabletLocation> slaveTablets;
+
+ public PartitionTabletCache(List<TTabletLocation> tablets,
List<TTabletLocation> slaveTablets) {
+ this.tablets = tablets;
+ this.slaveTablets = slaveTablets;
+ }
+ }
+
+ // txnId -> partitionId -> PartitionTabletCache
+ private final ConcurrentHashMap<Long, ConcurrentHashMap<Long,
PartitionTabletCache>> autoPartitionInfo
+ = new ConcurrentHashMap<>();
+
+ // only read
+ public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.get(txnId);
+ if (partitionMap == null) {
+ return false;
+ }
+
+ PartitionTabletCache cached = partitionMap.get(partitionId);
+ if (cached == null) {
+ return false;
+ }
+
+ partitionTablets.clear();
+ partitionTablets.addAll(cached.tablets);
+ partitionSlaveTablets.clear();
+ partitionSlaveTablets.addAll(cached.slaveTablets);
+ return true;
+ }
+
+ // if cached : we use cached info
+ // else : store it.
+ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
Review Comment:
I mean to log this information at the partition level.
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -3756,28 +3781,70 @@ public TCreatePartitionResult
createPartition(TCreatePartitionRequest request) t
} catch (UserException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
- LOG.warn("send create partition error status: {}",
result);
return result;
}
+
if (bePathsMap.keySet().size() < quorum) {
LOG.warn("auto go quorum exception");
}
- if (request.isSetWriteSingleReplica() &&
request.isWriteSingleReplica()) {
- Long[] nodes = bePathsMap.keySet().toArray(new
Long[0]);
- Random random = new SecureRandom();
- Long masterNode = nodes[random.nextInt(nodes.length)];
- Multimap<Long, Long> slaveBePathsMap = bePathsMap;
- slaveBePathsMap.removeAll(masterNode);
- tablets.add(new TTabletLocation(tablet.getId(),
-
Lists.newArrayList(Sets.newHashSet(masterNode))));
- slaveTablets.add(new TTabletLocation(tablet.getId(),
- Lists.newArrayList(slaveBePathsMap.keySet())));
- } else {
- tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(bePathsMap.keySet())));
+
+ for (Long beId : bePathsMap.keySet()) {
+ Long selectedBeId = beId;
+
+ if (Config.isCloudMode()) {
+ DebugPoint debugPoint =
DebugPointUtil.getDebugPoint(
+
"FE.FrontendServiceImpl.createPartition.MockRebalance");
+ // 手动递增 executeNum 并获取递增后的值
Review Comment:
I will remove this Chinese comment once this round of testing is done.
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -3663,7 +3665,25 @@ public TCreatePartitionResult
createPartition(TCreatePartitionRequest request) t
LOG.warn("send create partition error status: {}", result);
return result;
}
-
+ // Cache tablet location only when needed:
+ // 1. From a requirement perspective: Only multi-instance ingestion
may trigger inconsistent replica
+ // distribution issues due to concurrent createPartition RPCs.
+ // 2. From a necessity perspective: For BE-initiated loads (e.g.,
stream load commit/abort from BE),
Review Comment:
I think that does not matter, because it will have only one instance.
##########
fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/*
+ ** this class AutoPartitionCacheManager is used for solve the follow
question :
+ **
+ * RPC [P1, P2] RPC [P2, P3]
+ * | |
+ * P1:t1, t2 |
+ * ↓ |
+ * P2:t3, t4 |
+ * ↓
+ * P2:exist
+ * ↓
+ * P3:t5,t6
+ * --------------------------------------
+ * tablet rebalance during ...
+ * t1 - be1 t3 - be1 <-
+ * t2 - be2 t4 - be1
+ * t3 - be2 <- t5 - be2
+ * t4 - be1 t6 - be2
+ * --------------------------------------
+ * We ensure that only one view of the replica distribution in P2:t3,t4
above takes effect for this txn
+ * to avoid tablets being written to multiple instances within the same
transaction (assuming single replica)
+*/
+
+// AutoPartitionCacheManager is used to manage the cache of auto partition
info.
+// To distinguish the idempotence of the createPartition RPC during
incremental partition creation
+// for automatic partitioned tables, cache tablet locations per partition.
+public class AutoPartitionCacheManager {
+
+ public static class PartitionTabletCache {
+ public final List<TTabletLocation> tablets;
+ public final List<TTabletLocation> slaveTablets;
+
+ public PartitionTabletCache(List<TTabletLocation> tablets,
List<TTabletLocation> slaveTablets) {
+ this.tablets = tablets;
+ this.slaveTablets = slaveTablets;
+ }
+ }
+
+ // txnId -> partitionId -> PartitionTabletCache
+ private final ConcurrentHashMap<Long, ConcurrentHashMap<Long,
PartitionTabletCache>> autoPartitionInfo
+ = new ConcurrentHashMap<>();
+
+ // only read
+ public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.get(txnId);
+ if (partitionMap == null) {
+ return false;
+ }
+
+ PartitionTabletCache cached = partitionMap.get(partitionId);
+ if (cached == null) {
+ return false;
+ }
+
+ partitionTablets.clear();
+ partitionTablets.addAll(cached.tablets);
+ partitionSlaveTablets.clear();
+ partitionSlaveTablets.addAll(cached.slaveTablets);
+ return true;
+ }
+
+ // if cached : we use cached info
+ // else : store it.
+ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
+ autoPartitionInfo.computeIfAbsent(txnId, k -> new
ConcurrentHashMap<>());
+
+ final AtomicBoolean needUpdate = new AtomicBoolean(false);
Review Comment:
I need to use it inside the `computeIfAbsent(...)` lambda, and Java requires local variables captured by a lambda to be final (or effectively final).
There are two ways to do this:
1. final boolean[] needUpdate = {false};
2. final AtomicBoolean needUpdate
I chose `AtomicBoolean` over `boolean[]` because of its atomicity and memory-visibility
guarantees.
##########
fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TTabletLocation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/*
+ ** this class AutoPartitionCacheManager is used for solve the follow
question :
+ **
+ * RPC [P1, P2] RPC [P2, P3]
+ * | |
+ * P1:t1, t2 |
+ * ↓ |
+ * P2:t3, t4 |
+ * ↓
+ * P2:exist
+ * ↓
+ * P3:t5,t6
+ * --------------------------------------
+ * tablet rebalance during ...
+ * t1 - be1 t3 - be1 <-
+ * t2 - be2 t4 - be1
+ * t3 - be2 <- t5 - be2
+ * t4 - be1 t6 - be2
+ * --------------------------------------
+ * We ensure that only one view of the replica distribution in P2:t3,t4
above takes effect for this txn
+ * to avoid tablets being written to multiple instances within the same
transaction (assuming single replica)
+*/
+
+// AutoPartitionCacheManager is used to manage the cache of auto partition
info.
+// To distinguish the idempotence of the createPartition RPC during
incremental partition creation
+// for automatic partitioned tables, cache tablet locations per partition.
+public class AutoPartitionCacheManager {
+
+ public static class PartitionTabletCache {
+ public final List<TTabletLocation> tablets;
+ public final List<TTabletLocation> slaveTablets;
+
+ public PartitionTabletCache(List<TTabletLocation> tablets,
List<TTabletLocation> slaveTablets) {
+ this.tablets = tablets;
+ this.slaveTablets = slaveTablets;
+ }
+ }
+
+ // txnId -> partitionId -> PartitionTabletCache
+ private final ConcurrentHashMap<Long, ConcurrentHashMap<Long,
PartitionTabletCache>> autoPartitionInfo
+ = new ConcurrentHashMap<>();
+
+ // only read
+ public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
+ List<TTabletLocation> partitionTablets, List<TTabletLocation>
partitionSlaveTablets) {
+ ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.get(txnId);
+ if (partitionMap == null) {
+ return false;
+ }
+
+ PartitionTabletCache cached = partitionMap.get(partitionId);
+ if (cached == null) {
+ return false;
+ }
+
+ partitionTablets.clear();
+ partitionTablets.addAll(cached.tablets);
+ partitionSlaveTablets.clear();
+ partitionSlaveTablets.addAll(cached.slaveTablets);
+ return true;
+ }
+
+ // if cached : we use cached info
+ // else : store it.
+ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
Review Comment:
Maybe the comment "cache tablet locations per partition" is enough?
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -3754,6 +3787,28 @@ public TCreatePartitionResult
createPartition(TCreatePartitionRequest request) t
if (Config.isCloudMode() && request.isSetBeEndpoint())
{
bePathsMap = ((CloudTablet) tablet)
.getNormalReplicaBackendPathMapCloud(request.be_endpoint);
+ // only for test
+ if
(DebugPointUtil.isEnable("FE.FrontendServiceImpl.createPartition.MockRebalance"))
{
Review Comment:
Because a single replica is easy to test and sufficient for this case. Should I add a test in non-cloud mode
with a single-replica config? :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]