This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch multi-cyclic-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/multi-cyclic-pipe by this push:
     new 96a36d7a0dc ..
96a36d7a0dc is described below

commit 96a36d7a0dc8385bbc1788b691f1b9207e16f63f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Aug 15 21:35:50 2023 +0800

    ..
---
 .../config/constant/PipeExtractorConstant.java     |   2 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  14 ++-
 .../event/realtime/PipeRealtimeEventFactory.java   |   5 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   2 +
 .../listener/PipeInsertionDataNodeListener.java    |   6 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  14 +--
 .../impl/DataNodeInternalRPCServiceImpl.java       |   8 +-
 .../execution/load/LoadTsFileManager.java          |  12 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  10 ++
 .../queryengine/plan/execution/QueryExecution.java |   4 +-
 .../plan/planner/LogicalPlanVisitor.java           |   7 ++
 .../planner/plan/node/load/LoadTsFileNode.java     |   4 -
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |  11 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  11 +-
 .../plan/statement/StatementVisitor.java           |   6 +
 .../plan/statement/crud/LoadTsFileStatement.java   |  11 ++
 .../crud/PipeEnrichedLoadTsFileStatement.java      | 132 +++++++++++++++++++++
 .../iotdb/db/storageengine/StorageEngine.java      |   5 +-
 .../db/storageengine/dataregion/DataRegion.java    |   7 +-
 .../dataregion/memtable/TsFileProcessor.java       |   3 +-
 .../rescon/quotas/DefaultOperationQuota.java       |   6 +-
 .../db/pipe/extractor/PipeRealtimeExtractTest.java |   3 +-
 .../scheduler/load/LoadTsFileSchedulerTest.java    |   3 +-
 .../thrift/src/main/thrift/datanode.thrift         |   1 +
 24 files changed, 241 insertions(+), 46 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index 810f7a717f0..6a7c095c148 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -28,7 +28,7 @@ public class PipeExtractorConstant {
 
   public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
       "extractor.forwarding-pipe-requests";
-  public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE = false;
+  public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE = true;
 
   public static final String EXTRACTOR_HISTORY_ENABLE_KEY = "extractor.history.enable";
   public static final String EXTRACTOR_HISTORY_START_TIME = "extractor.history.start-time";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 52dd0d594bd..5705f5c8600 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -48,16 +48,19 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   private final TsFileResource resource;
   private File tsFile;
 
+  private final boolean isGeneratedByPipe;
+
   private final AtomicBoolean isClosed;
 
   private TsFileInsertionDataContainer dataContainer;
 
-  public PipeTsFileInsertionEvent(TsFileResource resource) {
-    this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+  public PipeTsFileInsertionEvent(TsFileResource resource, boolean isGeneratedByPipe) {
+    this(resource, isGeneratedByPipe, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   public PipeTsFileInsertionEvent(
       TsFileResource resource,
+      boolean isGeneratedByPipe,
       PipeTaskMeta pipeTaskMeta,
       String pattern,
       long startTime,
@@ -70,6 +73,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     this.resource = resource;
     tsFile = resource.getTsFile();
 
+    this.isGeneratedByPipe = isGeneratedByPipe;
+
     isClosed = new AtomicBoolean(resource.isClosed());
     // register close listener if TsFile is not closed
     if (!isClosed.get()) {
@@ -153,12 +158,13 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
-    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern, startTime, endTime);
+    return new PipeTsFileInsertionEvent(
+        resource, isGeneratedByPipe, pipeTaskMeta, pattern, startTime, endTime);
   }
 
   @Override
   public boolean isGeneratedByPipe() {
-    return false;
+    return isGeneratedByPipe;
   }
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 1be1f91d1a7..6af720da2f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -30,9 +30,10 @@ public class PipeRealtimeEventFactory {
 
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
-  public static PipeRealtimeEvent createRealtimeEvent(TsFileResource resource) {
+  public static PipeRealtimeEvent createRealtimeEvent(
+      TsFileResource resource, boolean isGeneratedByPipe) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
-        new PipeTsFileInsertionEvent(resource), resource);
+        new PipeTsFileInsertionEvent(resource, isGeneratedByPipe), resource);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a6c6b4d60ee..506dd6adcd7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -179,6 +179,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         new PipeTsFileInsertionEvent(
                             resource,
+                            false,
                             pipeTaskMeta,
                             pattern,
                             historicalDataExtractionStartTime,
@@ -195,6 +196,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         new PipeTsFileInsertionEvent(
                             resource,
+                            false,
                             pipeTaskMeta,
                             pattern,
                             historicalDataExtractionStartTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index b7a7a0c3770..35648b1bb5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -91,7 +91,8 @@ public class PipeInsertionDataNodeListener {
 
   //////////////////////////// listen to events ////////////////////////////
 
-  public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) {
+  public void listenToTsFile(
+      String dataRegionId, TsFileResource tsFileResource, boolean isGeneratedByPipe) {
     // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on purpose
     // because extractors may use tsfile events when some exceptions occur in the
     // insert nodes listening process.
@@ -103,7 +104,8 @@ public class PipeInsertionDataNodeListener {
       return;
     }
 
-    assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource));
+    assigner.publishToAssign(
+        PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource, isGeneratedByPipe));
   }
 
   public void listenToInsertNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 39c47acf439..481947dbd39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -414,15 +415,10 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
-    switch (statement.getType()) {
-      case INSERT:
-      case BATCH_INSERT:
-      case BATCH_INSERT_ROWS:
-      case BATCH_INSERT_ONE_DEVICE:
-      case MULTI_BATCH_INSERT:
-        statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) statement);
-        break;
-        // TODO: LOAD
+    if (statement instanceof InsertBaseStatement) {
+      statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) statement);
+    } else if (statement instanceof LoadTsFileStatement) {
+      statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement) statement);
     }
 
     final ExecutionResult result =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 33c6cfcee4d..826aeaa0cd3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -401,12 +401,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TLoadResp sendLoadCommand(TLoadCommandReq req) {
-
-    TSStatus resultStatus =
+    return createTLoadResp(
         StorageEngine.getInstance()
             .executeLoadCommand(
-                LoadTsFileScheduler.LoadCommand.values()[req.commandType], req.uuid);
-    return createTLoadResp(resultStatus);
+                LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+                req.uuid,
+                req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe));
   }
 
   private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index b1ab2b77e6b..97542ceab24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -133,11 +133,12 @@ public class LoadTsFileManager {
     }
   }
 
