This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 85f7cf53a35 Load: Detect 2PC Second Phase and RM Running in Parallel (#15041) (#15020)
85f7cf53a35 is described below

commit 85f7cf53a35ab44760f0272ddee126eed25a0167
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Mar 10 10:34:06 2025 +0800

    Load: Detect 2PC Second Phase and RM Running in Parallel (#15041) (#15020)
    
    (cherry picked from commit 727ad63f2143876c63c0659c186e3c61fbcd5522)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    Co-authored-by: 马子坤 <[email protected]>
---
 .../impl/DataNodeInternalRPCServiceImpl.java       | 58 +++++++++++++++++++---
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 22 +++++---
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  1 +
 .../src/main/thrift/datanode.thrift                |  1 +
 4 files changed, 66 insertions(+), 16 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 34dde7d0b30..20ef565ca54 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.protocol.thrift.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TLoadSample;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSender;
 import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -271,6 +273,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -457,7 +460,20 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TLoadResp sendLoadCommand(TLoadCommandReq req) {
+  public TLoadResp sendLoadCommand(final TLoadCommandReq req) {
+    final List<Integer> regionIds = req.getRegionIds();
+    final Map<Integer, TRegionReplicaSet> id2replicaSetBeforeExecution =
+        req.isSetRegionIds()
+                && req.getCommandType() == 
LoadTsFileScheduler.LoadCommand.EXECUTE.ordinal()
+            ? regionIds.stream()
+                .collect(
+                    Collectors.toMap(
+                        regionId -> regionId,
+                        regionId ->
+                            partitionFetcher.getRegionReplicaSet(
+                                new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))))
+            : Collections.emptyMap();
+
     final ProgressIndex progressIndex;
     if (req.isSetProgressIndex()) {
       progressIndex = 
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
@@ -468,13 +484,39 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
           "Use local generated load progress index {} for uuid {}.", 
progressIndex, req.uuid);
     }
 
-    return createTLoadResp(
-        StorageEngine.getInstance()
-            .executeLoadCommand(
-                LoadTsFileScheduler.LoadCommand.values()[req.commandType],
-                req.uuid,
-                req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
-                progressIndex));
+    final TLoadResp resp =
+        createTLoadResp(
+            StorageEngine.getInstance()
+                .executeLoadCommand(
+                    LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+                    req.uuid,
+                    req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
+                    progressIndex));
+
+    if (!id2replicaSetBeforeExecution.isEmpty()) {
+      for (Map.Entry<Integer, TRegionReplicaSet> entryBefore :
+          id2replicaSetBeforeExecution.entrySet()) {
+        final TRegionReplicaSet replicaSetAfterExecution =
+            partitionFetcher.getRegionReplicaSet(
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, 
entryBefore.getKey()));
+        LOGGER.warn(
+            "Load request {} for region {} executed with replica set changed 
from {} to {}",
+            req.uuid,
+            entryBefore.getKey(),
+            entryBefore.getValue(),
+            replicaSetAfterExecution);
+        if (!Objects.equals(entryBefore.getValue(), replicaSetAfterExecution)) 
{
+          return createTLoadResp(
+              RpcUtils.getStatus(
+                  TSStatusCode.LOAD_FILE_ERROR,
+                  String.format(
+                      "Region %d replica set changed from %s to %s",
+                      entryBefore.getKey(), entryBefore.getValue(), 
replicaSetAfterExecution)));
+        }
+      }
+    }
+
+    return resp;
   }
 
   private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 5189e9d6f36..bab730c8af2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -56,8 +56,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -220,26 +222,30 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
 
   public Future<FragInstanceDispatchResult> dispatchCommand(
       TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
-    Set<TEndPoint> allEndPoint = new HashSet<>();
+    Map<TEndPoint, List<Integer>> endPoint2RegionIdsMap = new HashMap<>();
     for (TRegionReplicaSet replicaSet : replicaSets) {
       for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
-        allEndPoint.add(dataNodeLocation.getInternalEndPoint());
+        endPoint2RegionIdsMap
+            .computeIfAbsent(dataNodeLocation.getInternalEndPoint(), o -> new 
ArrayList<>())
+            .add(replicaSet.getRegionId().getId());
       }
     }
 
-    for (TEndPoint endPoint : allEndPoint) {
-      try (SetThreadName threadName =
+    for (final Map.Entry<TEndPoint, List<Integer>> entry : 
endPoint2RegionIdsMap.entrySet()) {
+      try (final SetThreadName threadName =
           new SetThreadName(
               LoadTsFileScheduler.class.getName() + "-" + 
loadCommandReq.commandType)) {
-        if (isDispatchedToLocal(endPoint)) {
+        loadCommandReq.setRegionIds(entry.getValue());
+        if (isDispatchedToLocal(entry.getKey())) {
           dispatchLocally(loadCommandReq);
         } else {
-          dispatchRemote(loadCommandReq, endPoint);
+          dispatchRemote(loadCommandReq, entry.getKey());
         }
       } catch (FragmentInstanceDispatchException e) {
+        LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", 
loadCommandReq, e);
         return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
       } catch (Exception t) {
-        LOGGER.warn("cannot dispatch LoadCommand for load operation", t);
+        LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", 
loadCommandReq, t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 098c00d55ad..08cf36e6ada 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -456,6 +456,7 @@ public class LoadTsFileScheduler implements IScheduler {
       final TRegionReplicaSet currentReplicaSet =
           partitionFetcher.fetcher.getRegionReplicaSet(regionId);
       if (!Objects.equals(replicaSet, currentReplicaSet)) {
+        LOGGER.warn("Region replica set changed from {} to {}", replicaSet, 
currentReplicaSet);
         throw new RegionReplicaSetChangedException(replicaSet, 
currentReplicaSet);
       }
     }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 4b25e08d1f6..bbc9057bff9 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -336,6 +336,7 @@ struct TLoadCommandReq {
     2: required string uuid
     3: optional bool isGeneratedByPipe
     4: optional binary progressIndex
+    5: optional list<i32> regionIds
 }
 
 struct TLoadResp {

Reply via email to