This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 85f7cf53a35 Load: Detect 2PC Second Phase and RM Running in Parallel (#15041) (#15020)
85f7cf53a35 is described below
commit 85f7cf53a35ab44760f0272ddee126eed25a0167
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Mar 10 10:34:06 2025 +0800
Load: Detect 2PC Second Phase and RM Running in Parallel (#15041) (#15020)
(cherry picked from commit 727ad63f2143876c63c0659c186e3c61fbcd5522)
Co-authored-by: Steve Yurong Su <[email protected]>
Co-authored-by: 马子坤 <[email protected]>
---
.../impl/DataNodeInternalRPCServiceImpl.java | 58 +++++++++++++++++++---
.../scheduler/load/LoadTsFileDispatcherImpl.java | 22 +++++---
.../plan/scheduler/load/LoadTsFileScheduler.java | 1 +
.../src/main/thrift/datanode.thrift | 1 +
4 files changed, 66 insertions(+), 16 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 34dde7d0b30..20ef565ca54 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.protocol.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TLoadSample;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSender;
import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -271,6 +273,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -457,7 +460,20 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
@Override
- public TLoadResp sendLoadCommand(TLoadCommandReq req) {
+ public TLoadResp sendLoadCommand(final TLoadCommandReq req) {
+ final List<Integer> regionIds = req.getRegionIds();
+ final Map<Integer, TRegionReplicaSet> id2replicaSetBeforeExecution =
+ req.isSetRegionIds()
+ && req.getCommandType() ==
LoadTsFileScheduler.LoadCommand.EXECUTE.ordinal()
+ ? regionIds.stream()
+ .collect(
+ Collectors.toMap(
+ regionId -> regionId,
+ regionId ->
+ partitionFetcher.getRegionReplicaSet(
+ new
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))))
+ : Collections.emptyMap();
+
final ProgressIndex progressIndex;
if (req.isSetProgressIndex()) {
progressIndex =
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
@@ -468,13 +484,39 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
"Use local generated load progress index {} for uuid {}.",
progressIndex, req.uuid);
}
- return createTLoadResp(
- StorageEngine.getInstance()
- .executeLoadCommand(
- LoadTsFileScheduler.LoadCommand.values()[req.commandType],
- req.uuid,
- req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
- progressIndex));
+ final TLoadResp resp =
+ createTLoadResp(
+ StorageEngine.getInstance()
+ .executeLoadCommand(
+ LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+ req.uuid,
+ req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
+ progressIndex));
+
+ if (!id2replicaSetBeforeExecution.isEmpty()) {
+ for (Map.Entry<Integer, TRegionReplicaSet> entryBefore :
+ id2replicaSetBeforeExecution.entrySet()) {
+ final TRegionReplicaSet replicaSetAfterExecution =
+ partitionFetcher.getRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion,
entryBefore.getKey()));
+ LOGGER.warn(
+ "Load request {} for region {} executed with replica set changed
from {} to {}",
+ req.uuid,
+ entryBefore.getKey(),
+ entryBefore.getValue(),
+ replicaSetAfterExecution);
+ if (!Objects.equals(entryBefore.getValue(), replicaSetAfterExecution))
{
+ return createTLoadResp(
+ RpcUtils.getStatus(
+ TSStatusCode.LOAD_FILE_ERROR,
+ String.format(
+ "Region %d replica set changed from %s to %s",
+ entryBefore.getKey(), entryBefore.getValue(),
replicaSetAfterExecution)));
+ }
+ }
+ }
+
+ return resp;
}
private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 5189e9d6f36..bab730c8af2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -56,8 +56,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -220,26 +222,30 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
public Future<FragInstanceDispatchResult> dispatchCommand(
TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
- Set<TEndPoint> allEndPoint = new HashSet<>();
+ Map<TEndPoint, List<Integer>> endPoint2RegionIdsMap = new HashMap<>();
for (TRegionReplicaSet replicaSet : replicaSets) {
for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
- allEndPoint.add(dataNodeLocation.getInternalEndPoint());
+ endPoint2RegionIdsMap
+ .computeIfAbsent(dataNodeLocation.getInternalEndPoint(), o -> new
ArrayList<>())
+ .add(replicaSet.getRegionId().getId());
}
}
- for (TEndPoint endPoint : allEndPoint) {
- try (SetThreadName threadName =
+ for (final Map.Entry<TEndPoint, List<Integer>> entry :
endPoint2RegionIdsMap.entrySet()) {
+ try (final SetThreadName threadName =
new SetThreadName(
LoadTsFileScheduler.class.getName() + "-" +
loadCommandReq.commandType)) {
- if (isDispatchedToLocal(endPoint)) {
+ loadCommandReq.setRegionIds(entry.getValue());
+ if (isDispatchedToLocal(entry.getKey())) {
dispatchLocally(loadCommandReq);
} else {
- dispatchRemote(loadCommandReq, endPoint);
+ dispatchRemote(loadCommandReq, entry.getKey());
}
} catch (FragmentInstanceDispatchException e) {
+ LOGGER.warn("Cannot dispatch LoadCommand for load operation {}",
loadCommandReq, e);
return immediateFuture(new
FragInstanceDispatchResult(e.getFailureStatus()));
} catch (Exception t) {
- LOGGER.warn("cannot dispatch LoadCommand for load operation", t);
+ LOGGER.warn("Cannot dispatch LoadCommand for load operation {}",
loadCommandReq, t);
return immediateFuture(
new FragInstanceDispatchResult(
RpcUtils.getStatus(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 098c00d55ad..08cf36e6ada 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -456,6 +456,7 @@ public class LoadTsFileScheduler implements IScheduler {
final TRegionReplicaSet currentReplicaSet =
partitionFetcher.fetcher.getRegionReplicaSet(regionId);
if (!Objects.equals(replicaSet, currentReplicaSet)) {
+ LOGGER.warn("Region replica set changed from {} to {}", replicaSet,
currentReplicaSet);
throw new RegionReplicaSetChangedException(replicaSet,
currentReplicaSet);
}
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 4b25e08d1f6..bbc9057bff9 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -336,6 +336,7 @@ struct TLoadCommandReq {
2: required string uuid
3: optional bool isGeneratedByPipe
4: optional binary progressIndex
+ 5: optional list<i32> regionIds
}
struct TLoadResp {