-  public boolean loadAll(String uuid) throws IOException, LoadFileException {
+  public boolean loadAll(String uuid, boolean isGeneratedByPipe)
+      throws IOException, LoadFileException {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
     }
-    uuid2WriterManager.get(uuid).loadAll();
+    uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe);
     clean(uuid);
     return true;
   }
@@ -249,7 +250,7 @@ public class LoadTsFileManager {
       }
     }
 
-    private void loadAll() throws IOException, LoadFileException {
+    private void loadAll(boolean isGeneratedByPipe) throws IOException, LoadFileException {
       if (isClosed) {
         throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
@@ -259,7 +260,10 @@ public class LoadTsFileManager {
           writer.endChunkGroup();
         }
         writer.endFile();
-        entry.getKey().getDataRegion().loadNewTsFile(generateResource(writer), true);
+        entry
+            .getKey()
+            .getDataRegion()
+            .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 3cfda307723..5a045d5c051 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -99,6 +99,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -2509,6 +2510,15 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         .analyzeFileByFile();
   }
 
+  @Override
+  public Analysis visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, MPPQueryContext context) {
+    final Analysis analysis =
+        visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), context);
+    analysis.setStatement(pipeEnrichedLoadTsFileStatement);
+    return analysis;
+  }
+
   /** get analysis according to statement and params */
   private Analysis getAnalysisForWriting(
       Analysis analysis, List<DataPartitionQueryParam> 
dataPartitionQueryParams) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index f7bca3e2a73..14765cfe2aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -64,6 +64,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -322,7 +323,8 @@ public class QueryExecution implements IQueryExecution {
               context,
               stateMachine,
               syncInternalServiceClientManager,
-              partitionFetcher);
+              partitionFetcher,
+              rawStatement instanceof PipeEnrichedLoadTsFileStatement);
       this.scheduler.start();
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index d5889cf208e..cbe1cbfa509 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -61,6 +61,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -529,6 +530,12 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
         context.getQueryId().genPlanNodeId(), 
loadTsFileStatement.getResources());
   }
 
