This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rm-load-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8082b07c7f5da78adde0d9430de33eec8bbe53b Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Mar 5 18:28:30 2025 +0800 fix --- .../impl/DataNodeInternalRPCServiceImpl.java | 41 ++++++++++++++++++---- .../scheduler/load/LoadTsFileDispatcherImpl.java | 17 +++++---- .../src/main/thrift/datanode.thrift | 1 + 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index fe2c89e1966..7c52507fc48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -21,11 +21,13 @@ package org.apache.iotdb.db.protocol.thrift.impl; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TLoadSample; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSender; import org.apache.iotdb.common.rpc.thrift.TServiceType; @@ -506,6 +508,14 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TLoadResp sendLoadCommand(final TLoadCommandReq req) { + final int regionId = req.getRegionId(); + final TRegionReplicaSet replicaSetBeforeExecution = + req.isSetRegionId() + && req.getCommandType() == LoadTsFileScheduler.LoadCommand.EXECUTE.ordinal() + ? partitionFetcher.getRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId)) + : null; + final ProgressIndex progressIndex; if (req.isSetProgressIndex()) { progressIndex = ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())); @@ -516,13 +526,30 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface "Use local generated load progress index {} for uuid {}.", progressIndex, req.uuid); } - return createTLoadResp( - StorageEngine.getInstance() - .executeLoadCommand( - LoadTsFileScheduler.LoadCommand.values()[req.commandType], - req.uuid, - req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe, - progressIndex)); + final TLoadResp resp = + createTLoadResp( + StorageEngine.getInstance() + .executeLoadCommand( + LoadTsFileScheduler.LoadCommand.values()[req.commandType], + req.uuid, + req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe, + progressIndex)); + + if (replicaSetBeforeExecution != null) { + final TRegionReplicaSet replicaSetAfterExecution = + partitionFetcher.getRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId)); + if (!Objects.equals(replicaSetBeforeExecution, replicaSetAfterExecution)) { + return createTLoadResp( + RpcUtils.getStatus( + TSStatusCode.LOAD_FILE_ERROR, + String.format( + "Region %d replica set changed from %s to %s", + regionId, replicaSetBeforeExecution, replicaSetAfterExecution))); + } + } + + return resp; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java index 5189e9d6f36..03bbd50ca8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -56,8 +56,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -220,21 +221,23 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { public Future<FragInstanceDispatchResult> dispatchCommand( TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) { - Set<TEndPoint> allEndPoint = new HashSet<>(); + Map<TRegionReplicaSet, TEndPoint> replicaSetEndPointMap = new HashMap<>(); for (TRegionReplicaSet replicaSet : replicaSets) { for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) { - allEndPoint.add(dataNodeLocation.getInternalEndPoint()); + replicaSetEndPointMap.put(replicaSet, dataNodeLocation.getInternalEndPoint()); } } - for (TEndPoint endPoint : allEndPoint) { - try (SetThreadName threadName = + for (final Map.Entry<TRegionReplicaSet, TEndPoint> replicaSetEndPointEntry : + replicaSetEndPointMap.entrySet()) { + try (final SetThreadName threadName = new SetThreadName( LoadTsFileScheduler.class.getName() + "-" + loadCommandReq.commandType)) { - if (isDispatchedToLocal(endPoint)) { + loadCommandReq.setRegionId(replicaSetEndPointEntry.getKey().getRegionId().getId()); + if (isDispatchedToLocal(replicaSetEndPointEntry.getValue())) { dispatchLocally(loadCommandReq); } else { - dispatchRemote(loadCommandReq, endPoint); + dispatchRemote(loadCommandReq, replicaSetEndPointEntry.getValue()); } } catch (FragmentInstanceDispatchException e) { return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index cfdd634a92f..d943bbb5a4c 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -385,6 +385,7 @@ struct TLoadCommandReq { 2: required string uuid 3: optional bool isGeneratedByPipe 4: optional binary progressIndex + 5: optional i32 regionId } struct TAttributeUpdateReq {
