This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch multi-cyclic-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multi-cyclic-pipe by this push:
new 96a36d7a0dc ..
96a36d7a0dc is described below
commit 96a36d7a0dc8385bbc1788b691f1b9207e16f63f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Aug 15 21:35:50 2023 +0800
..
---
.../config/constant/PipeExtractorConstant.java | 2 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 14 ++-
.../event/realtime/PipeRealtimeEventFactory.java | 5 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 2 +
.../listener/PipeInsertionDataNodeListener.java | 6 +-
.../receiver/thrift/IoTDBThriftReceiverV1.java | 14 +--
.../impl/DataNodeInternalRPCServiceImpl.java | 8 +-
.../execution/load/LoadTsFileManager.java | 12 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 10 ++
.../queryengine/plan/execution/QueryExecution.java | 4 +-
.../plan/planner/LogicalPlanVisitor.java | 7 ++
.../planner/plan/node/load/LoadTsFileNode.java | 4 -
.../scheduler/load/LoadTsFileDispatcherImpl.java | 11 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 11 +-
.../plan/statement/StatementVisitor.java | 6 +
.../plan/statement/crud/LoadTsFileStatement.java | 11 ++
.../crud/PipeEnrichedLoadTsFileStatement.java | 132 +++++++++++++++++++++
.../iotdb/db/storageengine/StorageEngine.java | 5 +-
.../db/storageengine/dataregion/DataRegion.java | 7 +-
.../dataregion/memtable/TsFileProcessor.java | 3 +-
.../rescon/quotas/DefaultOperationQuota.java | 6 +-
.../db/pipe/extractor/PipeRealtimeExtractTest.java | 3 +-
.../scheduler/load/LoadTsFileSchedulerTest.java | 3 +-
.../thrift/src/main/thrift/datanode.thrift | 1 +
24 files changed, 241 insertions(+), 46 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index 810f7a717f0..6a7c095c148 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -28,7 +28,7 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
"extractor.forwarding-pipe-requests";
- public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE
= false;
+ public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE
= true;
public static final String EXTRACTOR_HISTORY_ENABLE_KEY =
"extractor.history.enable";
public static final String EXTRACTOR_HISTORY_START_TIME =
"extractor.history.start-time";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 52dd0d594bd..5705f5c8600 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -48,16 +48,19 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private final TsFileResource resource;
private File tsFile;
+ private final boolean isGeneratedByPipe;
+
private final AtomicBoolean isClosed;
private TsFileInsertionDataContainer dataContainer;
- public PipeTsFileInsertionEvent(TsFileResource resource) {
- this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+ public PipeTsFileInsertionEvent(TsFileResource resource, boolean
isGeneratedByPipe) {
+ this(resource, isGeneratedByPipe, null, null, Long.MIN_VALUE,
Long.MAX_VALUE);
}
public PipeTsFileInsertionEvent(
TsFileResource resource,
+ boolean isGeneratedByPipe,
PipeTaskMeta pipeTaskMeta,
String pattern,
long startTime,
@@ -70,6 +73,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
this.resource = resource;
tsFile = resource.getTsFile();
+ this.isGeneratedByPipe = isGeneratedByPipe;
+
isClosed = new AtomicBoolean(resource.isClosed());
// register close listener if TsFile is not closed
if (!isClosed.get()) {
@@ -153,12 +158,13 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
- return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern,
startTime, endTime);
+ return new PipeTsFileInsertionEvent(
+ resource, isGeneratedByPipe, pipeTaskMeta, pattern, startTime,
endTime);
}
@Override
public boolean isGeneratedByPipe() {
- return false;
+ return isGeneratedByPipe;
}
/////////////////////////// TsFileInsertionEvent ///////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 1be1f91d1a7..6af720da2f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -30,9 +30,10 @@ public class PipeRealtimeEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
- public static PipeRealtimeEvent createRealtimeEvent(TsFileResource resource)
{
+ public static PipeRealtimeEvent createRealtimeEvent(
+ TsFileResource resource, boolean isGeneratedByPipe) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
- new PipeTsFileInsertionEvent(resource), resource);
+ new PipeTsFileInsertionEvent(resource, isGeneratedByPipe), resource);
}
public static PipeRealtimeEvent createRealtimeEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a6c6b4d60ee..506dd6adcd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -179,6 +179,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
resource ->
new PipeTsFileInsertionEvent(
resource,
+ false,
pipeTaskMeta,
pattern,
historicalDataExtractionStartTime,
@@ -195,6 +196,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
resource ->
new PipeTsFileInsertionEvent(
resource,
+ false,
pipeTaskMeta,
pattern,
historicalDataExtractionStartTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index b7a7a0c3770..35648b1bb5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -91,7 +91,8 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// listen to events ////////////////////////////
- public void listenToTsFile(String dataRegionId, TsFileResource
tsFileResource) {
+ public void listenToTsFile(
+ String dataRegionId, TsFileResource tsFileResource, boolean
isGeneratedByPipe) {
// We intentionally do not check whether listenToTsFileExtractorCount.get() == 0 here,
// because extractors may still use tsfile events when exceptions occur while
// listening to insert nodes.
@@ -103,7 +104,8 @@ public class PipeInsertionDataNodeListener {
return;
}
-
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource));
+ assigner.publishToAssign(
+ PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource,
isGeneratedByPipe));
}
public void listenToInsertNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 39c47acf439..481947dbd39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -414,15 +415,10 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null
statement.");
}
- switch (statement.getType()) {
- case INSERT:
- case BATCH_INSERT:
- case BATCH_INSERT_ROWS:
- case BATCH_INSERT_ONE_DEVICE:
- case MULTI_BATCH_INSERT:
- statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement)
statement);
- break;
- // TODO: LOAD
+ if (statement instanceof InsertBaseStatement) {
+ statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement)
statement);
+ } else if (statement instanceof LoadTsFileStatement) {
+ statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement)
statement);
}
final ExecutionResult result =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 33c6cfcee4d..826aeaa0cd3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -401,12 +401,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TLoadResp sendLoadCommand(TLoadCommandReq req) {
-
- TSStatus resultStatus =
+ return createTLoadResp(
StorageEngine.getInstance()
.executeLoadCommand(
- LoadTsFileScheduler.LoadCommand.values()[req.commandType],
req.uuid);
- return createTLoadResp(resultStatus);
+ LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+ req.uuid,
+ req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe));
}
private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index b1ab2b77e6b..97542ceab24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -133,11 +133,12 @@ public class LoadTsFileManager {
}
}
- public boolean loadAll(String uuid) throws IOException, LoadFileException {
+ public boolean loadAll(String uuid, boolean isGeneratedByPipe)
+ throws IOException, LoadFileException {
if (!uuid2WriterManager.containsKey(uuid)) {
return false;
}
- uuid2WriterManager.get(uuid).loadAll();
+ uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe);
clean(uuid);
return true;
}
@@ -249,7 +250,7 @@ public class LoadTsFileManager {
}
}
- private void loadAll() throws IOException, LoadFileException {
+ private void loadAll(boolean isGeneratedByPipe) throws IOException,
LoadFileException {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
@@ -259,7 +260,10 @@ public class LoadTsFileManager {
writer.endChunkGroup();
}
writer.endFile();
- entry.getKey().getDataRegion().loadNewTsFile(generateResource(writer),
true);
+ entry
+ .getKey()
+ .getDataRegion()
+ .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 3cfda307723..5a045d5c051 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -99,6 +99,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -2509,6 +2510,15 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
.analyzeFileByFile();
}
+ @Override
+ public Analysis visitPipeEnrichedLoadFile(
+ PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement,
MPPQueryContext context) {
+ final Analysis analysis =
+
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(),
context);
+ analysis.setStatement(pipeEnrichedLoadTsFileStatement);
+ return analysis;
+ }
+
/** get analysis according to statement and params */
private Analysis getAnalysisForWriting(
Analysis analysis, List<DataPartitionQueryParam>
dataPartitionQueryParams) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index f7bca3e2a73..14765cfe2aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -64,6 +64,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.RpcUtils;
@@ -322,7 +323,8 @@ public class QueryExecution implements IQueryExecution {
context,
stateMachine,
syncInternalServiceClientManager,
- partitionFetcher);
+ partitionFetcher,
+ rawStatement instanceof PipeEnrichedLoadTsFileStatement);
this.scheduler.start();
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index d5889cf208e..cbe1cbfa509 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -61,6 +61,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -529,6 +530,12 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
context.getQueryId().genPlanNodeId(),
loadTsFileStatement.getResources());
}
+ @Override
+ public PlanNode visitPipeEnrichedLoadFile(
+ PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement,
MPPQueryContext context) {
+ return
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(),
context);
+ }
+
@Override
public PlanNode visitShowTimeSeries(
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index f304678fc3c..793e6139693 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -40,10 +40,6 @@ public class LoadTsFileNode extends WritePlanNode {
private final List<TsFileResource> resources;
- public LoadTsFileNode(PlanNodeId id) {
- this(id, new ArrayList<>());
- }
-
public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
super(id);
this.resources = resources;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 075c6e1433a..620920a01a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -68,16 +68,19 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
private final ExecutorService executor;
+ private final boolean isGeneratedByPipe;
private static final String NODE_CONNECTION_ERROR = "can't connect to node
{}";
public LoadTsFileDispatcherImpl(
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager,
+ boolean isGeneratedByPipe) {
this.internalServiceClientManager = internalServiceClientManager;
this.localhostIpAddr =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
this.localhostInternalPort =
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
this.executor =
IoTDBThreadPoolFactory.newCachedThreadPool(LoadTsFileDispatcherImpl.class.getName());
+ this.isGeneratedByPipe = isGeneratedByPipe;
}
public void setUuid(String uuid) {
@@ -178,7 +181,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
StorageEngine.getInstance()
.executeLoadCommand(
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
- loadCommandReq.uuid);
+ loadCommandReq.uuid,
+ loadCommandReq.isSetIsGeneratedByPipe() &&
loadCommandReq.isGeneratedByPipe);
if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
throw new FragmentInstanceDispatchException(resultStatus);
}
@@ -211,7 +215,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
.getDataRegion((DataRegionId) groupId)
.loadNewTsFile(
((LoadSingleTsFileNode) planNode).getTsFileResource(),
- ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad());
+ ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
+ isGeneratedByPipe);
} catch (LoadFileException e) {
logger.warn(String.format("Load TsFile Node %s error.", planNode), e);
TSStatus resultStatus = new TSStatus();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 9127eead0ad..834829c3849 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -102,22 +102,24 @@ public class LoadTsFileScheduler implements IScheduler {
private final DataPartitionBatchFetcher partitionFetcher;
private final List<LoadSingleTsFileNode> tsFileNodeList;
private final PlanFragmentId fragmentId;
-
- private Set<TRegionReplicaSet> allReplicaSets;
+ private final Set<TRegionReplicaSet> allReplicaSets;
+ private final boolean isGeneratedByPipe;
public LoadTsFileScheduler(
DistributedQueryPlan distributedQueryPlan,
MPPQueryContext queryContext,
QueryStateMachine stateMachine,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager,
- IPartitionFetcher partitionFetcher) {
+ IPartitionFetcher partitionFetcher,
+ boolean isGeneratedByPipe) {
this.queryContext = queryContext;
this.stateMachine = stateMachine;
this.tsFileNodeList = new ArrayList<>();
this.fragmentId =
distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
- this.dispatcher = new
LoadTsFileDispatcherImpl(internalServiceClientManager);
+ this.dispatcher = new
LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
this.allReplicaSets = new HashSet<>();
+ this.isGeneratedByPipe = isGeneratedByPipe;
for (FragmentInstance fragmentInstance :
distributedQueryPlan.getInstances()) {
tsFileNodeList.add((LoadSingleTsFileNode)
fragmentInstance.getFragment().getPlanNodeTree());
@@ -279,6 +281,7 @@ public class LoadTsFileScheduler implements IScheduler {
TLoadCommandReq loadCommandReq =
new TLoadCommandReq(
(isFirstPhaseSuccess ? LoadCommand.EXECUTE :
LoadCommand.ROLLBACK).ordinal(), uuid);
+ loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
Future<FragInstanceDispatchResult> dispatchResultFuture =
dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 1ffa3d8f108..17bee5aa40b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -303,6 +304,11 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(loadTsFileStatement, context);
}
+ public R visitPipeEnrichedLoadFile(
+ PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C
context) {
+ return visitStatement(pipeEnrichedLoadTsFileStatement, context);
+ }
+
public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
return visitStatement(insertRowStatement, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 381d367afe9..a58da41af62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -69,6 +69,17 @@ public class LoadTsFileStatement extends Statement {
sortTsFiles(tsFiles);
}
+ protected LoadTsFileStatement() {
+ this.file = null;
+ this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
+ this.verifySchema = true;
+ this.deleteAfterLoad = true;
+ this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+ this.tsFiles = new ArrayList<>();
+ this.resources = new ArrayList<>();
+ this.statementType = StatementType.MULTI_BATCH_INSERT;
+ }
+
private void findAllTsFile(File file) {
final File[] files = file.listFiles();
if (files == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
new file mode 100644
index 00000000000..c25749c4838
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.io.File;
+import java.util.List;
+
+public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement {
+
+ private final LoadTsFileStatement loadTsFileStatement;
+
+ public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement
loadTsFileStatement) {
+ this.loadTsFileStatement = loadTsFileStatement;
+ statementType = StatementType.MULTI_BATCH_INSERT;
+ }
+
+ public LoadTsFileStatement getLoadTsFileStatement() {
+ return loadTsFileStatement;
+ }
+
+ @Override
+ public boolean isDebug() {
+ return loadTsFileStatement.isDebug();
+ }
+
+ @Override
+ public void setDebug(boolean debug) {
+ loadTsFileStatement.setDebug(debug);
+ }
+
+ @Override
+ public boolean isQuery() {
+ return loadTsFileStatement.isQuery();
+ }
+
+ @Override
+ public boolean isAuthenticationRequired() {
+ return loadTsFileStatement.isAuthenticationRequired();
+ }
+
+ @Override
+ public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+ loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad);
+ }
+
+ @Override
+ public void setDatabaseLevel(int databaseLevel) {
+ loadTsFileStatement.setDatabaseLevel(databaseLevel);
+ }
+
+ @Override
+ public void setVerifySchema(boolean verifySchema) {
+ loadTsFileStatement.setVerifySchema(verifySchema);
+ }
+
+ @Override
+ public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+ loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase);
+ }
+
+ @Override
+ public boolean isVerifySchema() {
+ return loadTsFileStatement.isVerifySchema();
+ }
+
+ @Override
+ public boolean isDeleteAfterLoad() {
+ return loadTsFileStatement.isDeleteAfterLoad();
+ }
+
+ @Override
+ public boolean isAutoCreateDatabase() {
+ return loadTsFileStatement.isAutoCreateDatabase();
+ }
+
+ @Override
+ public int getDatabaseLevel() {
+ return loadTsFileStatement.getDatabaseLevel();
+ }
+
+ @Override
+ public List<File> getTsFiles() {
+ return loadTsFileStatement.getTsFiles();
+ }
+
+ @Override
+ public void addTsFileResource(TsFileResource resource) {
+ loadTsFileStatement.addTsFileResource(resource);
+ }
+
+ @Override
+ public List<TsFileResource> getResources() {
+ return loadTsFileStatement.getResources();
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return loadTsFileStatement.getPaths();
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedLoadFile(this, context);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" +
loadTsFileStatement + '}';
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 9fc9b49c38d..d1b8cd8c1ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -796,13 +796,14 @@ public class StorageEngine implements IService {
return RpcUtils.SUCCESS_STATUS;
}
- public TSStatus executeLoadCommand(LoadTsFileScheduler.LoadCommand
loadCommand, String uuid) {
+ public TSStatus executeLoadCommand(
+ LoadTsFileScheduler.LoadCommand loadCommand, String uuid, boolean
isGeneratedByPipe) {
TSStatus status = new TSStatus();
try {
switch (loadCommand) {
case EXECUTE:
- if (getLoadTsFileManager().loadAll(uuid)) {
+ if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe)) {
status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0011db21534..95ffb6264a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2197,8 +2197,10 @@ public class DataRegion implements IDataRegionForQuery {
*
* @param newTsFileResource tsfile resource @UsedBy load external tsfile module
* @param deleteOriginFile whether to delete origin tsfile
+ * @param isGeneratedByPipe whether the load tsfile request is generated by pipe
*/
- public void loadNewTsFile(TsFileResource newTsFileResource, boolean deleteOriginFile)
+ public void loadNewTsFile(
+ TsFileResource newTsFileResource, boolean deleteOriginFile, boolean isGeneratedByPipe)
throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getTsFile();
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
@@ -2223,7 +2225,8 @@ public class DataRegion implements IDataRegionForQuery {
loadTsFileToUnSequence(
tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
- PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, newTsFileResource);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
FileMetrics.getInstance()
.addFile(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index dc7086dde70..45e755debb3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -893,7 +893,8 @@ public class TsFileProcessor {
try {
PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource);
+ .listenToTsFile(
+ dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource, false);
// When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
// flushing memTable in System module.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index 750258eef55..b6b714b5de2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -137,7 +138,10 @@ public class DefaultOperationQuota implements OperationQuota {
}
break;
case MULTI_BATCH_INSERT:
- // LoadTsFileStatement InsertMultiTabletsStatement
+ // PipeEnrichedLoadTsFileStatement LoadTsFileStatement InsertMultiTabletsStatement
+ if (s instanceof PipeEnrichedLoadTsFileStatement) {
+ s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement();
+ }
if (s instanceof LoadTsFileStatement) {
LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s;
for (int i = 0; i < loadTsFileStatement.getResources().size(); i++) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index f6dabc9b81d..1900ff539bb 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -286,7 +286,8 @@ public class PipeRealtimeExtractTest {
null,
false),
resource);
- PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, resource);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegionId, resource, false);
}
});
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 741db029ea0..161a95b4ec9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -61,7 +61,8 @@ public class LoadTsFileSchedulerTest {
mock(MPPQueryContext.class),
mock(QueryStateMachine.class),
mock(IClientManager.class),
- mock(IPartitionFetcher.class)));
+ mock(IPartitionFetcher.class),
+ false));
t.start();
Assert.assertNull(t.getTotalCpuTime());
Assert.assertNull(t.getFragmentInfo());
diff --git a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
index 99f2ed1419d..aa522f2d77b 100644
--- a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
@@ -313,6 +313,7 @@ struct TTsFilePieceReq{
struct TLoadCommandReq{
1: required i32 commandType
2: required string uuid
+ 3: optional bool isGeneratedByPipe
}
struct TLoadResp{