This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c92b625c99d Pipe/Load: Assign distinct progress indexes for loading
tsfiles in time partitions to reduce pipe reprocessing after restart & Decrease
pipe heartbeat interval (#15583) (#15608)
c92b625c99d is described below
commit c92b625c99d445180b462b12f1f223817d6c5ab7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 29 18:20:12 2025 +0800
Pipe/Load: Assign distinct progress indexes for loading tsfiles in time
partitions to reduce pipe reprocessing after restart & Decrease pipe heartbeat
interval (#15583) (#15608)
(cherry picked from commit 9b6b32344ec8095bd75c9c48a43ddb3b745fe0e3)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 13 ++-----
.../impl/DataNodeInternalRPCServiceImpl.java | 21 +++++++----
.../scheduler/load/LoadTsFileDispatcherImpl.java | 44 +++++++++++++---------
.../plan/scheduler/load/LoadTsFileScheduler.java | 39 ++++++++++++++-----
.../iotdb/db/storageengine/StorageEngine.java | 5 ++-
.../db/storageengine/load/LoadTsFileManager.java | 22 +++++++++--
.../apache/iotdb/commons/conf/CommonConfig.java | 2 +-
.../src/main/thrift/datanode.thrift | 2 +-
8 files changed, 95 insertions(+), 53 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 753075f3b20..96772535634 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -517,17 +517,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
if (startIndex instanceof StateProgressIndex) {
- // Some different tsFiles may share the same max progressIndex, thus
tsFiles with an
- // "equals" max progressIndex must be transmitted to avoid data loss
- final ProgressIndex innerProgressIndex =
- ((StateProgressIndex) startIndex).getInnerProgressIndex();
- return
!innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose())
- &&
!innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose());
+ startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
}
-
- // Some different tsFiles may share the same max progressIndex, thus
tsFiles with an
- // "equals" max progressIndex must be transmitted to avoid data loss
- return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
+ return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+ && !startIndex.equals(resource.getMaxProgressIndexAfterClose());
}
private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource
resource) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 186be9835fa..7732814c109 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.common.rpc.thrift.TSettleReq;
import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
@@ -459,14 +460,18 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TLoadResp sendLoadCommand(TLoadCommandReq req) {
- final ProgressIndex progressIndex;
- if (req.isSetProgressIndex()) {
- progressIndex =
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
+ final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap
= new HashMap<>();
+ if (req.isSetTimePartition2ProgressIndex()) {
+ for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
+ req.getTimePartition2ProgressIndex().entrySet()) {
+ timePartitionProgressIndexMap.put(
+ entry.getKey(),
ProgressIndexType.deserializeFrom(entry.getValue()));
+ }
} else {
- // fallback to use local generated progress index for compatibility
- progressIndex =
PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
- LOGGER.info(
- "Use local generated load progress index {} for uuid {}.",
progressIndex, req.uuid);
+ final TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+ status.setMessage("Load command requires time partition to progress
index map");
+ return createTLoadResp(status);
}
return createTLoadResp(
@@ -475,7 +480,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
LoadTsFileScheduler.LoadCommand.values()[req.commandType],
req.uuid,
req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
- progressIndex));
+ timePartitionProgressIndexMap));
}
private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 5227f230007..f50533bec47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -56,8 +57,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -219,7 +222,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
}
public Future<FragInstanceDispatchResult> dispatchCommand(
- TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
+ TLoadCommandReq originalLoadCommandReq, Set<TRegionReplicaSet>
replicaSets) {
Set<TEndPoint> allEndPoint = new HashSet<>();
for (TRegionReplicaSet replicaSet : replicaSets) {
for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
@@ -228,23 +231,27 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
}
for (TEndPoint endPoint : allEndPoint) {
+ // duplicate for progress index binary serialization
+ final TLoadCommandReq duplicatedLoadCommandReq =
originalLoadCommandReq.deepCopy();
try (SetThreadName threadName =
new SetThreadName(
"load-dispatcher"
+ "-"
- +
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType]
+ +
LoadTsFileScheduler.LoadCommand.values()[duplicatedLoadCommandReq.commandType]
+ "-"
- + loadCommandReq.uuid)) {
+ + duplicatedLoadCommandReq.uuid)) {
if (isDispatchedToLocal(endPoint)) {
- dispatchLocally(loadCommandReq);
+ dispatchLocally(duplicatedLoadCommandReq);
} else {
- dispatchRemote(loadCommandReq, endPoint);
+ dispatchRemote(duplicatedLoadCommandReq, endPoint);
}
} catch (FragmentInstanceDispatchException e) {
- LOGGER.warn("Cannot dispatch LoadCommand for load operation {}",
loadCommandReq, e);
+ LOGGER.warn(
+ "Cannot dispatch LoadCommand for load operation {}",
duplicatedLoadCommandReq, e);
return immediateFuture(new
FragInstanceDispatchResult(e.getFailureStatus()));
} catch (Exception t) {
- LOGGER.warn("Cannot dispatch LoadCommand for load operation {}",
loadCommandReq, t);
+ LOGGER.warn(
+ "Cannot dispatch LoadCommand for load operation {}",
duplicatedLoadCommandReq, t);
return immediateFuture(
new FragInstanceDispatchResult(
RpcUtils.getStatus(
@@ -256,17 +263,18 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
private void dispatchLocally(TLoadCommandReq loadCommandReq)
throws FragmentInstanceDispatchException {
- final ProgressIndex progressIndex;
- if (loadCommandReq.isSetProgressIndex()) {
- progressIndex =
-
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+ final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap
= new HashMap<>();
+ if (loadCommandReq.isSetTimePartition2ProgressIndex()) {
+ for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
+ loadCommandReq.getTimePartition2ProgressIndex().entrySet()) {
+ timePartitionProgressIndexMap.put(
+ entry.getKey(),
ProgressIndexType.deserializeFrom(entry.getValue()));
+ }
} else {
- // fallback to use local generated progress index for compatibility
- progressIndex =
PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
- LOGGER.info(
- "Use local generated load progress index {} for uuid {}.",
- progressIndex,
- loadCommandReq.uuid);
+ final TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+ status.setMessage("Load command requires time partition to progress
index map");
+ throw new FragmentInstanceDispatchException(status);
}
final TSStatus resultStatus =
@@ -275,7 +283,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
loadCommandReq.uuid,
loadCommandReq.isSetIsGeneratedByPipe() &&
loadCommandReq.isGeneratedByPipe,
- progressIndex);
+ timePartitionProgressIndexMap);
if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
throw new FragmentInstanceDispatchException(resultStatus);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 49a9fc9e3be..43465d7ed74 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -135,6 +136,7 @@ public class LoadTsFileScheduler implements IScheduler {
private final PlanFragmentId fragmentId;
private final Set<TRegionReplicaSet> allReplicaSets;
private final boolean isGeneratedByPipe;
+ private final Map<TTimePartitionSlot, ProgressIndex>
timePartitionSlotToProgressIndex;
private final LoadTsFileDataCacheMemoryBlock block;
public LoadTsFileScheduler(
@@ -153,6 +155,7 @@ public class LoadTsFileScheduler implements IScheduler {
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
this.allReplicaSets = new HashSet<>();
this.isGeneratedByPipe = isGeneratedByPipe;
+ this.timePartitionSlotToProgressIndex = new HashMap<>();
this.block =
LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();
for (FragmentInstance fragmentInstance :
distributedQueryPlan.getInstances()) {
@@ -397,7 +400,26 @@ public class LoadTsFileScheduler implements IScheduler {
try {
loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
- loadCommandReq.setProgressIndex(assignProgressIndex(tsFileResource));
+ loadCommandReq.setTimePartition2ProgressIndex(
+ timePartitionSlotToProgressIndex.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ try (final PublicBAOS byteArrayOutputStream = new
PublicBAOS();
+ final DataOutputStream dataOutputStream =
+ new DataOutputStream(byteArrayOutputStream)) {
+ entry.getValue().serialize(dataOutputStream);
+ return ByteBuffer.wrap(
+ byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ } catch (final IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Serialize Progress Index error,
isFirstPhaseSuccess: %s, uuid: %s, tsFile: %s",
+ isFirstPhaseSuccess, uuid,
tsFile.getAbsolutePath()),
+ e);
+ }
+ })));
Future<FragInstanceDispatchResult> dispatchResultFuture =
dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
@@ -420,14 +442,6 @@ public class LoadTsFileScheduler implements IScheduler {
stateMachine.transitionToFailed(status);
return false;
}
- } catch (IOException e) {
- LOGGER.warn(
- "Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {},
tsFile: {}",
- isFirstPhaseSuccess,
- uuid,
- tsFile.getAbsolutePath());
- stateMachine.transitionToFailed(e);
- return false;
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -601,6 +615,12 @@ public class LoadTsFileScheduler implements IScheduler {
return null;
}
+ private void computeTimePartitionSlotToProgressIndexIfAbsent(
+ final TTimePartitionSlot timePartitionSlot) {
+ timePartitionSlotToProgressIndex.putIfAbsent(
+ timePartitionSlot,
PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad());
+ }
+
public enum LoadCommand {
EXECUTE,
ROLLBACK
@@ -642,6 +662,7 @@ public class LoadTsFileScheduler implements IScheduler {
nonDirectionalChunkData.add(chunkData);
dataSize += chunkData.getDataSize();
block.addMemoryUsage(chunkData.getDataSize());
+
scheduler.computeTimePartitionSlotToProgressIndexIfAbsent(chunkData.getTimePartitionSlot());
if (!isMemoryEnough()) {
routeChunkData();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 18d43931ba4..f923fb2f341 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -972,13 +973,13 @@ public class StorageEngine implements IService {
LoadTsFileScheduler.LoadCommand loadCommand,
String uuid,
boolean isGeneratedByPipe,
- ProgressIndex progressIndex) {
+ Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap) {
TSStatus status = new TSStatus();
try {
switch (loadCommand) {
case EXECUTE:
- if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe,
progressIndex)) {
+ if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe,
timePartitionProgressIndexMap)) {
status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index fe8a5806113..4ab5acae15f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
@@ -264,7 +265,10 @@ public class LoadTsFileManager {
return FOLDER_MANAGER.get().getNextFolder();
}
- public boolean loadAll(String uuid, boolean isGeneratedByPipe, ProgressIndex
progressIndex)
+ public boolean loadAll(
+ String uuid,
+ boolean isGeneratedByPipe,
+ Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
throws IOException, LoadFileException {
if (!uuid2WriterManager.containsKey(uuid)) {
return false;
@@ -273,7 +277,7 @@ public class LoadTsFileManager {
final Optional<CleanupTask> cleanupTask =
Optional.of(uuid2CleanupTask.get(uuid));
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
try {
- uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+ uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe,
timePartitionProgressIndexMap);
} finally {
cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
}
@@ -472,7 +476,9 @@ public class LoadTsFileManager {
}
}
- private void loadAll(boolean isGeneratedByPipe, ProgressIndex
progressIndex)
+ private void loadAll(
+ boolean isGeneratedByPipe,
+ Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
throws IOException, LoadFileException {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
@@ -491,7 +497,11 @@ public class LoadTsFileManager {
final DataRegion dataRegion = entry.getKey().getDataRegion();
final TsFileResource tsFileResource =
dataPartition2Resource.get(entry.getKey());
- endTsFileResource(writer, tsFileResource, progressIndex);
+ endTsFileResource(
+ writer,
+ tsFileResource,
+ timePartitionProgressIndexMap.getOrDefault(
+ entry.getKey().getTimePartitionSlot(),
MinimumProgressIndex.INSTANCE));
dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
// Metrics
@@ -656,6 +666,10 @@ public class LoadTsFileManager {
return dataRegion;
}
+ public TTimePartitionSlot getTimePartitionSlot() {
+ return timePartitionSlot;
+ }
+
@Override
public String toString() {
return String.join(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b55ef0347f1..c3f2e0f4a11 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -253,7 +253,7 @@ public class CommonConfig {
(int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
private boolean isSeperatedPipeHeartbeatEnabled = true;
- private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
+ private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
private long pipeMetaSyncerSyncIntervalMinutes = 3;
private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 2445396508f..d58e1eb1591 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -345,7 +345,7 @@ struct TLoadCommandReq {
1: required i32 commandType
2: required string uuid
3: optional bool isGeneratedByPipe
- 4: optional binary progressIndex
+ 4: optional map<common.TTimePartitionSlot, binary>
timePartition2ProgressIndex
}
struct TLoadResp {