+  @Override
+  public PlanNode visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, MPPQueryContext context) {
+    return visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), context);
+  }
+
   @Override
   public PlanNode visitShowTimeSeries(
       ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index f304678fc3c..793e6139693 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -40,10 +40,6 @@ public class LoadTsFileNode extends WritePlanNode {
 
   private final List<TsFileResource> resources;
 
-  public LoadTsFileNode(PlanNodeId id) {
-    this(id, new ArrayList<>());
-  }
-
   public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
     super(id);
     this.resources = resources;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 075c6e1433a..620920a01a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -68,16 +68,19 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
   private final ExecutorService executor;
+  private final boolean isGeneratedByPipe;
 
   private static final String NODE_CONNECTION_ERROR = "can't connect to node {}";
 
   public LoadTsFileDispatcherImpl(
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+      boolean isGeneratedByPipe) {
     this.internalServiceClientManager = internalServiceClientManager;
     this.localhostIpAddr = 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
     this.localhostInternalPort = 
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
     this.executor =
         
IoTDBThreadPoolFactory.newCachedThreadPool(LoadTsFileDispatcherImpl.class.getName());
+    this.isGeneratedByPipe = isGeneratedByPipe;
   }
 
   public void setUuid(String uuid) {
@@ -178,7 +181,8 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
         StorageEngine.getInstance()
             .executeLoadCommand(
                 
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
-                loadCommandReq.uuid);
+                loadCommandReq.uuid,
+                loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe);
     if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
       throw new FragmentInstanceDispatchException(resultStatus);
     }
@@ -211,7 +215,8 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
             .getDataRegion((DataRegionId) groupId)
             .loadNewTsFile(
                 ((LoadSingleTsFileNode) planNode).getTsFileResource(),
-                ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad());
+                ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
+                isGeneratedByPipe);
       } catch (LoadFileException e) {
         logger.warn(String.format("Load TsFile Node %s error.", planNode), e);
         TSStatus resultStatus = new TSStatus();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 9127eead0ad..834829c3849 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -102,22 +102,24 @@ public class LoadTsFileScheduler implements IScheduler {
   private final DataPartitionBatchFetcher partitionFetcher;
   private final List<LoadSingleTsFileNode> tsFileNodeList;
   private final PlanFragmentId fragmentId;
-
-  private Set<TRegionReplicaSet> allReplicaSets;
+  private final Set<TRegionReplicaSet> allReplicaSets;
+  private final boolean isGeneratedByPipe;
 
   public LoadTsFileScheduler(
       DistributedQueryPlan distributedQueryPlan,
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager,
-      IPartitionFetcher partitionFetcher) {
+      IPartitionFetcher partitionFetcher,
+      boolean isGeneratedByPipe) {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.tsFileNodeList = new ArrayList<>();
     this.fragmentId = distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
-    this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager);
+    this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
     this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
     this.allReplicaSets = new HashSet<>();
+    this.isGeneratedByPipe = isGeneratedByPipe;
 
     for (FragmentInstance fragmentInstance : 
distributedQueryPlan.getInstances()) {
       tsFileNodeList.add((LoadSingleTsFileNode) 
fragmentInstance.getFragment().getPlanNodeTree());
@@ -279,6 +281,7 @@ public class LoadTsFileScheduler implements IScheduler {
     TLoadCommandReq loadCommandReq =
         new TLoadCommandReq(
             (isFirstPhaseSuccess ? LoadCommand.EXECUTE : 
LoadCommand.ROLLBACK).ordinal(), uuid);
+    loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
     Future<FragInstanceDispatchResult> dispatchResultFuture =
         dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 1ffa3d8f108..17bee5aa40b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -303,6 +304,11 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(loadTsFileStatement, context);
   }
 
+  public R visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C context) {
+    return visitStatement(pipeEnrichedLoadTsFileStatement, context);
+  }
+
   public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
     return visitStatement(insertRowStatement, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 381d367afe9..a58da41af62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -69,6 +69,17 @@ public class LoadTsFileStatement extends Statement {
     sortTsFiles(tsFiles);
   }
 
+  protected LoadTsFileStatement() {
+    this.file = null;
+    this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
+    this.verifySchema = true;
+    this.deleteAfterLoad = true;
+    this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+    this.tsFiles = new ArrayList<>();
+    this.resources = new ArrayList<>();
+    this.statementType = StatementType.MULTI_BATCH_INSERT;
+  }
+
   private void findAllTsFile(File file) {
     final File[] files = file.listFiles();
     if (files == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
new file mode 100644
index 00000000000..c25749c4838
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.io.File;
+import java.util.List;
+
+public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement {
+
+  private final LoadTsFileStatement loadTsFileStatement;
+
+  public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement loadTsFileStatement) {
+    this.loadTsFileStatement = loadTsFileStatement;
+    statementType = StatementType.MULTI_BATCH_INSERT;
+  }
+
+  public LoadTsFileStatement getLoadTsFileStatement() {
+    return loadTsFileStatement;
+  }
+
+  @Override
+  public boolean isDebug() {
+    return loadTsFileStatement.isDebug();
+  }
+
+  @Override
+  public void setDebug(boolean debug) {
+    loadTsFileStatement.setDebug(debug);
+  }
+
+  @Override
+  public boolean isQuery() {
+    return loadTsFileStatement.isQuery();
+  }
+
+  @Override
+  public boolean isAuthenticationRequired() {
+    return loadTsFileStatement.isAuthenticationRequired();
+  }
+
+  @Override
+  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+    loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad);
+  }
+
+  @Override
+  public void setDatabaseLevel(int databaseLevel) {
+    loadTsFileStatement.setDatabaseLevel(databaseLevel);
+  }
+
+  @Override
+  public void setVerifySchema(boolean verifySchema) {
+    loadTsFileStatement.setVerifySchema(verifySchema);
+  }
+
+  @Override
+  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+    loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase);
+  }
+
+  @Override
+  public boolean isVerifySchema() {
+    return loadTsFileStatement.isVerifySchema();
+  }
+
+  @Override
+  public boolean isDeleteAfterLoad() {
+    return loadTsFileStatement.isDeleteAfterLoad();
+  }
+
+  @Override
+  public boolean isAutoCreateDatabase() {
+    return loadTsFileStatement.isAutoCreateDatabase();
+  }
+
+  @Override
+  public int getDatabaseLevel() {
+    return loadTsFileStatement.getDatabaseLevel();
+  }
+
+  @Override
+  public List<File> getTsFiles() {
+    return loadTsFileStatement.getTsFiles();
+  }
+
+  @Override
+  public void addTsFileResource(TsFileResource resource) {
+    loadTsFileStatement.addTsFileResource(resource);
+  }
+
+  @Override
+  public List<TsFileResource> getResources() {
+    return loadTsFileStatement.getResources();
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return loadTsFileStatement.getPaths();
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedLoadFile(this, context);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" + loadTsFileStatement + '}';
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 9fc9b49c38d..d1b8cd8c1ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -796,13 +796,14 @@ public class StorageEngine implements IService {
     return RpcUtils.SUCCESS_STATUS;
   }
 
-  public TSStatus executeLoadCommand(LoadTsFileScheduler.LoadCommand loadCommand, String uuid) {
+  public TSStatus executeLoadCommand(
+      LoadTsFileScheduler.LoadCommand loadCommand, String uuid, boolean isGeneratedByPipe) {
     TSStatus status = new TSStatus();
 
     try {
       switch (loadCommand) {
         case EXECUTE:
-          if (getLoadTsFileManager().loadAll(uuid)) {
+          if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe)) {
             status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0011db21534..95ffb6264a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2197,8 +2197,10 @@ public class DataRegion implements IDataRegionForQuery {
    *
    * @param newTsFileResource tsfile resource @UsedBy load external tsfile module
    * @param deleteOriginFile whether to delete origin tsfile
+   * @param isGeneratedByPipe whether the load tsfile request is generated by pipe
    */
-  public void loadNewTsFile(TsFileResource newTsFileResource, boolean deleteOriginFile)
+  public void loadNewTsFile(
+      TsFileResource newTsFileResource, boolean deleteOriginFile, boolean isGeneratedByPipe)
       throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
@@ -2223,7 +2225,8 @@ public class DataRegion implements IDataRegionForQuery {
       loadTsFileToUnSequence(
           tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
 
-      PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, newTsFileResource);
+      PipeInsertionDataNodeListener.getInstance()
+          .listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
 
       FileMetrics.getInstance()
           .addFile(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index dc7086dde70..45e755debb3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -893,7 +893,8 @@ public class TsFileProcessor {
       try {
         PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
         PipeInsertionDataNodeListener.getInstance()
-            .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource);
+            .listenToTsFile(
+                dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource, false);
 
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index 750258eef55..b6b714b5de2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -137,7 +138,10 @@ public class DefaultOperationQuota implements OperationQuota {
           }
           break;
         case MULTI_BATCH_INSERT:
-          // LoadTsFileStatement  InsertMultiTabletsStatement
+          // PipeEnrichedLoadTsFileStatement  LoadTsFileStatement  InsertMultiTabletsStatement
+          if (s instanceof PipeEnrichedLoadTsFileStatement) {
+            s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement();
+          }
           if (s instanceof LoadTsFileStatement) {
             LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s;
             for (int i = 0; i < loadTsFileStatement.getResources().size(); i++) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index f6dabc9b81d..1900ff539bb 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -286,7 +286,8 @@ public class PipeRealtimeExtractTest {
                         null,
                         false),
                     resource);
-            PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, resource);
+            PipeInsertionDataNodeListener.getInstance()
+                .listenToTsFile(dataRegionId, resource, false);
           }
         });
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 741db029ea0..161a95b4ec9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -61,7 +61,8 @@ public class LoadTsFileSchedulerTest {
                 mock(MPPQueryContext.class),
                 mock(QueryStateMachine.class),
                 mock(IClientManager.class),
-                mock(IPartitionFetcher.class)));
+                mock(IPartitionFetcher.class),
+                false));
     t.start();
     Assert.assertNull(t.getTotalCpuTime());
     Assert.assertNull(t.getFragmentInfo());
diff --git a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
index 99f2ed1419d..aa522f2d77b 100644
--- a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
@@ -313,6 +313,7 @@ struct TTsFilePieceReq{
 struct TLoadCommandReq{
     1: required i32 commandType
     2: required string uuid
+    3: optional bool isGeneratedByPipe
 }
 
 struct TLoadResp{


Reply via email to