This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rm-load-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d8082b07c7f5da78adde0d9430de33eec8bbe53b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Mar 5 18:28:30 2025 +0800

    fix
---
 .../impl/DataNodeInternalRPCServiceImpl.java       | 41 ++++++++++++++++++----
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 17 +++++----
 .../src/main/thrift/datanode.thrift                |  1 +
 3 files changed, 45 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index fe2c89e1966..7c52507fc48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.protocol.thrift.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TLoadSample;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSender;
 import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -506,6 +508,14 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TLoadResp sendLoadCommand(final TLoadCommandReq req) {
+    final int regionId = req.getRegionId();
+    final TRegionReplicaSet replicaSetBeforeExecution =
+        req.isSetRegionId()
+                && req.getCommandType() == 
LoadTsFileScheduler.LoadCommand.EXECUTE.ordinal()
+            ? partitionFetcher.getRegionReplicaSet(
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, 
regionId))
+            : null;
+
     final ProgressIndex progressIndex;
     if (req.isSetProgressIndex()) {
       progressIndex = 
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
@@ -516,13 +526,30 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
           "Use local generated load progress index {} for uuid {}.", 
progressIndex, req.uuid);
     }
 
-    return createTLoadResp(
-        StorageEngine.getInstance()
-            .executeLoadCommand(
-                LoadTsFileScheduler.LoadCommand.values()[req.commandType],
-                req.uuid,
-                req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
-                progressIndex));
+    final TLoadResp resp =
+        createTLoadResp(
+            StorageEngine.getInstance()
+                .executeLoadCommand(
+                    LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+                    req.uuid,
+                    req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
+                    progressIndex));
+
+    if (replicaSetBeforeExecution != null) {
+      final TRegionReplicaSet replicaSetAfterExecution =
+          partitionFetcher.getRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId));
+      if (!Objects.equals(replicaSetBeforeExecution, 
replicaSetAfterExecution)) {
+        return createTLoadResp(
+            RpcUtils.getStatus(
+                TSStatusCode.LOAD_FILE_ERROR,
+                String.format(
+                    "Region %d replica set changed from %s to %s",
+                    regionId, replicaSetBeforeExecution, 
replicaSetAfterExecution)));
+      }
+    }
+
+    return resp;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 5189e9d6f36..03bbd50ca8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -56,8 +56,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -220,21 +221,23 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
 
   public Future<FragInstanceDispatchResult> dispatchCommand(
       TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
-    Set<TEndPoint> allEndPoint = new HashSet<>();
+    Map<TRegionReplicaSet, TEndPoint> replicaSetEndPointMap = new HashMap<>();
     for (TRegionReplicaSet replicaSet : replicaSets) {
       for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
-        allEndPoint.add(dataNodeLocation.getInternalEndPoint());
+        replicaSetEndPointMap.put(replicaSet, 
dataNodeLocation.getInternalEndPoint());
       }
     }
 
-    for (TEndPoint endPoint : allEndPoint) {
-      try (SetThreadName threadName =
+    for (final Map.Entry<TRegionReplicaSet, TEndPoint> replicaSetEndPointEntry 
:
+        replicaSetEndPointMap.entrySet()) {
+      try (final SetThreadName threadName =
           new SetThreadName(
               LoadTsFileScheduler.class.getName() + "-" + 
loadCommandReq.commandType)) {
-        if (isDispatchedToLocal(endPoint)) {
+        
loadCommandReq.setRegionId(replicaSetEndPointEntry.getKey().getRegionId().getId());
+        if (isDispatchedToLocal(replicaSetEndPointEntry.getValue())) {
           dispatchLocally(loadCommandReq);
         } else {
-          dispatchRemote(loadCommandReq, endPoint);
+          dispatchRemote(loadCommandReq, replicaSetEndPointEntry.getValue());
         }
       } catch (FragmentInstanceDispatchException e) {
         return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index cfdd634a92f..d943bbb5a4c 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -385,6 +385,7 @@ struct TLoadCommandReq {
     2: required string uuid
     3: optional bool isGeneratedByPipe
     4: optional binary progressIndex
+    5: optional i32 regionId
 }
 
 struct TAttributeUpdateReq {

Reply via email to