This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 39cd8180bc8 [IOTDB-6287] Load: Add Progress Index for the Second Phase 
of Load TsFile (#11851)
39cd8180bc8 is described below

commit 39cd8180bc82f32737892f309e4d27cd4425e252
Author: yschengzi <[email protected]>
AuthorDate: Fri Jan 5 17:47:02 2024 +0800

    [IOTDB-6287] Load: Add Progress Index for the Second Phase of Load TsFile 
(#11851)
    
    Problems:
    The Load TsFile process uses a two-phase transaction commit to ensure 
consistency. Currently, commands are sent directly to each DataNode through the 
Thrift framework during the second stage of validation. This does not ensure 
that the loaded TsFiles are organized within the DataRegionGroup, nor can they 
be correctly identified as progress by the Pipe system.
    
    Solution:
    A RecoverProgressIndex will be added to the second stage command, so that 
when a local load or a cross-DataNode load is performed, the ProgressIndex will 
be incorporated into the TsFileResource to complete the progress identification.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  | 31 ++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 17 ++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 ++
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    | 16 ++--
 .../impl/DataNodeInternalRPCServiceImpl.java       | 15 +++-
 .../execution/load/LoadTsFileManager.java          | 14 ++--
 .../plan/planner/LocalExecutionPlanner.java        | 22 +++--
 .../plan/node/load/LoadSingleTsFileNode.java       |  8 --
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 37 ++++++++-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 93 +++++++++++++---------
 .../iotdb/db/storageengine/StorageEngine.java      | 10 ++-
 .../dataregion/utils/TsFileResourceUtils.java      |  2 -
 .../src/main/thrift/datanode.thrift                |  1 +
 13 files changed, 194 insertions(+), 77 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index d887bc5a53b..9e018c46249 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -48,6 +48,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -695,6 +696,36 @@ public class IOTDBLoadTsFileIT {
     }
   }
 
+  @Test
+  public void testLoadLocally() throws Exception {
+    registerSchema();
+
+    long writtenPoint1 = 0;
+    // device 0, device 1, sg 0
+    try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, 
"1-0-0-0.tsfile"))) {
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_0, 
Collections.singletonList(SchemaConfig.MEASUREMENT_00));
+      generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL / 
10_000, false);
+      writtenPoint1 = generator.getTotalNumber();
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.execute(String.format("load \"%s\" sglevel=2", 
tmpDir.getAbsolutePath()));
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select count(*) from root.** group by 
level=1,2")) {
+        if (resultSet.next()) {
+          long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
+          Assert.assertEquals(writtenPoint1, sg1Count);
+        } else {
+          Assert.fail("This ResultSet is empty.");
+        }
+      }
+    }
+  }
+
   private static class SchemaConfig {
     private static final String STORAGE_GROUP_0 = "root.sg.test_0";
     private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a12192cf29c..c4a59baa38f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -171,9 +171,6 @@ public class IoTDBConfig {
   /** The proportion of write memory for compaction */
   private double compactionProportion = 0.2;
 
-  /** The proportion of write memory for loading TsFile */
-  private double loadTsFileProportion = 0.125;
-
   /**
    * If memory cost of data region increased more than proportion of 
{@linkplain
    * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
@@ -1095,6 +1092,8 @@ public class IoTDBConfig {
   private double maxMemoryRatioForQueue = 0.6;
 
   /** Load related */
+  private double maxAllocateMemoryRatioForLoad = 0.8;
+
   private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
 
   private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
@@ -3279,10 +3278,6 @@ public class IoTDBConfig {
     return compactionProportion;
   }
 
-  public double getLoadTsFileProportion() {
-    return loadTsFileProportion;
-  }
-
   public static String getEnvironmentVariables() {
     return "\n\t"
         + IoTDBConstant.IOTDB_HOME
@@ -3737,6 +3732,14 @@ public class IoTDBConfig {
     return modeMapSizeThreshold;
   }
 
+  public double getMaxAllocateMemoryRatioForLoad() {
+    return maxAllocateMemoryRatioForLoad;
+  }
+
+  public void setMaxAllocateMemoryRatioForLoad(double 
maxAllocateMemoryRatioForLoad) {
+    this.maxAllocateMemoryRatioForLoad = maxAllocateMemoryRatioForLoad;
+  }
+
   public int getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber() {
     return loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 494e9d5d1c4..d876439db6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -893,6 +893,11 @@ public class IoTDBDescriptor {
       conf.setIntoOperationExecutionThreadCount(2);
     }
 
+    conf.setMaxAllocateMemoryRatioForLoad(
+        Double.parseDouble(
+            properties.getProperty(
+                "max_allocate_memory_ratio_for_load",
+                String.valueOf(conf.getMaxAllocateMemoryRatioForLoad()))));
     conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 0aefdf2eada..991c57b3a15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -103,15 +103,21 @@ public class PipeRuntimeAgent implements IService {
     simpleConsensusProgressIndexAssigner.assignIfNeeded(insertNode);
   }
 
-  ////////////////////// Recover ProgressIndex Assigner //////////////////////
+  ////////////////////// Load ProgressIndex Assigner //////////////////////
 
   public void assignProgressIndexForTsFileLoad(TsFileResource tsFileResource) {
-    tsFileResource.setProgressIndex(
-        new RecoverProgressIndex(
-            DATA_NODE_ID,
-            
simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
+    // override the progress index of the tsfile resource, not to update the 
progress index
+    tsFileResource.setProgressIndex(getNextProgressIndexForTsFileLoad());
   }
 
+  public RecoverProgressIndex getNextProgressIndexForTsFileLoad() {
+    return new RecoverProgressIndex(
+        DATA_NODE_ID,
+        
simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery());
+  }
+
+  ////////////////////// Recover ProgressIndex Assigner //////////////////////
+
   public void assignProgressIndexForTsFileRecovery(TsFileResource 
tsFileResource) {
     tsFileResource.updateProgressIndex(
         new RecoverProgressIndex(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index a3ec3892697..23736169581 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -36,6 +36,8 @@ import 
org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -404,12 +406,23 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TLoadResp sendLoadCommand(TLoadCommandReq req) {
+    final ProgressIndex progressIndex;
+    if (req.isSetProgressIndex()) {
+      progressIndex = 
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
+    } else {
+      // fallback to use local generated progress index for compatibility
+      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
+      LOGGER.info(
+          "Use local generated load progress index {} for uuid {}.", 
progressIndex, req.uuid);
+    }
+
     return createTLoadResp(
         StorageEngine.getInstance()
             .executeLoadCommand(
                 LoadTsFileScheduler.LoadCommand.values()[req.commandType],
                 req.uuid,
-                req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe));
+                req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
+                progressIndex));
   }
 
   private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 8b8eb97946c..17907fd47f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.load;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
@@ -138,12 +139,12 @@ public class LoadTsFileManager {
     }
   }
 
-  public boolean loadAll(String uuid, boolean isGeneratedByPipe)
+  public boolean loadAll(String uuid, boolean isGeneratedByPipe, ProgressIndex 
progressIndex)
       throws IOException, LoadFileException {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
     }
-    uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe);
+    uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
     clean(uuid);
     return true;
   }
@@ -277,7 +278,8 @@ public class LoadTsFileManager {
       }
     }
 
-    private void loadAll(boolean isGeneratedByPipe) throws IOException, 
LoadFileException {
+    private void loadAll(boolean isGeneratedByPipe, ProgressIndex 
progressIndex)
+        throws IOException, LoadFileException {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
@@ -293,7 +295,7 @@ public class LoadTsFileManager {
         writer.endFile();
 
         DataRegion dataRegion = entry.getKey().getDataRegion();
-        dataRegion.loadNewTsFile(generateResource(writer), true, 
isGeneratedByPipe);
+        dataRegion.loadNewTsFile(generateResource(writer, progressIndex), 
true, isGeneratedByPipe);
 
         MetricService.getInstance()
             .count(
@@ -309,8 +311,10 @@ public class LoadTsFileManager {
       }
     }
 
-    private TsFileResource generateResource(TsFileIOWriter writer) throws 
IOException {
+    private TsFileResource generateResource(TsFileIOWriter writer, 
ProgressIndex progressIndex)
+        throws IOException {
       TsFileResource tsFileResource = 
TsFileResourceUtils.generateTsFileResource(writer);
+      tsFileResource.setProgressIndex(progressIndex);
       tsFileResource.serialize();
       return tsFileResource;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 0b1fc6cecd7..bc60840e262 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.queryengine.plan.planner;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
@@ -45,8 +46,17 @@ import java.util.List;
 public class LocalExecutionPlanner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalExecutionPlanner.class);
-  private static final long ALLOCATE_MEMORY_FOR_OPERATORS =
-      
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
+  private static final long ALLOCATE_MEMORY_FOR_OPERATORS;
+  private static final long MAX_REST_MEMORY_FOR_LOAD;
+
+  static {
+    IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+    ALLOCATE_MEMORY_FOR_OPERATORS = CONFIG.getAllocateMemoryForOperators();
+    MAX_REST_MEMORY_FOR_LOAD =
+        (long)
+            (((double) ALLOCATE_MEMORY_FOR_OPERATORS)
+                * (1.0 - CONFIG.getMaxAllocateMemoryRatioForLoad()));
+  }
 
   /** allocated memory for operator execution */
   private long freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS;
@@ -165,7 +175,7 @@ public class LocalExecutionPlanner {
   }
 
   public synchronized boolean forceAllocateFreeMemoryForOperators(long 
memoryInBytes) {
-    if (freeMemoryForOperators < memoryInBytes) {
+    if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
       return false;
     } else {
       freeMemoryForOperators -= memoryInBytes;
@@ -174,9 +184,9 @@ public class LocalExecutionPlanner {
   }
 
   public synchronized long tryAllocateFreeMemoryForOperators(long 
memoryInBytes) {
-    if (freeMemoryForOperators < memoryInBytes) {
-      long result = freeMemoryForOperators;
-      freeMemoryForOperators = 0;
+    if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
+      long result = freeMemoryForOperators - MAX_REST_MEMORY_FOR_LOAD;
+      freeMemoryForOperators = MAX_REST_MEMORY_FOR_LOAD;
       return result;
     } else {
       freeMemoryForOperators -= memoryInBytes;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 7da6f247155..da2c9dde669 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -99,13 +98,6 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       needDecodeTsFile = !isDispatchedToLocal(new 
HashSet<>(partitionFetcher.apply(slotList)));
     }
 
-    PipeAgent.runtime().assignProgressIndexForTsFileLoad(resource);
-
-    // we serialize the resource file even if the tsfile does not need to be 
decoded
-    // or the resource file is already existed because we need to serialize the
-    // progress index of the tsfile
-    resource.serialize();
-
     return needDecodeTsFile;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 84e65dd35d5..e04038b4024 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -29,9 +29,12 @@ import 
org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -40,6 +43,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IFragInstanceDispatcher;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
@@ -51,6 +55,8 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -177,12 +183,26 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
 
   private void dispatchLocally(TLoadCommandReq loadCommandReq)
       throws FragmentInstanceDispatchException {
-    TSStatus resultStatus =
+    final ProgressIndex progressIndex;
+    if (loadCommandReq.isSetProgressIndex()) {
+      progressIndex =
+          
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+    } else {
+      // fallback to use local generated progress index for compatibility
+      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
+      LOGGER.info(
+          "Use local generated load progress index {} for uuid {}.",
+          progressIndex,
+          loadCommandReq.uuid);
+    }
+
+    final TSStatus resultStatus =
         StorageEngine.getInstance()
             .executeLoadCommand(
                 
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
                 loadCommandReq.uuid,
-                loadCommandReq.isSetIsGeneratedByPipe() && 
loadCommandReq.isGeneratedByPipe);
+                loadCommandReq.isSetIsGeneratedByPipe() && 
loadCommandReq.isGeneratedByPipe,
+                progressIndex);
     if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
       throw new FragmentInstanceDispatchException(resultStatus);
     }
@@ -210,11 +230,15 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
         throw new FragmentInstanceDispatchException(resultStatus);
       }
     } else if (planNode instanceof LoadSingleTsFileNode) { // do not need to 
split
+      final TsFileResource tsFileResource = ((LoadSingleTsFileNode) 
planNode).getTsFileResource();
       try {
+        PipeAgent.runtime().assignProgressIndexForTsFileLoad(tsFileResource);
+        tsFileResource.serialize();
+
         StorageEngine.getInstance()
             .getDataRegion((DataRegionId) groupId)
             .loadNewTsFile(
-                ((LoadSingleTsFileNode) planNode).getTsFileResource(),
+                tsFileResource,
                 ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
                 isGeneratedByPipe);
       } catch (LoadFileException e) {
@@ -223,6 +247,13 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
         resultStatus.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
         resultStatus.setMessage(e.getMessage());
         throw new FragmentInstanceDispatchException(resultStatus);
+      } catch (IOException e) {
+        LOGGER.warn(
+            "Serialize TsFileResource {} error.", 
tsFileResource.getTsFile().getAbsolutePath(), e);
+        TSStatus resultStatus = new TSStatus();
+        resultStatus.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+        resultStatus.setMessage(e.getMessage());
+        throw new FragmentInstanceDispatchException(resultStatus);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 51c4b44df38..ecdcbbc5922 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -35,10 +35,10 @@ import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
@@ -58,16 +58,21 @@ import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import io.airlift.units.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -93,22 +98,12 @@ import java.util.stream.IntStream;
  * 
href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe";>...</a>;
  */
 public class LoadTsFileScheduler implements IScheduler {
-  private static final Logger logger = 
LoggerFactory.getLogger(LoadTsFileScheduler.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileScheduler.class);
   public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 900L; // 15min
-  private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE;
-  private static final int TRANSMIT_LIMIT;
-
-  static {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
-        Math.min(
-            config.getThriftMaxFrameSize() >> 2,
-            (long)
-                (config.getAllocateMemoryForStorageEngine()
-                    * config.getLoadTsFileProportion())); // TODO: change it 
to query engine
-    TRANSMIT_LIMIT =
-        
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
-  }
+  private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
+      IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize() >> 2;
+  private static final int TRANSMIT_LIMIT =
+      
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
 
   private final MPPQueryContext queryContext;
   private final QueryStateMachine stateMachine;
@@ -153,7 +148,7 @@ public class LoadTsFileScheduler implements IScheduler {
       boolean isLoadSingleTsFileSuccess = true;
       try {
         if (node.isTsFileEmpty()) {
-          logger.info(
+          LOGGER.info(
               "Load skip TsFile {}, because it has no data.",
               node.getTsFileResource().getTsFilePath());
 
@@ -172,7 +167,7 @@ public class LoadTsFileScheduler implements IScheduler {
 
           boolean isFirstPhaseSuccess = firstPhase(node);
           boolean isSecondPhaseSuccess =
-              secondPhase(isFirstPhaseSuccess, uuid, 
node.getTsFileResource().getTsFile());
+              secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());
 
           node.clean();
           if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
@@ -180,14 +175,14 @@ public class LoadTsFileScheduler implements IScheduler {
           }
         }
         if (isLoadSingleTsFileSuccess) {
-          logger.info(
+          LOGGER.info(
               "Load TsFile {} Successfully, load process [{}/{}]",
               node.getTsFileResource().getTsFilePath(),
               i + 1,
               tsFileNodeListSize);
         } else {
           isLoadSuccess = false;
-          logger.warn(
+          LOGGER.warn(
               "Can not Load TsFile {}, load process [{}/{}]",
               node.getTsFileResource().getTsFilePath(),
               i + 1,
@@ -196,7 +191,7 @@ public class LoadTsFileScheduler implements IScheduler {
       } catch (Exception e) {
         isLoadSuccess = false;
         stateMachine.transitionToFailed(e);
-        logger.warn(
+        LOGGER.warn(
             String.format(
                 "LoadTsFileScheduler loads TsFile %s error",
                 node.getTsFileResource().getTsFilePath()),
@@ -221,7 +216,7 @@ public class LoadTsFileScheduler implements IScheduler {
       }
     } catch (IllegalStateException e) {
       stateMachine.transitionToFailed(e);
-      logger.warn(
+      LOGGER.warn(
           String.format(
               "Dispatch TsFileData error when parsing TsFile %s.",
               node.getTsFileResource().getTsFile()),
@@ -229,7 +224,7 @@ public class LoadTsFileScheduler implements IScheduler {
       return false;
     } catch (Exception e) {
       stateMachine.transitionToFailed(e);
-      logger.warn(
+      LOGGER.warn(
           String.format("Parse or send TsFile %s error.", 
node.getTsFileResource().getTsFile()), e);
       return false;
     } finally {
@@ -259,7 +254,7 @@ public class LoadTsFileScheduler implements IScheduler {
               LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, 
TimeUnit.SECONDS);
       if (!result.isSuccessful()) {
         // TODO: retry.
-        logger.warn(
+        LOGGER.warn(
             "Dispatch one piece to ReplicaSet {} error. Result status code {}. 
"
                 + "Result status message {}. Dispatch piece node error:%n{}",
             replicaSet,
@@ -268,7 +263,7 @@ public class LoadTsFileScheduler implements IScheduler {
             pieceNode);
         if (result.getFailureStatus().getSubStatus() != null) {
           for (TSStatus status : result.getFailureStatus().getSubStatus()) {
-            logger.warn(
+            LOGGER.warn(
                 "Sub status code {}. Sub status message {}.",
                 TSStatusCode.representOf(status.getCode()).name(),
                 status.getMessage());
@@ -285,12 +280,12 @@ public class LoadTsFileScheduler implements IScheduler {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
       }
-      logger.warn("Interrupt or Execution error.", e);
+      LOGGER.warn("Interrupt or Execution error.", e);
       stateMachine.transitionToFailed(e);
       return false;
     } catch (TimeoutException e) {
       dispatchResultFuture.cancel(true);
-      logger.warn(
+      LOGGER.warn(
           String.format("Wait for loading %s time out.", 
LoadTsFilePieceNode.class.getName()), e);
       stateMachine.transitionToFailed(e);
       return false;
@@ -298,20 +293,24 @@ public class LoadTsFileScheduler implements IScheduler {
     return true;
   }
 
-  private boolean secondPhase(boolean isFirstPhaseSuccess, String uuid, File 
tsFile) {
-    logger.info("Start dispatching Load command for uuid {}", uuid);
-    TLoadCommandReq loadCommandReq =
+  private boolean secondPhase(
+      boolean isFirstPhaseSuccess, String uuid, TsFileResource tsFileResource) 
{
+    LOGGER.info("Start dispatching Load command for uuid {}", uuid);
+    final File tsFile = tsFileResource.getTsFile();
+    final TLoadCommandReq loadCommandReq =
         new TLoadCommandReq(
             (isFirstPhaseSuccess ? LoadCommand.EXECUTE : 
LoadCommand.ROLLBACK).ordinal(), uuid);
-    loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
-    Future<FragInstanceDispatchResult> dispatchResultFuture =
-        dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
 
     try {
+      loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
+      loadCommandReq.setProgressIndex(assignProgressIndex(tsFileResource));
+      Future<FragInstanceDispatchResult> dispatchResultFuture =
+          dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
+
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
         // TODO: retry.
-        logger.warn(
+        LOGGER.warn(
             "Dispatch load command {} of TsFile {} error to replicaSets {} 
error. "
                 + "Result status code {}. Result status message {}.",
             loadCommandReq,
@@ -325,19 +324,37 @@ public class LoadTsFileScheduler implements IScheduler {
         stateMachine.transitionToFailed(status);
         return false;
       }
+    } catch (IOException e) {
+      LOGGER.warn(
+          "Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, tsFile: {}",
+          isFirstPhaseSuccess,
+          uuid,
+          tsFile.getAbsolutePath());
+      stateMachine.transitionToFailed(e);
+      return false;
     } catch (InterruptedException | ExecutionException e) {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
       }
-      logger.warn("Interrupt or Execution error.", e);
+      LOGGER.warn("Interrupt or Execution error.", e);
       stateMachine.transitionToFailed(e);
       return false;
     }
     return true;
   }
 
+  private ByteBuffer assignProgressIndex(TsFileResource tsFileResource) throws IOException {
+    PipeAgent.runtime().assignProgressIndexForTsFileLoad(tsFileResource);
+
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      tsFileResource.getMaxProgressIndex().serialize(dataOutputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+  }
+
   private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
-    logger.info("Start load TsFile {} locally.", node.getTsFileResource().getTsFile().getPath());
+    LOGGER.info("Start load TsFile {} locally.", node.getTsFileResource().getTsFile().getPath());
 
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
       throw new LoadReadOnlyException();
@@ -355,7 +372,7 @@ public class LoadTsFileScheduler implements IScheduler {
       instance.setExecutorAndHost(new StorageExecutor(node.getLocalRegionReplicaSet()));
       dispatcher.dispatchLocally(instance);
     } catch (FragmentInstanceDispatchException e) {
-      logger.warn(
+      LOGGER.warn(
           String.format(
               "Dispatch tsFile %s error to local error. Result status code %s. "
                   + "Result status message %s.",
@@ -522,7 +539,7 @@ public class LoadTsFileScheduler implements IScheduler {
       for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
         block.reduceMemoryUsage(entry.getValue().getDataSize());
         if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
-          logger.warn(
+          LOGGER.warn(
               "Dispatch piece node {} of TsFile {} error.",
               entry.getValue(),
               singleTsFileNode.getTsFileResource().getTsFile());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index de4f2a2200b..67c78b0b6de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -133,8 +134,10 @@ public class StorageEngine implements IService {
   private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
 
   private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
+
   /** used to do short-lived asynchronous tasks */
   private ExecutorService cachedThreadPool;
+
   // add customized listeners here for flush and close events
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
@@ -795,13 +798,16 @@ public class StorageEngine implements IService {
   }
 
   public TSStatus executeLoadCommand(
-      LoadTsFileScheduler.LoadCommand loadCommand, String uuid, boolean isGeneratedByPipe) {
+      LoadTsFileScheduler.LoadCommand loadCommand,
+      String uuid,
+      boolean isGeneratedByPipe,
+      ProgressIndex progressIndex) {
     TSStatus status = new TSStatus();
 
     try {
       switch (loadCommand) {
         case EXECUTE:
-          if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe)) {
+          if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe, progressIndex)) {
             status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
index c0ddd059f8d..eeae1731f68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.utils;
 
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
@@ -422,7 +421,6 @@ public class TsFileResourceUtils {
       }
     }
     resource.setStatus(TsFileResourceStatus.NORMAL);
-    PipeAgent.runtime().assignProgressIndexForTsFileLoad(resource);
     return resource;
   }
 }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 8a73d64e0c3..90b24f68c6d 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -323,6 +323,7 @@ struct TLoadCommandReq{
     1: required i32 commandType
     2: required string uuid
     3: optional bool isGeneratedByPipe
+    4: optional binary progressIndex
 }
 
 struct TLoadResp{


Reply via email to