This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-new-sql in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8d8842dd2bac1babbb704556dc0062dbc32f126e Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Oct 9 15:26:17 2024 +0800 Add table pattern in enriched events --- .../pipe/event/PipeConfigRegionSnapshotEvent.java | 26 ++++++++++++++---- .../pipe/event/PipeConfigRegionWritePlanEvent.java | 13 +++++---- .../agent/task/connection/PipeEventCollector.java | 5 ++-- .../db/pipe/event/UserDefinedEnrichedEvent.java | 9 ++++-- .../event/common/heartbeat/PipeHeartbeatEvent.java | 8 ++++-- .../schema/PipeSchemaRegionSnapshotEvent.java | 20 ++++++++++---- .../schema/PipeSchemaRegionWritePlanEvent.java | 19 +++++++++---- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 13 ++++++--- .../common/tablet/PipeRawTabletInsertionEvent.java | 32 ++++++++++++++++------ .../event/common/terminate/PipeTerminateEvent.java | 6 ++-- .../common/tsfile/PipeTsFileInsertionEvent.java | 13 ++++++--- .../db/pipe/event/realtime/PipeRealtimeEvent.java | 28 ++++++++++++++----- .../event/realtime/PipeRealtimeEventFactory.java | 6 ++-- .../PipeHistoricalDataRegionTsFileExtractor.java | 2 +- .../realtime/assigner/PipeDataRegionAssigner.java | 3 +- .../realtime/epoch/TsFileEpochManager.java | 6 ++-- .../pattern/CachedSchemaPatternMatcherTest.java | 2 +- .../iotdb/commons/pipe/event/EnrichedEvent.java | 13 +++++++-- .../commons/pipe/event/PipeSnapshotEvent.java | 13 +++++++-- .../commons/pipe/event/PipeWritePlanEvent.java | 13 +++++++-- .../commons/pipe/event/ProgressReportEvent.java | 9 ++++-- .../extractor/IoTDBNonDataRegionExtractor.java | 17 ++++++++++-- 22 files changed, 204 insertions(+), 72 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java index 76ff5ed2083..8705d577942 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.pipe.event; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent; @@ -90,7 +91,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent { public PipeConfigRegionSnapshotEvent( final String snapshotPath, final String templateFilePath, final CNSnapshotFileType type) { - this(snapshotPath, templateFilePath, type, null, 0, null, null); + this(snapshotPath, templateFilePath, type, null, 0, null, null, null); } public PipeConfigRegionSnapshotEvent( @@ -100,8 +101,15 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern) { - super(pipeName, creationTime, pipeTaskMeta, pattern, PipeConfigNodeResourceManager.snapshot()); + final TreePattern treePattern, + final TablePattern tablePattern) { + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + PipeConfigNodeResourceManager.snapshot()); this.snapshotPath = snapshotPath; this.templateFilePath = Objects.nonNull(templateFilePath) ? templateFilePath : ""; this.fileType = type; @@ -160,11 +168,19 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeConfigRegionSnapshotEvent( - snapshotPath, templateFilePath, fileType, pipeName, creationTime, pipeTaskMeta, pattern); + snapshotPath, + templateFilePath, + fileType, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java index a6bddf40c0a..241c54c1f01 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.pipe.event; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent; @@ -41,7 +42,7 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent { public PipeConfigRegionWritePlanEvent( final ConfigPhysicalPlan configPhysicalPlan, final boolean isGeneratedByPipe) { - this(configPhysicalPlan, null, 0, null, null, isGeneratedByPipe); + this(configPhysicalPlan, null, 0, null, null, null, isGeneratedByPipe); } public PipeConfigRegionWritePlanEvent( @@ -49,9 +50,10 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final boolean isGeneratedByPipe) { - super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe); + super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, isGeneratedByPipe); this.configPhysicalPlan = configPhysicalPlan; } @@ -64,11 +66,12 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeConfigRegionWritePlanEvent( - configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, pattern, false); + configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, false); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index f9341b95ec4..5ff7a87bc2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -145,7 +145,7 @@ public class PipeEventCollector implements EventCollector { // Only used by events containing delete data node, no need to bind progress index here since // delete data event does not have progress index currently IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR - .process(deleteDataEvent.getPlanNode(), (IoTDBTreePattern) deleteDataEvent.getPipePattern()) + .process(deleteDataEvent.getPlanNode(), (IoTDBTreePattern) deleteDataEvent.getTreePattern()) .map( planNode -> new PipeSchemaRegionWritePlanEvent( @@ -153,7 +153,8 @@ public class PipeEventCollector implements EventCollector { deleteDataEvent.getPipeName(), deleteDataEvent.getCreationTime(), deleteDataEvent.getPipeTaskMeta(), - deleteDataEvent.getPipePattern(), + deleteDataEvent.getTreePattern(), + deleteDataEvent.getTablePattern(), deleteDataEvent.isGeneratedByPipe())) .ifPresent( event -> { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java index 23ea2c561e4..ea299f568d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -45,7 +46,8 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent { enrichedEvent.getPipeName(), enrichedEvent.getCreationTime(), enrichedEvent.getPipeTaskMeta(), - enrichedEvent.getPipePattern(), + enrichedEvent.getTreePattern(), + enrichedEvent.getTablePattern(), enrichedEvent.getStartTime(), enrichedEvent.getEndTime()); this.userDefinedEvent = userDefinedEvent; @@ -76,11 +78,12 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( - pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime); + pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index de40212b656..c99f192d190 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics; @@ -59,7 +60,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { private final boolean shouldPrintMessage; public PipeHeartbeatEvent(final String dataRegionId, final boolean shouldPrintMessage) { - super(null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE); + super(null, 0, null, null, null, Long.MIN_VALUE, Long.MAX_VALUE); this.dataRegionId = dataRegionId; this.shouldPrintMessage = shouldPrintMessage; } @@ -71,7 +72,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { final String dataRegionId, final long timePublished, final boolean shouldPrintMessage) { - super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE); + super(pipeName, creationTime, pipeTaskMeta, null, null, Long.MIN_VALUE, Long.MAX_VALUE); this.dataRegionId = dataRegionId; this.timePublished = timePublished; this.shouldPrintMessage = shouldPrintMessage; @@ -102,7 +103,8 @@ public class PipeHeartbeatEvent extends EnrichedEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { // Should record PipeTaskMeta, for sometimes HeartbeatEvents should report exceptions. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java index 63152ea8032..39f6aa30403 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.event.common.schema; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent; @@ -78,7 +79,7 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent { public PipeSchemaRegionSnapshotEvent( final String mTreeSnapshotPath, final String tagLogSnapshotPath, final String databaseName) { - this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, 0, null, null); + this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, 0, null, null, null); } public PipeSchemaRegionSnapshotEvent( @@ -88,8 +89,15 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern) { - super(pipeName, creationTime, pipeTaskMeta, pattern, PipeDataNodeResourceManager.snapshot()); + final TreePattern treePattern, + final TablePattern tablePattern) { + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + PipeDataNodeResourceManager.snapshot()); this.mTreeSnapshotPath = mTreeSnapshotPath; this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ? tagLogSnapshotPath : ""; this.databaseName = databaseName; @@ -148,7 +156,8 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeSchemaRegionSnapshotEvent( @@ -158,7 +167,8 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent { pipeName, creationTime, pipeTaskMeta, - pattern); + treePattern, + tablePattern); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java index 0014c2038f6..3499291a2f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.event.common.schema; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent; @@ -40,7 +41,7 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent { } public PipeSchemaRegionWritePlanEvent(final PlanNode planNode, final boolean isGeneratedByPipe) { - this(planNode, null, 0, null, null, isGeneratedByPipe); + this(planNode, null, 0, null, null, null, isGeneratedByPipe); } public PipeSchemaRegionWritePlanEvent( @@ -48,9 +49,10 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final boolean isGeneratedByPipe) { - super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe); + super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, isGeneratedByPipe); this.planNode = planNode; } @@ -63,11 +65,18 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeSchemaRegionWritePlanEvent( - planNode, pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe); + planNode, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + isGeneratedByPipe); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 97a35caa1b4..9f719642da3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -82,6 +83,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent 0, null, null, + null, Long.MIN_VALUE, Long.MAX_VALUE); } @@ -95,10 +97,11 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { - super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime); + super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); this.walEntryHandler = walEntryHandler; // Record device path here so there's no need to get it from InsertNode cache later. this.devicePath = devicePath; @@ -179,7 +182,8 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeInsertNodeTabletInsertionEvent( @@ -191,7 +195,8 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent pipeName, creationTime, pipeTaskMeta, - pattern, + treePattern, + tablePattern, startTime, endTime); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index f08ccb6d450..013ccb2579e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.TestOnly; @@ -62,10 +63,11 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { - super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime); + super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); this.tablet = Objects.requireNonNull(tablet); this.isAligned = isAligned; this.sourceEvent = sourceEvent; @@ -89,25 +91,37 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet creationTime, pipeTaskMeta, null, + null, Long.MIN_VALUE, Long.MAX_VALUE); } @TestOnly public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) { - this(tablet, isAligned, null, false, null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE); + this(tablet, isAligned, null, false, null, 0, null, null, null, Long.MIN_VALUE, Long.MAX_VALUE); } @TestOnly public PipeRawTabletInsertionEvent( - final Tablet tablet, final boolean isAligned, final TreePattern pattern) { - this(tablet, isAligned, null, false, null, 0, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE); + final Tablet tablet, final boolean isAligned, final TreePattern treePattern) { + this( + tablet, + isAligned, + null, + false, + null, + 0, + null, + treePattern, + null, + Long.MIN_VALUE, + Long.MAX_VALUE); } @TestOnly public PipeRawTabletInsertionEvent( final Tablet tablet, final long startTime, final long endTime) { - this(tablet, false, null, false, null, 0, null, null, startTime, endTime); + this(tablet, false, null, false, null, 0, null, null, null, startTime, endTime); } @Override @@ -169,7 +183,8 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeRawTabletInsertionEvent( @@ -180,7 +195,8 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet pipeName, creationTime, pipeTaskMeta, - pattern, + treePattern, + tablePattern, startTime, endTime); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java index 28c4165a7e7..78c95a37f12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.terminate; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; @@ -42,7 +43,7 @@ public class PipeTerminateEvent extends EnrichedEvent { final long creationTime, final PipeTaskMeta pipeTaskMeta, final int dataRegionId) { - super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE); + super(pipeName, creationTime, pipeTaskMeta, null, null, Long.MIN_VALUE, Long.MAX_VALUE); this.dataRegionId = dataRegionId; } @@ -66,7 +67,8 @@ public class PipeTerminateEvent extends EnrichedEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { // Should record PipeTaskMeta, for the terminateEvent shall report progress to diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index d84a1bbdcae..c78ea9703e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -90,6 +91,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns 0, null, null, + null, Long.MIN_VALUE, Long.MAX_VALUE); } @@ -103,10 +105,11 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { - super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime); + super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); this.resource = resource; tsFile = resource.getTsFile(); @@ -320,7 +323,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeTsFileInsertionEvent( @@ -332,7 +336,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns pipeName, creationTime, pipeTaskMeta, - pattern, + treePattern, + tablePattern, startTime, endTime); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index b03e7caf9fc..b6ce88572a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.realtime; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch; @@ -45,8 +46,17 @@ public class PipeRealtimeEvent extends EnrichedEvent { final EnrichedEvent event, final TsFileEpoch tsFileEpoch, final Map<IDeviceID, String[]> device2Measurements, - final TreePattern pattern) { - this(event, tsFileEpoch, device2Measurements, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE); + final TreePattern treePattern, + final TablePattern tablePattern) { + this( + event, + tsFileEpoch, + device2Measurements, + null, + treePattern, + tablePattern, + Long.MIN_VALUE, + Long.MAX_VALUE); } public PipeRealtimeEvent( @@ -54,7 +64,8 @@ public class PipeRealtimeEvent extends EnrichedEvent { final TsFileEpoch tsFileEpoch, final Map<IDeviceID, String[]> device2Measurements, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { // PipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent @@ -64,7 +75,8 @@ public class PipeRealtimeEvent extends EnrichedEvent { event != null ? event.getPipeName() : null, event != null ? event.getCreationTime() : 0, pipeTaskMeta, - pattern, + treePattern, + tablePattern, startTime, endTime); @@ -166,16 +178,18 @@ public class PipeRealtimeEvent extends EnrichedEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new PipeRealtimeEvent( event.shallowCopySelfAndBindPipeTaskMetaForProgressReport( - pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime), + pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime), this.tsFileEpoch, this.device2Measurements, pipeTaskMeta, - pattern, + treePattern, + tablePattern, startTime, endTime); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 3c3023c58ee..3ef134a96cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -58,16 +58,16 @@ public class PipeRealtimeEventFactory { public static PipeRealtimeEvent createRealtimeEvent( final String dataRegionId, final boolean shouldPrintMessage) { return new PipeRealtimeEvent( - new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, null); + new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, null, null); } public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode node) { return new PipeRealtimeEvent( - new PipeSchemaRegionWritePlanEvent(node, node.isGeneratedByPipe()), null, null, null); + new PipeSchemaRegionWritePlanEvent(node, node.isGeneratedByPipe()), null, null, null, null); } public static PipeRealtimeEvent createRealtimeEvent(final ProgressReportEvent event) { - return new PipeRealtimeEvent(event, null, null, null); + return new PipeRealtimeEvent(event, null, null, null, null); } private PipeRealtimeEventFactory() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 82bb5e2f5e2..cfe8b639fa2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -617,7 +617,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa creationTime, pipeTaskMeta, treePattern, - // TODO: consider tablePattern + tablePattern, historicalDataExtractionStartTime, historicalDataExtractionEndTime); if (sloppyPattern || isDbNameCoveredByPattern) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index e80101933d8..2a8eeda9e0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -105,13 +105,13 @@ public class PipeDataRegionAssigner implements Closeable { counter = 0; } - // TODO: consider table pattern final ProgressReportEvent reportEvent = new ProgressReportEvent( extractor.getPipeName(), extractor.getCreationTime(), extractor.getPipeTaskMeta(), extractor.getTreePattern(), + extractor.getTablePattern(), extractor.getRealtimeDataExtractionStartTime(), extractor.getRealtimeDataExtractionEndTime()); reportEvent.bindProgressIndex(event.getProgressIndex()); @@ -131,6 +131,7 @@ public class PipeDataRegionAssigner implements Closeable { extractor.getCreationTime(), extractor.getPipeTaskMeta(), extractor.getTreePattern(), + extractor.getTablePattern(), extractor.getRealtimeDataExtractionStartTime(), extractor.getRealtimeDataExtractionEndTime()); final EnrichedEvent innerEvent = copiedEvent.getEvent(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java index e4244a1335e..d0ee25838b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java @@ -71,7 +71,8 @@ public class TsFileEpochManager { epoch, resource.getDevices().stream() .collect(Collectors.toMap(Functions.identity(), device -> EMPTY_MEASUREMENT_ARRAY)), - event.getPipePattern()); + event.getTreePattern(), + event.getTablePattern()); } public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent( @@ -85,7 +86,8 @@ public class TsFileEpochManager { node instanceof InsertRowsNode ? getDevice2MeasurementsMapFromInsertRowsNode((InsertRowsNode) node) : Collections.singletonMap(node.getDeviceID(), node.getMeasurements()), - event.getPipePattern()); + event.getTreePattern(), + event.getTablePattern()); } private Map<IDeviceID, String[]> getDevice2MeasurementsMapFromInsertRowsNode( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java index 89c96595253..487fabfda38 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java @@ -60,7 +60,7 @@ public class CachedSchemaPatternMatcherTest { TsFileEpoch tsFileEpoch, Map<IDeviceID, String[]> device2Measurements, TreePattern pattern) { - super(event, tsFileEpoch, device2Measurements, pattern); + super(event, tsFileEpoch, device2Measurements, pattern, null); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index da427705714..60c8a3135d3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.pipe.api.event.Event; @@ -61,6 +62,7 @@ public abstract class EnrichedEvent implements Event { protected int rebootTimes = 0; protected final TreePattern treePattern; + protected final TablePattern tablePattern; protected final long startTime; protected final long endTime; @@ -76,6 +78,7 @@ public abstract class EnrichedEvent implements Event { final long creationTime, final PipeTaskMeta pipeTaskMeta, final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { referenceCount = new AtomicInteger(0); @@ -84,6 +87,7 @@ public abstract class EnrichedEvent implements Event { this.creationTime = creationTime; this.pipeTaskMeta = pipeTaskMeta; this.treePattern = treePattern; + this.tablePattern = tablePattern; this.startTime = startTime; this.endTime = endTime; isPatternParsed = this.treePattern == null || this.treePattern.isRoot(); @@ -310,10 +314,14 @@ public abstract class EnrichedEvent implements Event { return treePattern != null ? treePattern.getPattern() : null; } - public final TreePattern getPipePattern() { + public final TreePattern getTreePattern() { return treePattern; } + public final TablePattern getTablePattern() { + return tablePattern; + } + public final long getStartTime() { return startTime; } @@ -350,7 +358,8 @@ public abstract class EnrichedEvent implements Event { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java index f94e96588d7..94a3d11c668 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.event; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; @@ -37,9 +38,17 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent implements Seriali final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final PipeSnapshotResourceManager resourceManager) { - super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE); + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + Long.MIN_VALUE, + Long.MAX_VALUE); this.resourceManager = resourceManager; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java index 220974bc71a..dc3240793ba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.event; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; public abstract class PipeWritePlanEvent extends EnrichedEvent implements SerializableEvent { @@ -33,9 +34,17 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final boolean isGeneratedByPipe) { - super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE); + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + Long.MIN_VALUE, + Long.MAX_VALUE); this.isGeneratedByPipe = isGeneratedByPipe; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java index 97c62cfaaf6..5a7dbfe7399 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.event; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; /** @@ -37,9 +38,10 @@ public class ProgressReportEvent extends EnrichedEvent { final long creationTime, final PipeTaskMeta pipeTaskMeta, final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { - super(pipeName, creationTime, pipeTaskMeta, treePattern, startTime, endTime); + super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); } @Override @@ -67,11 +69,12 @@ public class ProgressReportEvent extends EnrichedEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime) { return new ProgressReportEvent( - pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime); + pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java index ca58ceedf83..1f9f4250fe5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java @@ -165,6 +165,7 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor { creationTime, pipeTaskMeta, pipePattern, + null, Long.MIN_VALUE, Long.MAX_VALUE); @@ -192,7 +193,13 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor { || (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) { final ProgressReportEvent event = new ProgressReportEvent( - pipeName, creationTime, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE); + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + null, + Long.MIN_VALUE, + Long.MAX_VALUE); event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1)); event.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName()); return event; @@ -201,7 +208,13 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor { realtimeEvent = (PipeWritePlanEvent) realtimeEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( - pipeName, creationTime, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE); + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + null, + Long.MIN_VALUE, + Long.MAX_VALUE); realtimeEvent.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1)); realtimeEvent.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName()); return realtimeEvent;
