This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 3c0d8264bd8 [IOTDB-6114] Pipe: Support multi-cluster data sync
(#10868)(#10926)
3c0d8264bd8 is described below
commit 3c0d8264bd80cfd8e2dced092acc11be6c22cd18
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Aug 22 17:07:35 2023 +0800
[IOTDB-6114] Pipe: Support multi-cluster data sync (#10868)(#10926)
---
.../org/apache/iotdb/db/audit/AuditLogger.java | 1 +
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 1 +
.../dataregion/DataExecutionVisitor.java | 23 ++
.../config/constant/PipeExtractorConstant.java | 4 +
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 2 +
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 17 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 17 +-
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 8 +-
.../event/realtime/PipeRealtimeEventFactory.java | 10 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 2 +
.../realtime/PipeRealtimeDataRegionExtractor.java | 10 +
.../realtime/assigner/PipeDataRegionAssigner.java | 4 +
.../listener/PipeInsertionDataNodeListener.java | 6 +-
.../receiver/thrift/IoTDBThriftReceiverV1.java | 12 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 8 +-
.../execution/executor/RegionWriteExecutor.java | 7 +
.../execution/load/LoadTsFileManager.java | 12 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 42 ++++
.../queryengine/plan/execution/QueryExecution.java | 4 +-
.../plan/execution/config/ConfigTaskVisitor.java | 10 +-
.../config/executor/ClusterConfigTaskExecutor.java | 10 +-
.../config/executor/IConfigTaskExecutor.java | 10 +-
.../execution/config/sys/pipe/CreatePipeTask.java | 2 +-
.../execution/config/sys/pipe/DropPipeTask.java | 2 +-
.../execution/config/sys/pipe/ShowPipeTask.java | 2 +-
.../execution/config/sys/pipe/StartPipeTask.java | 2 +-
.../execution/config/sys/pipe/StopPipeTask.java | 2 +-
.../db/queryengine/plan/parser/ASTVisitor.java | 10 +-
.../plan/planner/LogicalPlanVisitor.java | 45 ++++
.../plan/planner/plan/node/PlanNodeType.java | 7 +-
.../plan/planner/plan/node/PlanVisitor.java | 5 +
.../planner/plan/node/load/LoadTsFileNode.java | 4 -
.../plan/node/write/InsertMultiTabletsNode.java | 6 +
.../plan/planner/plan/node/write/InsertNode.java | 12 +-
.../planner/plan/node/write/InsertRowsNode.java | 6 +
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 6 +
.../plan/node/write/PipeEnrichedInsertNode.java | 276 +++++++++++++++++++++
.../scheduler/load/LoadTsFileDispatcherImpl.java | 11 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 11 +-
.../queryengine/plan/statement/StatementType.java | 1 +
.../plan/statement/StatementVisitor.java | 58 +++--
.../plan/statement/crud/InsertBaseStatement.java | 3 +-
.../plan/statement/crud/LoadTsFileStatement.java | 11 +
.../crud/PipeEnrichedInsertBaseStatement.java | 224 +++++++++++++++++
.../crud/PipeEnrichedLoadTsFileStatement.java | 132 ++++++++++
.../pipe/CreatePipeStatement.java | 2 +-
.../{sys => metadata}/pipe/DropPipeStatement.java | 2 +-
.../{sys => metadata}/pipe/ShowPipesStatement.java | 2 +-
.../{sys => metadata}/pipe/StartPipeStatement.java | 2 +-
.../{sys => metadata}/pipe/StopPipeStatement.java | 2 +-
.../iotdb/db/storageengine/StorageEngine.java | 5 +-
.../db/storageengine/dataregion/DataRegion.java | 7 +-
.../dataregion/memtable/TsFileProcessor.java | 3 +-
.../quotas/DataNodeThrottleQuotaManager.java | 1 +
.../rescon/quotas/DefaultOperationQuota.java | 14 +-
.../db/trigger/executor/TriggerFireVisitor.java | 20 ++
.../db/pipe/extractor/PipeRealtimeExtractTest.java | 3 +-
.../plan/statement/sys/pipe/PipeStatementTest.java | 10 +-
.../scheduler/load/LoadTsFileSchedulerTest.java | 3 +-
.../thrift/src/main/thrift/datanode.thrift | 1 +
60 files changed, 1029 insertions(+), 106 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index 5c7b9da4923..fa1fd88dbd8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -205,6 +205,7 @@ public class AuditLogger {
case BATCH_INSERT_ROWS:
case BATCH_INSERT_ONE_DEVICE:
case MULTI_BATCH_INSERT:
+ case PIPE_ENRICHED_INSERT:
case DELETE:
case SELECT_INTO:
case LOAD_FILES:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 674f09d2bb1..2b6298feff9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -209,6 +209,7 @@ public class AuthorityChecker {
case BATCH_INSERT_ONE_DEVICE:
case BATCH_INSERT_ROWS:
case MULTI_BATCH_INSERT:
+ case PIPE_ENRICHED_INSERT:
return PrivilegeType.INSERT_TIMESERIES.ordinal();
case LIST_ROLE:
case LIST_ROLE_USERS:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 28384aee64b..1e647995a57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -28,10 +28,12 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -164,6 +166,27 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
}
}
+ @Override
+ public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node,
DataRegion context) {
+ final InsertNode realInsertNode = node.getInsertNode();
+
+ realInsertNode.markAsGeneratedByPipe();
+
+ if (realInsertNode instanceof InsertRowNode) {
+ return visitInsertRow((InsertRowNode) realInsertNode, context);
+ } else if (realInsertNode instanceof InsertTabletNode) {
+ return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+ } else if (realInsertNode instanceof InsertRowsNode) {
+ return visitInsertRows((InsertRowsNode) realInsertNode, context);
+ } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+ return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode,
context);
+ } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+ return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode)
realInsertNode, context);
+ } else {
+ return visitPlan(realInsertNode, context);
+ }
+ }
+
@Override
public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index b92b502ae19..6a7c095c148 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -26,6 +26,10 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
+ public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
+ "extractor.forwarding-pipe-requests";
+ public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE
= true;
+
public static final String EXTRACTOR_HISTORY_ENABLE_KEY =
"extractor.history.enable";
public static final String EXTRACTOR_HISTORY_START_TIME =
"extractor.history.start-time";
public static final String EXTRACTOR_HISTORY_END_TIME =
"extractor.history.end-time";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index ddd0c78877c..ef49f2bf8c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -134,4 +134,6 @@ public abstract class EnrichedEvent implements Event {
public void reportException(PipeRuntimeException pipeRuntimeException) {
PipeAgent.runtime().report(this.pipeTaskMeta, pipeRuntimeException);
}
+
+ public abstract boolean isGeneratedByPipe();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 7162cdabeca..739a2d88292 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -46,24 +46,30 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
private final WALEntryHandler walEntryHandler;
private final ProgressIndex progressIndex;
private final boolean isAligned;
+ private final boolean isGeneratedByPipe;
private TabletInsertionDataContainer dataContainer;
public PipeInsertNodeTabletInsertionEvent(
- WALEntryHandler walEntryHandler, ProgressIndex progressIndex, boolean
isAligned) {
- this(walEntryHandler, progressIndex, isAligned, null, null);
+ WALEntryHandler walEntryHandler,
+ ProgressIndex progressIndex,
+ boolean isAligned,
+ boolean isGeneratedByPipe) {
+ this(walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, null,
null);
}
private PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
ProgressIndex progressIndex,
boolean isAligned,
+ boolean isGeneratedByPipe,
PipeTaskMeta pipeTaskMeta,
String pattern) {
super(pipeTaskMeta, pattern);
this.walEntryHandler = walEntryHandler;
this.progressIndex = progressIndex;
this.isAligned = isAligned;
+ this.isGeneratedByPipe = isGeneratedByPipe;
}
public InsertNode getInsertNode() throws WALPipeException {
@@ -111,7 +117,12 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
public PipeInsertNodeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeInsertNodeTabletInsertionEvent(
- walEntryHandler, progressIndex, isAligned, pipeTaskMeta, pattern);
+ walEntryHandler, progressIndex, isAligned, isGeneratedByPipe,
pipeTaskMeta, pattern);
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return isGeneratedByPipe;
}
/////////////////////////// TabletInsertionEvent ///////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index bb260bf22ac..5705f5c8600 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -48,16 +48,19 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private final TsFileResource resource;
private File tsFile;
+ private final boolean isGeneratedByPipe;
+
private final AtomicBoolean isClosed;
private TsFileInsertionDataContainer dataContainer;
- public PipeTsFileInsertionEvent(TsFileResource resource) {
- this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+ public PipeTsFileInsertionEvent(TsFileResource resource, boolean
isGeneratedByPipe) {
+ this(resource, isGeneratedByPipe, null, null, Long.MIN_VALUE,
Long.MAX_VALUE);
}
public PipeTsFileInsertionEvent(
TsFileResource resource,
+ boolean isGeneratedByPipe,
PipeTaskMeta pipeTaskMeta,
String pattern,
long startTime,
@@ -70,6 +73,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
this.resource = resource;
tsFile = resource.getTsFile();
+ this.isGeneratedByPipe = isGeneratedByPipe;
+
isClosed = new AtomicBoolean(resource.isClosed());
// register close listener if TsFile is not closed
if (!isClosed.get()) {
@@ -153,7 +158,13 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
- return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern,
startTime, endTime);
+ return new PipeTsFileInsertionEvent(
+ resource, isGeneratedByPipe, pipeTaskMeta, pattern, startTime,
endTime);
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return isGeneratedByPipe;
}
/////////////////////////// TsFileInsertionEvent ///////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 5db7b20bde0..38f318cd05d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
-import org.apache.iotdb.pipe.api.event.Event;
import java.util.Map;
@@ -69,7 +68,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
this.device2Measurements = device2Measurements;
}
- public Event getEvent() {
+ public EnrichedEvent getEvent() {
return event;
}
@@ -129,6 +128,11 @@ public class PipeRealtimeEvent extends EnrichedEvent {
pattern);
}
+ @Override
+ public boolean isGeneratedByPipe() {
+ return event.isGeneratedByPipe();
+ }
+
@Override
public String toString() {
return "PipeRealtimeEvent{"
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index f542d8c6647..6af720da2f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -30,16 +30,20 @@ public class PipeRealtimeEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
- public static PipeRealtimeEvent createRealtimeEvent(TsFileResource resource)
{
+ public static PipeRealtimeEvent createRealtimeEvent(
+ TsFileResource resource, boolean isGeneratedByPipe) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
- new PipeTsFileInsertionEvent(resource), resource);
+ new PipeTsFileInsertionEvent(resource, isGeneratedByPipe), resource);
}
public static PipeRealtimeEvent createRealtimeEvent(
WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource
resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
new PipeInsertNodeTabletInsertionEvent(
- walEntryHandler, insertNode.getProgressIndex(),
insertNode.isAligned()),
+ walEntryHandler,
+ insertNode.getProgressIndex(),
+ insertNode.isAligned(),
+ insertNode.isGeneratedByPipe()),
insertNode,
resource);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a6c6b4d60ee..506dd6adcd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -179,6 +179,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
resource ->
new PipeTsFileInsertionEvent(
resource,
+ false,
pipeTaskMeta,
pattern,
historicalDataExtractionStartTime,
@@ -195,6 +196,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
resource ->
new PipeTsFileInsertionEvent(
resource,
+ false,
pipeTaskMeta,
pattern,
historicalDataExtractionStartTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 9fd72fb7b67..22658fb3b42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -32,6 +32,8 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor
{
protected String pattern;
+ protected boolean isForwardingPipeRequests;
+
protected String dataRegionId;
protected PipeTaskMeta pipeTaskMeta;
@@ -51,6 +53,10 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
parameters.getStringOrDefault(
PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ isForwardingPipeRequests =
+ parameters.getBooleanOrDefault(
+ PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
final PipeTaskExtractorRuntimeEnvironment environment =
(PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
@@ -79,6 +85,10 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
return pattern;
}
+ public final boolean isForwardingPipeRequests() {
+ return isForwardingPipeRequests;
+ }
+
public final PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index ee569e9e4ff..1177634be9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -47,6 +47,10 @@ public class PipeDataRegionAssigner {
.match(event)
.forEach(
extractor -> {
+ if (event.getEvent().isGeneratedByPipe() &&
!extractor.isForwardingPipeRequests()) {
+ return;
+ }
+
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
extractor.getPipeTaskMeta(), extractor.getPattern());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index b7a7a0c3770..35648b1bb5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -91,7 +91,8 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// listen to events ////////////////////////////
- public void listenToTsFile(String dataRegionId, TsFileResource
tsFileResource) {
+ public void listenToTsFile(
+ String dataRegionId, TsFileResource tsFileResource, boolean
isGeneratedByPipe) {
// We don't judge whether listenToTsFileExtractorCount.get() == 0 here on
purpose
// because extractors may use tsfile events when some exceptions occur in
the
// insert nodes listening process.
@@ -103,7 +104,8 @@ public class PipeInsertionDataNodeListener {
return;
}
-
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource));
+ assigner.publishToAssign(
+ PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource,
isGeneratedByPipe));
}
public void listenToInsertNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 2aebbd7a3e5..09d6870f489 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -38,8 +38,11 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -424,12 +427,17 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null
statement.");
}
- final long queryId = SessionManager.getInstance().requestQueryId();
+ if (statement instanceof InsertBaseStatement) {
+ statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement)
statement);
+ } else if (statement instanceof LoadTsFileStatement) {
+ statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement)
statement);
+ }
+
final ExecutionResult result =
Coordinator.getInstance()
.execute(
statement,
- queryId,
+ SessionManager.getInstance().requestQueryId(),
null,
"",
partitionFetcher,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 86647205bc9..446f71e2cb3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -404,12 +404,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TLoadResp sendLoadCommand(TLoadCommandReq req) {
-
- TSStatus resultStatus =
+ return createTLoadResp(
StorageEngine.getInstance()
.executeLoadCommand(
- LoadTsFileScheduler.LoadCommand.values()[req.commandType],
req.uuid);
- return createTLoadResp(resultStatus);
+ LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+ req.uuid,
+ req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe));
}
private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 1b94e607282..c60787dd737 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -60,6 +60,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -222,6 +223,12 @@ public class RegionWriteExecutor {
return executeDataInsert(node, context);
}
+ @Override
+ public RegionExecutionResult visitPipeEnrichedInsert(
+ PipeEnrichedInsertNode node, WritePlanNodeExecutionContext context) {
+ return executeDataInsert(node, context);
+ }
+
private RegionExecutionResult executeDataInsert(
InsertNode insertNode, WritePlanNodeExecutionContext context) {
RegionExecutionResult response = new RegionExecutionResult();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index b1ab2b77e6b..97542ceab24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -133,11 +133,12 @@ public class LoadTsFileManager {
}
}
- public boolean loadAll(String uuid) throws IOException, LoadFileException {
+ public boolean loadAll(String uuid, boolean isGeneratedByPipe)
+ throws IOException, LoadFileException {
if (!uuid2WriterManager.containsKey(uuid)) {
return false;
}
- uuid2WriterManager.get(uuid).loadAll();
+ uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe);
clean(uuid);
return true;
}
@@ -249,7 +250,7 @@ public class LoadTsFileManager {
}
}
- private void loadAll() throws IOException, LoadFileException {
+ private void loadAll(boolean isGeneratedByPipe) throws IOException,
LoadFileException {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
@@ -259,7 +260,10 @@ public class LoadTsFileManager {
writer.endChunkGroup();
}
writer.endFile();
- entry.getKey().getDataRegion().loadNewTsFile(generateResource(writer),
true);
+ entry
+ .getKey()
+ .getDataRegion()
+ .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 5f56411ac91..7135a8ddb9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -98,6 +98,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -2425,6 +2427,37 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
+ @Override
+ public Analysis visitPipeEnrichedInsert(
+ PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement,
MPPQueryContext context) {
+ Analysis analysis;
+
+ final InsertBaseStatement insertBaseStatement =
+ pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
+ if (insertBaseStatement instanceof InsertTabletStatement) {
+ analysis = visitInsertTablet((InsertTabletStatement)
insertBaseStatement, context);
+ } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
+ analysis =
+ visitInsertMultiTablets((InsertMultiTabletsStatement)
insertBaseStatement, context);
+ } else if (insertBaseStatement instanceof InsertRowStatement) {
+ analysis = visitInsertRow((InsertRowStatement) insertBaseStatement,
context);
+ } else if (insertBaseStatement instanceof InsertRowsStatement) {
+ analysis = visitInsertRows((InsertRowsStatement) insertBaseStatement,
context);
+ } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
+ analysis =
+ visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceStatement)
insertBaseStatement, context);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported insert statement type: " +
insertBaseStatement.getClass().getName());
+ }
+
+ // statement may be changed because of logical view
+ pipeEnrichedInsertBaseStatement.setInsertBaseStatement(
+ (InsertBaseStatement) analysis.getStatement());
+ analysis.setStatement(pipeEnrichedInsertBaseStatement);
+ return analysis;
+ }
+
private void validateSchema(
Analysis analysis, InsertBaseStatement insertStatement, MPPQueryContext
context) {
final long startTime = System.nanoTime();
@@ -2478,6 +2511,15 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
.analyzeFileByFile();
}
+ @Override
+ public Analysis visitPipeEnrichedLoadFile(
+ PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement,
MPPQueryContext context) {
+ final Analysis analysis =
+
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(),
context);
+ analysis.setStatement(pipeEnrichedLoadTsFileStatement);
+ return analysis;
+ }
+
/** get analysis according to statement and params */
private Analysis getAnalysisForWriting(
Analysis analysis, List<DataPartitionQueryParam>
dataPartitionQueryParams) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index f7bca3e2a73..14765cfe2aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -64,6 +64,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.RpcUtils;
@@ -322,7 +323,8 @@ public class QueryExecution implements IQueryExecution {
context,
stateMachine,
syncInternalServiceClientManager,
- partitionFetcher);
+ partitionFetcher,
+ rawStatement instanceof PipeEnrichedLoadTsFileStatement);
this.scheduler.start();
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
index f98bd503882..57b6a3cbad9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
@@ -119,6 +119,11 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModel
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.DropModelStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowModelsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowTrailsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DeactivateTemplateStatement;
@@ -138,11 +143,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a2c4783e266..1f25a869d04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -160,6 +160,11 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseState
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModelStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DeactivateTemplateStatement;
@@ -174,11 +179,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index a59ada9d88b..895f81cd9be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -44,6 +44,11 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseState
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModelStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DeactivateTemplateStatement;
@@ -57,11 +62,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.AlterLogical
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index 5d71693f6bc..f0213c7d4f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
import com.google.common.util.concurrent.ListenableFuture;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
index ec074a0de87..f3449f36990 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
import com.google.common.util.concurrent.ListenableFuture;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 1e7dc50be2c..e5343e3c8eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -27,7 +27,7 @@ import
org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
index 9b9539c732f..62b55bb017c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
import com.google.common.util.concurrent.ListenableFuture;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
index a0e495cc8bd..c18070a5027 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import com.google.common.util.concurrent.ListenableFuture;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index c03775b3d39..4ea3445b14b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -160,6 +160,11 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModel
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.DropModelStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowModelsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowTrailsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
@@ -185,11 +190,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 86b98eda96a..372afb78488 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -42,21 +42,26 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Mea
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -487,12 +492,52 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
return insertNode;
}
+ @Override
+ public PlanNode visitPipeEnrichedInsert(
+ PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement,
MPPQueryContext context) {
+ InsertNode insertNode;
+
+ final InsertBaseStatement insertBaseStatement =
+ pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
+ if (insertBaseStatement instanceof InsertRowStatement) {
+ insertNode =
+ (InsertRowNode) visitInsertRow((InsertRowStatement)
insertBaseStatement, context);
+ } else if (insertBaseStatement instanceof InsertRowsStatement) {
+ insertNode =
+ (InsertRowsNode) visitInsertRows((InsertRowsStatement)
insertBaseStatement, context);
+ } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
+ insertNode =
+ (InsertRowsOfOneDeviceNode)
+ visitInsertRowsOfOneDevice(
+ (InsertRowsOfOneDeviceStatement) insertBaseStatement,
context);
+ } else if (insertBaseStatement instanceof InsertTabletStatement) {
+ insertNode =
+ (InsertTabletNode)
+ visitInsertTablet((InsertTabletStatement) insertBaseStatement,
context);
+ } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
+ insertNode =
+ (InsertMultiTabletsNode)
+ visitInsertMultiTablets((InsertMultiTabletsStatement)
insertBaseStatement, context);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported insert statement type: " +
insertBaseStatement.getClass().getName());
+ }
+
+ return new PipeEnrichedInsertNode(insertNode);
+ }
+
@Override
public PlanNode visitLoadFile(LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context) {
return new LoadTsFileNode(
context.getQueryId().genPlanNodeId(),
loadTsFileStatement.getResources());
}
+ @Override
+ public PlanNode visitPipeEnrichedLoadFile(
+ PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement,
MPPQueryContext context) {
+ return
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(),
context);
+ }
+
@Override
public PlanNode visitShowTimeSeries(
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index d2379d1435d..2364aef9d7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -93,6 +93,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
@@ -177,7 +178,9 @@ public enum PlanNodeType {
ROLLBACK_LOGICAL_VIEW_BLACK_LIST((short) 75),
DELETE_LOGICAL_VIEW((short) 76),
LOGICAL_VIEW_SCHEMA_SCAN((short) 77),
- ALTER_LOGICAL_VIEW((short) 78);
+ ALTER_LOGICAL_VIEW((short) 78),
+ PIPE_ENRICHED_INSERT((short) 79),
+ ;
public static final int BYTES = Short.BYTES;
@@ -380,6 +383,8 @@ public enum PlanNodeType {
return LogicalViewSchemaScanNode.deserialize(buffer);
case 78:
return AlterLogicalViewNode.deserialize(buffer);
+ case 79:
+ return PipeEnrichedInsertNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index c603af07f0e..e6eb22fc5a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -94,6 +94,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
public abstract class PlanVisitor<R, C> {
@@ -423,6 +424,10 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) {
+ return visitPlan(node, context);
+ }
+
public R visitDeleteData(DeleteDataNode node, C context) {
return visitPlan(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index f304678fc3c..793e6139693 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -40,10 +40,6 @@ public class LoadTsFileNode extends WritePlanNode {
private final List<TsFileResource> resources;
- public LoadTsFileNode(PlanNodeId id) {
- this(id, new ArrayList<>());
- }
-
public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
super(id);
this.resources = resources;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 167019fe361..69af99b48cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -240,6 +240,12 @@ public class InsertMultiTabletsNode extends InsertNode {
}
}
+ @Override
+ public void markAsGeneratedByPipe() {
+ isGeneratedByPipe = true;
+ insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByPipe);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index fba7c0574ca..abca512e1e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
protected ProgressIndex progressIndex;
+ protected boolean isGeneratedByPipe = false;
+
protected InsertNode(PlanNodeId id) {
super(id);
}
@@ -169,6 +171,14 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
throw new NotImplementedException("serializeAttributes of InsertNode is
not implemented");
}
+ public boolean isGeneratedByPipe() {
+ return isGeneratedByPipe;
+ }
+
+ public void markAsGeneratedByPipe() {
+ isGeneratedByPipe = true;
+ }
+
// region Serialization methods for WAL
/** Serialized size of measurement schemas, ignoring failed time series */
protected int serializeMeasurementSchemasSize() {
@@ -274,7 +284,7 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
// region progress index
@Override
- public final ProgressIndex getProgressIndex() {
+ public ProgressIndex getProgressIndex() {
return progressIndex;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 61949962285..20394f14ae0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -202,6 +202,12 @@ public class InsertRowsNode extends InsertNode {
}
}
+ @Override
+ public void markAsGeneratedByPipe() {
+ isGeneratedByPipe = true;
+ insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
+ }
+
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 28b436c6237..a05e2080f95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -269,6 +269,12 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
}
}
+ @Override
+ public void markAsGeneratedByPipe() {
+ isGeneratedByPipe = true;
+ insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
new file mode 100644
index 00000000000..c2988beafe5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PipeEnrichedInsertNode extends InsertNode {
+
+ private final InsertNode insertNode;
+
+ public PipeEnrichedInsertNode(InsertNode insertNode) {
+ super(insertNode.getPlanNodeId());
+ this.insertNode = insertNode;
+ }
+
+ public InsertNode getInsertNode() {
+ return insertNode;
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return insertNode.isGeneratedByPipe();
+ }
+
+ @Override
+ public void markAsGeneratedByPipe() {
+ insertNode.markAsGeneratedByPipe();
+ }
+
+ @Override
+ public PlanNodeId getPlanNodeId() {
+ return insertNode.getPlanNodeId();
+ }
+
+ @Override
+ public void setPlanNodeId(PlanNodeId id) {
+ insertNode.setPlanNodeId(id);
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return insertNode.getChildren();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ insertNode.addChild(child);
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new PipeEnrichedInsertNode((InsertNode) insertNode.clone());
+ }
+
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new PipeEnrichedInsertNode(
+ (InsertNode) insertNode.createSubNode(subNodeId, startIndex,
endIndex));
+ }
+
+ @Override
+ public PlanNode cloneWithChildren(List<PlanNode> children) {
+ return new PipeEnrichedInsertNode((InsertNode)
insertNode.cloneWithChildren(children));
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return insertNode.allowedChildCount();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return insertNode.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedInsert(this, context);
+ }
+
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ return insertNode.splitByPartition(analysis).stream()
+ .map(
+ plan ->
+ plan instanceof PipeEnrichedInsertNode
+ ? plan
+ : new PipeEnrichedInsertNode((InsertNode) plan))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public TRegionReplicaSet getDataRegionReplicaSet() {
+ return insertNode.getDataRegionReplicaSet();
+ }
+
+ @Override
+ public void setDataRegionReplicaSet(TRegionReplicaSet dataRegionReplicaSet) {
+ insertNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+ }
+
+ @Override
+ public PartialPath getDevicePath() {
+ return insertNode.getDevicePath();
+ }
+
+ @Override
+ public void setDevicePath(PartialPath devicePath) {
+ insertNode.setDevicePath(devicePath);
+ }
+
+ @Override
+ public boolean isAligned() {
+ return insertNode.isAligned();
+ }
+
+ @Override
+ public void setAligned(boolean aligned) {
+ insertNode.setAligned(aligned);
+ }
+
+ @Override
+ public MeasurementSchema[] getMeasurementSchemas() {
+ return insertNode.getMeasurementSchemas();
+ }
+
+ @Override
+ public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+ insertNode.setMeasurementSchemas(measurementSchemas);
+ }
+
+ @Override
+ public String[] getMeasurements() {
+ return insertNode.getMeasurements();
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return insertNode.getDataTypes();
+ }
+
+ @Override
+ public TSDataType getDataType(int index) {
+ return insertNode.getDataType(index);
+ }
+
+ @Override
+ public void setDataTypes(TSDataType[] dataTypes) {
+ insertNode.setDataTypes(dataTypes);
+ }
+
+ @Override
+ public IDeviceID getDeviceID() {
+ return insertNode.getDeviceID();
+ }
+
+ @Override
+ public void setDeviceID(IDeviceID deviceID) {
+ insertNode.setDeviceID(deviceID);
+ }
+
+ @Override
+ public long getSearchIndex() {
+ return insertNode.getSearchIndex();
+ }
+
+ @Override
+ public void setSearchIndex(long searchIndex) {
+ insertNode.setSearchIndex(searchIndex);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.PIPE_ENRICHED_INSERT.serialize(byteBuffer);
+ insertNode.serialize(byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.PIPE_ENRICHED_INSERT.serialize(stream);
+ insertNode.serialize(stream);
+ }
+
+ public static PlanNode deserialize(ByteBuffer buffer) {
+ return new PipeEnrichedInsertNode((InsertNode)
PlanNodeType.deserialize(buffer));
+ }
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return insertNode.getRegionReplicaSet();
+ }
+
+ @Override
+ public long getMinTime() {
+ return insertNode.getMinTime();
+ }
+
+ @Override
+ public boolean isSyncFromLeaderWhenUsingIoTConsensus() {
+ return insertNode.isSyncFromLeaderWhenUsingIoTConsensus();
+ }
+
+ @Override
+ public void markFailedMeasurement(int index) {
+ insertNode.markFailedMeasurement(index);
+ }
+
+ @Override
+ public boolean hasValidMeasurements() {
+ return insertNode.hasValidMeasurements();
+ }
+
+ @Override
+ public void setFailedMeasurementNumber(int failedMeasurementNumber) {
+ insertNode.setFailedMeasurementNumber(failedMeasurementNumber);
+ }
+
+ @Override
+ public int getFailedMeasurementNumber() {
+ return insertNode.getFailedMeasurementNumber();
+ }
+
+ @Override
+ public void setProgressIndex(ProgressIndex progressIndex) {
+ insertNode.setProgressIndex(progressIndex);
+ }
+
+ @Override
+ public ProgressIndex getProgressIndex() {
+ return insertNode.getProgressIndex();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof PipeEnrichedInsertNode
+ && insertNode.equals(((PipeEnrichedInsertNode) o).insertNode);
+ }
+
+ @Override
+ public int hashCode() {
+ return insertNode.hashCode();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 075c6e1433a..620920a01a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -68,16 +68,19 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
private final ExecutorService executor;
+ private final boolean isGeneratedByPipe;
private static final String NODE_CONNECTION_ERROR = "can't connect to node
{}";
public LoadTsFileDispatcherImpl(
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager,
+ boolean isGeneratedByPipe) {
this.internalServiceClientManager = internalServiceClientManager;
this.localhostIpAddr =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
this.localhostInternalPort =
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
this.executor =
IoTDBThreadPoolFactory.newCachedThreadPool(LoadTsFileDispatcherImpl.class.getName());
+ this.isGeneratedByPipe = isGeneratedByPipe;
}
public void setUuid(String uuid) {
@@ -178,7 +181,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
StorageEngine.getInstance()
.executeLoadCommand(
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
- loadCommandReq.uuid);
+ loadCommandReq.uuid,
+ loadCommandReq.isSetIsGeneratedByPipe() &&
loadCommandReq.isGeneratedByPipe);
if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
throw new FragmentInstanceDispatchException(resultStatus);
}
@@ -211,7 +215,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
.getDataRegion((DataRegionId) groupId)
.loadNewTsFile(
((LoadSingleTsFileNode) planNode).getTsFileResource(),
- ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad());
+ ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
+ isGeneratedByPipe);
} catch (LoadFileException e) {
logger.warn(String.format("Load TsFile Node %s error.", planNode), e);
TSStatus resultStatus = new TSStatus();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 9127eead0ad..834829c3849 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -102,22 +102,24 @@ public class LoadTsFileScheduler implements IScheduler {
private final DataPartitionBatchFetcher partitionFetcher;
private final List<LoadSingleTsFileNode> tsFileNodeList;
private final PlanFragmentId fragmentId;
-
- private Set<TRegionReplicaSet> allReplicaSets;
+ private final Set<TRegionReplicaSet> allReplicaSets;
+ private final boolean isGeneratedByPipe;
public LoadTsFileScheduler(
DistributedQueryPlan distributedQueryPlan,
MPPQueryContext queryContext,
QueryStateMachine stateMachine,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager,
- IPartitionFetcher partitionFetcher) {
+ IPartitionFetcher partitionFetcher,
+ boolean isGeneratedByPipe) {
this.queryContext = queryContext;
this.stateMachine = stateMachine;
this.tsFileNodeList = new ArrayList<>();
this.fragmentId =
distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
- this.dispatcher = new
LoadTsFileDispatcherImpl(internalServiceClientManager);
+ this.dispatcher = new
LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
this.allReplicaSets = new HashSet<>();
+ this.isGeneratedByPipe = isGeneratedByPipe;
for (FragmentInstance fragmentInstance :
distributedQueryPlan.getInstances()) {
tsFileNodeList.add((LoadSingleTsFileNode)
fragmentInstance.getFragment().getPlanNodeTree());
@@ -279,6 +281,7 @@ public class LoadTsFileScheduler implements IScheduler {
TLoadCommandReq loadCommandReq =
new TLoadCommandReq(
(isFirstPhaseSuccess ? LoadCommand.EXECUTE :
LoadCommand.ROLLBACK).ordinal(), uuid);
+ loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
Future<FragInstanceDispatchResult> dispatchResultFuture =
dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index b8089c96e22..b818f763e7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -64,6 +64,7 @@ public enum StatementType {
BATCH_INSERT_ROWS,
BATCH_INSERT_ONE_DEVICE,
MULTI_BATCH_INSERT,
+ PIPE_ENRICHED_INSERT,
DELETE,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 0fc4bd25d98..17bee5aa40b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -27,6 +27,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -78,6 +80,11 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModel
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.DropModelStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowModelsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowTrailsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
@@ -105,11 +112,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
@@ -302,6 +304,34 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(loadTsFileStatement, context);
}
+ public R visitPipeEnrichedLoadFile(
+ PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C
context) {
+ return visitStatement(pipeEnrichedLoadTsFileStatement, context);
+ }
+
+ public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
+ return visitStatement(insertRowStatement, context);
+ }
+
+ public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context)
{
+ return visitStatement(insertRowsStatement, context);
+ }
+
+ public R visitInsertMultiTablets(
+ InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
+ return visitStatement(insertMultiTabletsStatement, context);
+ }
+
+ public R visitInsertRowsOfOneDevice(
+ InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C
context) {
+ return visitStatement(insertRowsOfOneDeviceStatement, context);
+ }
+
+ public R visitPipeEnrichedInsert(
+ PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C
context) {
+ return visitStatement(pipeEnrichedInsertBaseStatement, context);
+ }
+
/** Data Control Language (DCL) */
public R visitAuthor(AuthorStatement authorStatement, C context) {
return visitStatement(authorStatement, context);
@@ -339,24 +369,6 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(countStatement, context);
}
- public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
- return visitStatement(insertRowStatement, context);
- }
-
- public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context)
{
- return visitStatement(insertRowsStatement, context);
- }
-
- public R visitInsertMultiTablets(
- InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
- return visitStatement(insertMultiTabletsStatement, context);
- }
-
- public R visitInsertRowsOfOneDevice(
- InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C
context) {
- return visitStatement(insertRowsOfOneDeviceStatement, context);
- }
-
public R visitSchemaFetch(SchemaFetchStatement schemaFetchStatement, C
context) {
return visitStatement(schemaFetchStatement, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index d118884cef6..848d5806f53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -279,8 +279,7 @@ public abstract class InsertBaseStatement extends Statement
{
*
* @return map from device path to its measurements.
*/
- protected final Map<PartialPath, List<Pair<String, Integer>>>
- getMapFromDeviceToMeasurementAndIndex() {
+ protected Map<PartialPath, List<Pair<String, Integer>>>
getMapFromDeviceToMeasurementAndIndex() {
boolean[] isLogicalView = new boolean[this.measurements.length];
int[] indexMapToLogicalViewList = new int[this.measurements.length];
Arrays.fill(isLogicalView, false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 381d367afe9..a58da41af62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -69,6 +69,17 @@ public class LoadTsFileStatement extends Statement {
sortTsFiles(tsFiles);
}
+ protected LoadTsFileStatement() {
+ this.file = null;
+ this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
+ this.verifySchema = true;
+ this.deleteAfterLoad = true;
+ this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+ this.tsFiles = new ArrayList<>();
+ this.resources = new ArrayList<>();
+ this.statementType = StatementType.MULTI_BATCH_INSERT;
+ }
+
private void findAllTsFile(File file) {
final File[] files = file.listFiles();
if (files == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
new file mode 100644
index 00000000000..10834b20ecc
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+
+public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
+
+ private InsertBaseStatement insertBaseStatement;
+
+ public PipeEnrichedInsertBaseStatement(InsertBaseStatement
insertBaseStatement) {
+ statementType = StatementType.PIPE_ENRICHED_INSERT;
+ this.insertBaseStatement = insertBaseStatement;
+ }
+
+ public InsertBaseStatement getInsertBaseStatement() {
+ return insertBaseStatement;
+ }
+
+ public void setInsertBaseStatement(InsertBaseStatement insertBaseStatement) {
+ this.insertBaseStatement = insertBaseStatement;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedInsert(this, context);
+ }
+
+ @Override
+ public boolean isDebug() {
+ return insertBaseStatement.isDebug();
+ }
+
+ @Override
+ public void setDebug(boolean debug) {
+ insertBaseStatement.setDebug(debug);
+ }
+
+ @Override
+ public boolean isQuery() {
+ return insertBaseStatement.isQuery();
+ }
+
+ @Override
+ public boolean isAuthenticationRequired() {
+ return insertBaseStatement.isAuthenticationRequired();
+ }
+
+ @Override
+ public PartialPath getDevicePath() {
+ return insertBaseStatement.getDevicePath();
+ }
+
+ @Override
+ public void setDevicePath(PartialPath devicePath) {
+ insertBaseStatement.setDevicePath(devicePath);
+ }
+
+ @Override
+ public String[] getMeasurements() {
+ return insertBaseStatement.getMeasurements();
+ }
+
+ @Override
+ public void setMeasurements(String[] measurements) {
+ insertBaseStatement.setMeasurements(measurements);
+ }
+
+ @Override
+ public MeasurementSchema[] getMeasurementSchemas() {
+ return insertBaseStatement.getMeasurementSchemas();
+ }
+
+ @Override
+ public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+ insertBaseStatement.setMeasurementSchemas(measurementSchemas);
+ }
+
+ @Override
+ public boolean isAligned() {
+ return insertBaseStatement.isAligned();
+ }
+
+ @Override
+ public void setAligned(boolean aligned) {
+ insertBaseStatement.setAligned(aligned);
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return insertBaseStatement.getDataTypes();
+ }
+
+ @Override
+ public void setDataTypes(TSDataType[] dataTypes) {
+ insertBaseStatement.setDataTypes(dataTypes);
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return insertBaseStatement.getPaths();
+ }
+
+ @Override
+ public void updateAfterSchemaValidation() throws QueryProcessException {
+ insertBaseStatement.updateAfterSchemaValidation();
+ }
+
+ @Override
+ protected void selfCheckDataTypes(int index)
+ throws DataTypeMismatchException, PathNotExistException {
+ insertBaseStatement.selfCheckDataTypes(index);
+ }
+
+ @Override
+ public void markFailedMeasurement(int index, Exception cause) {
+ insertBaseStatement.markFailedMeasurement(index, cause);
+ }
+
+ @Override
+ public boolean hasValidMeasurements() {
+ return insertBaseStatement.hasValidMeasurements();
+ }
+
+ @Override
+ public boolean hasFailedMeasurements() {
+ return insertBaseStatement.hasFailedMeasurements();
+ }
+
+ @Override
+ public int getFailedMeasurementNumber() {
+ return insertBaseStatement.getFailedMeasurementNumber();
+ }
+
+ @Override
+ public List<String> getFailedMeasurements() {
+ return insertBaseStatement.getFailedMeasurements();
+ }
+
+ @Override
+ public List<Exception> getFailedExceptions() {
+ return insertBaseStatement.getFailedExceptions();
+ }
+
+ @Override
+ public List<String> getFailedMessages() {
+ return insertBaseStatement.getFailedMessages();
+ }
+
+ @Override
+ public void setFailedMeasurementIndex2Info(
+ Map<Integer, InsertBaseStatement.FailedMeasurementInfo>
failedMeasurementIndex2Info) {
+
insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
+ }
+
+ @Override
+ protected Map<PartialPath, List<Pair<String, Integer>>>
getMapFromDeviceToMeasurementAndIndex() {
+ return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return insertBaseStatement.isEmpty();
+ }
+
+ @Override
+ public ISchemaValidation getSchemaValidation() {
+ return insertBaseStatement.getSchemaValidation();
+ }
+
+ @Override
+ public List<ISchemaValidation> getSchemaValidationList() {
+ return insertBaseStatement.getSchemaValidationList();
+ }
+
+ @Override
+ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType)
{
+ return insertBaseStatement.checkAndCastDataType(columnIndex, dataType);
+ }
+
+ @Override
+ public long getMinTime() {
+ return insertBaseStatement.getMinTime();
+ }
+
+ @Override
+ public Object getFirstValueOfIndex(int index) {
+ return insertBaseStatement.getFirstValueOfIndex(index);
+ }
+
+ @Override
+ public InsertBaseStatement removeLogicalView() {
+ return new
PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
new file mode 100644
index 00000000000..c25749c4838
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.io.File;
+import java.util.List;
+
+public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement {
+
+ private final LoadTsFileStatement loadTsFileStatement;
+
+ public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement
loadTsFileStatement) {
+ this.loadTsFileStatement = loadTsFileStatement;
+ statementType = StatementType.MULTI_BATCH_INSERT;
+ }
+
+ public LoadTsFileStatement getLoadTsFileStatement() {
+ return loadTsFileStatement;
+ }
+
+ @Override
+ public boolean isDebug() {
+ return loadTsFileStatement.isDebug();
+ }
+
+ @Override
+ public void setDebug(boolean debug) {
+ loadTsFileStatement.setDebug(debug);
+ }
+
+ @Override
+ public boolean isQuery() {
+ return loadTsFileStatement.isQuery();
+ }
+
+ @Override
+ public boolean isAuthenticationRequired() {
+ return loadTsFileStatement.isAuthenticationRequired();
+ }
+
+ @Override
+ public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+ loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad);
+ }
+
+ @Override
+ public void setDatabaseLevel(int databaseLevel) {
+ loadTsFileStatement.setDatabaseLevel(databaseLevel);
+ }
+
+ @Override
+ public void setVerifySchema(boolean verifySchema) {
+ loadTsFileStatement.setVerifySchema(verifySchema);
+ }
+
+ @Override
+ public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+ loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase);
+ }
+
+ @Override
+ public boolean isVerifySchema() {
+ return loadTsFileStatement.isVerifySchema();
+ }
+
+ @Override
+ public boolean isDeleteAfterLoad() {
+ return loadTsFileStatement.isDeleteAfterLoad();
+ }
+
+ @Override
+ public boolean isAutoCreateDatabase() {
+ return loadTsFileStatement.isAutoCreateDatabase();
+ }
+
+ @Override
+ public int getDatabaseLevel() {
+ return loadTsFileStatement.getDatabaseLevel();
+ }
+
+ @Override
+ public List<File> getTsFiles() {
+ return loadTsFileStatement.getTsFiles();
+ }
+
+ @Override
+ public void addTsFileResource(TsFileResource resource) {
+ loadTsFileStatement.addTsFileResource(resource);
+ }
+
+ @Override
+ public List<TsFileResource> getResources() {
+ return loadTsFileStatement.getResources();
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return loadTsFileStatement.getPaths();
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedLoadFile(this, context);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" +
loadTsFileStatement + '}';
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/CreatePipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/CreatePipeStatement.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
index d58eb871bac..8737969d724 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/CreatePipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/DropPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/DropPipeStatement.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
index 7c8dbbf4358..e5a1717123a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/DropPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/ShowPipesStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/ShowPipesStatement.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
index 84164e012be..0c7d209516c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/ShowPipesStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StartPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StartPipeStatement.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
index b908e9a9b35..54fa1fd97cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StartPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StopPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StopPipeStatement.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
index 989bf45bc5f..5dc50ee9eae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StopPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 9fc9b49c38d..d1b8cd8c1ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -796,13 +796,14 @@ public class StorageEngine implements IService {
return RpcUtils.SUCCESS_STATUS;
}
- public TSStatus executeLoadCommand(LoadTsFileScheduler.LoadCommand
loadCommand, String uuid) {
+ public TSStatus executeLoadCommand(
+ LoadTsFileScheduler.LoadCommand loadCommand, String uuid, boolean
isGeneratedByPipe) {
TSStatus status = new TSStatus();
try {
switch (loadCommand) {
case EXECUTE:
- if (getLoadTsFileManager().loadAll(uuid)) {
+ if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe)) {
status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0011db21534..95ffb6264a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2197,8 +2197,10 @@ public class DataRegion implements IDataRegionForQuery {
*
* @param newTsFileResource tsfile resource @UsedBy load external tsfile
module
* @param deleteOriginFile whether to delete origin tsfile
+ * @param isGeneratedByPipe whether the load tsfile request is generated by
pipe
*/
- public void loadNewTsFile(TsFileResource newTsFileResource, boolean
deleteOriginFile)
+ public void loadNewTsFile(
+ TsFileResource newTsFileResource, boolean deleteOriginFile, boolean
isGeneratedByPipe)
throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getTsFile();
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
@@ -2223,7 +2225,8 @@ public class DataRegion implements IDataRegionForQuery {
loadTsFileToUnSequence(
tsfileToBeInserted, newTsFileResource, newFilePartitionId,
deleteOriginFile);
- PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId,
newTsFileResource);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
FileMetrics.getInstance()
.addFile(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index dc7086dde70..45e755debb3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -893,7 +893,8 @@ public class TsFileProcessor {
try {
PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(),
tsFileResource);
+ .listenToTsFile(
+ dataRegionInfo.getDataRegion().getDataRegionId(),
tsFileResource, false);
// When invoke closing TsFile after insert data to memTable, we
shouldn't flush until invoke
// flushing memTable in System module.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
index bb07e4457de..70037a99089 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
@@ -85,6 +85,7 @@ public class DataNodeThrottleQuotaManager {
case BATCH_INSERT_ONE_DEVICE:
case BATCH_INSERT_ROWS:
case MULTI_BATCH_INSERT:
+ case PIPE_ENRICHED_INSERT:
return checkQuota(userName, 1, 0, s);
case QUERY:
case GROUP_BY_TIME:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index db9c12114e8..b6b714b5de2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.rescon.quotas;
import org.apache.iotdb.commons.exception.RpcThrottlingException;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -28,6 +29,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -92,7 +95,11 @@ public class DefaultOperationQuota implements OperationQuota
{
protected void updateEstimateConsumeQuota(int numWrites, int numReads,
Statement s) {
if (numWrites > 0) {
long avgSize = 0;
- switch (s.getType()) {
+ final StatementType statementType =
+ s.getType() == StatementType.PIPE_ENRICHED_INSERT
+ ? ((PipeEnrichedInsertBaseStatement)
s).getInsertBaseStatement().getType()
+ : s.getType();
+ switch (statementType) {
case INSERT:
// InsertStatement InsertRowStatement
if (s instanceof InsertStatement) {
@@ -131,7 +138,10 @@ public class DefaultOperationQuota implements
OperationQuota {
}
break;
case MULTI_BATCH_INSERT:
- // LoadTsFileStatement InsertMultiTabletsStatement
+ // PipeEnrichedLoadTsFileStatement LoadTsFileStatement
InsertMultiTabletsStatement
+ if (s instanceof PipeEnrichedLoadTsFileStatement) {
+ s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement();
+ }
if (s instanceof LoadTsFileStatement) {
LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s;
for (int i = 0; i < loadTsFileStatement.getResources().size();
i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 1d91ec43ddf..68f87bcc86f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
@@ -249,6 +250,25 @@ public class TriggerFireVisitor extends
PlanVisitor<TriggerFireResult, TriggerEv
return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION :
TriggerFireResult.SUCCESS;
}
+ @Override
+ public TriggerFireResult visitPipeEnrichedInsert(
+ PipeEnrichedInsertNode node, TriggerEvent context) {
+ final InsertNode realInsertNode = node.getInsertNode();
+ if (realInsertNode instanceof InsertRowNode) {
+ return visitInsertRow((InsertRowNode) realInsertNode, context);
+ } else if (realInsertNode instanceof InsertTabletNode) {
+ return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+ } else if (realInsertNode instanceof InsertRowsNode) {
+ return visitInsertRows((InsertRowsNode) realInsertNode, context);
+ } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+ return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode,
context);
+ } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+ return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode)
realInsertNode, context);
+ } else {
+ return visitPlan(realInsertNode, context);
+ }
+ }
+
private Map<String, Integer> constructMeasurementToSchemaIndexMap(
String[] measurements, MeasurementSchema[] schemas) {
// The index of measurement and schema is the same now.
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index f6dabc9b81d..1900ff539bb 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -286,7 +286,8 @@ public class PipeRealtimeExtractTest {
null,
false),
resource);
-
PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId,
resource);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegionId, resource, false);
}
});
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
index f3c982ac5d3..dd35996e37e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
@@ -21,11 +21,11 @@ package
org.apache.iotdb.db.queryengine.plan.plan.statement.sys.pipe;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 741db029ea0..161a95b4ec9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -61,7 +61,8 @@ public class LoadTsFileSchedulerTest {
mock(MPPQueryContext.class),
mock(QueryStateMachine.class),
mock(IClientManager.class),
- mock(IPartitionFetcher.class)));
+ mock(IPartitionFetcher.class),
+ false));
t.start();
Assert.assertNull(t.getTotalCpuTime());
Assert.assertNull(t.getFragmentInfo());
diff --git a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
index 30e1e0d676f..1268c399c5f 100644
--- a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
@@ -313,6 +313,7 @@ struct TTsFilePieceReq{
struct TLoadCommandReq{
1: required i32 commandType
2: required string uuid
+ 3: optional bool isGeneratedByPipe
}
struct TLoadResp{