This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-new-sql
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8d8842dd2bac1babbb704556dc0062dbc32f126e
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Oct 9 15:26:17 2024 +0800

    Add table pattern in enriched events
---
 .../pipe/event/PipeConfigRegionSnapshotEvent.java  | 26 ++++++++++++++----
 .../pipe/event/PipeConfigRegionWritePlanEvent.java | 13 +++++----
 .../agent/task/connection/PipeEventCollector.java  |  5 ++--
 .../db/pipe/event/UserDefinedEnrichedEvent.java    |  9 ++++--
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  8 ++++--
 .../schema/PipeSchemaRegionSnapshotEvent.java      | 20 ++++++++++----
 .../schema/PipeSchemaRegionWritePlanEvent.java     | 19 +++++++++----
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 13 ++++++---
 .../common/tablet/PipeRawTabletInsertionEvent.java | 32 ++++++++++++++++------
 .../event/common/terminate/PipeTerminateEvent.java |  6 ++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 13 ++++++---
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  | 28 ++++++++++++++-----
 .../event/realtime/PipeRealtimeEventFactory.java   |  6 ++--
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  2 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  3 +-
 .../realtime/epoch/TsFileEpochManager.java         |  6 ++--
 .../pattern/CachedSchemaPatternMatcherTest.java    |  2 +-
 .../iotdb/commons/pipe/event/EnrichedEvent.java    | 13 +++++++--
 .../commons/pipe/event/PipeSnapshotEvent.java      | 13 +++++++--
 .../commons/pipe/event/PipeWritePlanEvent.java     | 13 +++++++--
 .../commons/pipe/event/ProgressReportEvent.java    |  9 ++++--
 .../extractor/IoTDBNonDataRegionExtractor.java     | 17 ++++++++++--
 22 files changed, 204 insertions(+), 72 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index 76ff5ed2083..8705d577942 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.pipe.event;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
@@ -90,7 +91,7 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
 
   public PipeConfigRegionSnapshotEvent(
       final String snapshotPath, final String templateFilePath, final 
CNSnapshotFileType type) {
-    this(snapshotPath, templateFilePath, type, null, 0, null, null);
+    this(snapshotPath, templateFilePath, type, null, 0, null, null, null);
   }
 
   public PipeConfigRegionSnapshotEvent(
@@ -100,8 +101,15 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, 
PipeConfigNodeResourceManager.snapshot());
+      final TreePattern treePattern,
+      final TablePattern tablePattern) {
+    super(
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        treePattern,
+        tablePattern,
+        PipeConfigNodeResourceManager.snapshot());
     this.snapshotPath = snapshotPath;
     this.templateFilePath = Objects.nonNull(templateFilePath) ? 
templateFilePath : "";
     this.fileType = type;
@@ -160,11 +168,19 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeConfigRegionSnapshotEvent(
-        snapshotPath, templateFilePath, fileType, pipeName, creationTime, 
pipeTaskMeta, pattern);
+        snapshotPath,
+        templateFilePath,
+        fileType,
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        treePattern,
+        tablePattern);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
index a6bddf40c0a..241c54c1f01 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.pipe.event;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent;
@@ -41,7 +42,7 @@ public class PipeConfigRegionWritePlanEvent extends 
PipeWritePlanEvent {
 
   public PipeConfigRegionWritePlanEvent(
       final ConfigPhysicalPlan configPhysicalPlan, final boolean 
isGeneratedByPipe) {
-    this(configPhysicalPlan, null, 0, null, null, isGeneratedByPipe);
+    this(configPhysicalPlan, null, 0, null, null, null, isGeneratedByPipe);
   }
 
   public PipeConfigRegionWritePlanEvent(
@@ -49,9 +50,10 @@ public class PipeConfigRegionWritePlanEvent extends 
PipeWritePlanEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final boolean isGeneratedByPipe) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
+    super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
isGeneratedByPipe);
     this.configPhysicalPlan = configPhysicalPlan;
   }
 
@@ -64,11 +66,12 @@ public class PipeConfigRegionWritePlanEvent extends 
PipeWritePlanEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeConfigRegionWritePlanEvent(
-        configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, pattern, 
false);
+        configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, treePattern, 
tablePattern, false);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index f9341b95ec4..5ff7a87bc2c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -145,7 +145,7 @@ public class PipeEventCollector implements EventCollector {
     // Only used by events containing delete data node, no need to bind 
progress index here since
     // delete data event does not have progress index currently
     IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR
-        .process(deleteDataEvent.getPlanNode(), (IoTDBTreePattern) 
deleteDataEvent.getPipePattern())
+        .process(deleteDataEvent.getPlanNode(), (IoTDBTreePattern) 
deleteDataEvent.getTreePattern())
         .map(
             planNode ->
                 new PipeSchemaRegionWritePlanEvent(
@@ -153,7 +153,8 @@ public class PipeEventCollector implements EventCollector {
                     deleteDataEvent.getPipeName(),
                     deleteDataEvent.getCreationTime(),
                     deleteDataEvent.getPipeTaskMeta(),
-                    deleteDataEvent.getPipePattern(),
+                    deleteDataEvent.getTreePattern(),
+                    deleteDataEvent.getTablePattern(),
                     deleteDataEvent.isGeneratedByPipe()))
         .ifPresent(
             event -> {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index 23ea2c561e4..ea299f568d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -45,7 +46,8 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
         enrichedEvent.getPipeName(),
         enrichedEvent.getCreationTime(),
         enrichedEvent.getPipeTaskMeta(),
-        enrichedEvent.getPipePattern(),
+        enrichedEvent.getTreePattern(),
+        enrichedEvent.getTablePattern(),
         enrichedEvent.getStartTime(),
         enrichedEvent.getEndTime());
     this.userDefinedEvent = userDefinedEvent;
@@ -76,11 +78,12 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent 
{
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-        pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+        pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
startTime, endTime);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index de40212b656..c99f192d190 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
@@ -59,7 +60,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   private final boolean shouldPrintMessage;
 
   public PipeHeartbeatEvent(final String dataRegionId, final boolean 
shouldPrintMessage) {
-    super(null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+    super(null, 0, null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.shouldPrintMessage = shouldPrintMessage;
   }
@@ -71,7 +72,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
       final String dataRegionId,
       final long timePublished,
       final boolean shouldPrintMessage) {
-    super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    super(pipeName, creationTime, pipeTaskMeta, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.timePublished = timePublished;
     this.shouldPrintMessage = shouldPrintMessage;
@@ -102,7 +103,8 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     // Should record PipeTaskMeta, for sometimes HeartbeatEvents should report 
exceptions.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index 63152ea8032..39f6aa30403 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.schema;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
@@ -78,7 +79,7 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
 
   public PipeSchemaRegionSnapshotEvent(
       final String mTreeSnapshotPath, final String tagLogSnapshotPath, final 
String databaseName) {
-    this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, 0, null, 
null);
+    this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, 0, null, 
null, null);
   }
 
   public PipeSchemaRegionSnapshotEvent(
@@ -88,8 +89,15 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, 
PipeDataNodeResourceManager.snapshot());
+      final TreePattern treePattern,
+      final TablePattern tablePattern) {
+    super(
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        treePattern,
+        tablePattern,
+        PipeDataNodeResourceManager.snapshot());
     this.mTreeSnapshotPath = mTreeSnapshotPath;
     this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ? 
tagLogSnapshotPath : "";
     this.databaseName = databaseName;
@@ -148,7 +156,8 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeSchemaRegionSnapshotEvent(
@@ -158,7 +167,8 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
         pipeName,
         creationTime,
         pipeTaskMeta,
-        pattern);
+        treePattern,
+        tablePattern);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
index 0014c2038f6..3499291a2f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.schema;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent;
@@ -40,7 +41,7 @@ public class PipeSchemaRegionWritePlanEvent extends 
PipeWritePlanEvent {
   }
 
   public PipeSchemaRegionWritePlanEvent(final PlanNode planNode, final boolean 
isGeneratedByPipe) {
-    this(planNode, null, 0, null, null, isGeneratedByPipe);
+    this(planNode, null, 0, null, null, null, isGeneratedByPipe);
   }
 
   public PipeSchemaRegionWritePlanEvent(
@@ -48,9 +49,10 @@ public class PipeSchemaRegionWritePlanEvent extends 
PipeWritePlanEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final boolean isGeneratedByPipe) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
+    super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
isGeneratedByPipe);
     this.planNode = planNode;
   }
 
@@ -63,11 +65,18 @@ public class PipeSchemaRegionWritePlanEvent extends 
PipeWritePlanEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeSchemaRegionWritePlanEvent(
-        planNode, pipeName, creationTime, pipeTaskMeta, pattern, 
isGeneratedByPipe);
+        planNode,
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        treePattern,
+        tablePattern,
+        isGeneratedByPipe);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 97a35caa1b4..9f719642da3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -82,6 +83,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         0,
         null,
         null,
+        null,
         Long.MIN_VALUE,
         Long.MAX_VALUE);
   }
@@ -95,10 +97,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+    super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
startTime, endTime);
     this.walEntryHandler = walEntryHandler;
     // Record device path here so there's no need to get it from InsertNode 
cache later.
     this.devicePath = devicePath;
@@ -179,7 +182,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeInsertNodeTabletInsertionEvent(
@@ -191,7 +195,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         pipeName,
         creationTime,
         pipeTaskMeta,
-        pattern,
+        treePattern,
+        tablePattern,
         startTime,
         endTime);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index f08ccb6d450..013ccb2579e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -62,10 +63,11 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+    super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
startTime, endTime);
     this.tablet = Objects.requireNonNull(tablet);
     this.isAligned = isAligned;
     this.sourceEvent = sourceEvent;
@@ -89,25 +91,37 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
         creationTime,
         pipeTaskMeta,
         null,
+        null,
         Long.MIN_VALUE,
         Long.MAX_VALUE);
   }
 
   @TestOnly
   public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean 
isAligned) {
-    this(tablet, isAligned, null, false, null, 0, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    this(tablet, isAligned, null, false, null, 0, null, null, null, 
Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @TestOnly
   public PipeRawTabletInsertionEvent(
-      final Tablet tablet, final boolean isAligned, final TreePattern pattern) 
{
-    this(tablet, isAligned, null, false, null, 0, null, pattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
+      final Tablet tablet, final boolean isAligned, final TreePattern 
treePattern) {
+    this(
+        tablet,
+        isAligned,
+        null,
+        false,
+        null,
+        0,
+        null,
+        treePattern,
+        null,
+        Long.MIN_VALUE,
+        Long.MAX_VALUE);
   }
 
   @TestOnly
   public PipeRawTabletInsertionEvent(
       final Tablet tablet, final long startTime, final long endTime) {
-    this(tablet, false, null, false, null, 0, null, null, startTime, endTime);
+    this(tablet, false, null, false, null, 0, null, null, null, startTime, 
endTime);
   }
 
   @Override
@@ -169,7 +183,8 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeRawTabletInsertionEvent(
@@ -180,7 +195,8 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
         pipeName,
         creationTime,
         pipeTaskMeta,
-        pattern,
+        treePattern,
+        tablePattern,
         startTime,
         endTime);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 28c4165a7e7..78c95a37f12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.terminate;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -42,7 +43,7 @@ public class PipeTerminateEvent extends EnrichedEvent {
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final int dataRegionId) {
-    super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    super(pipeName, creationTime, pipeTaskMeta, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
   }
 
@@ -66,7 +67,8 @@ public class PipeTerminateEvent extends EnrichedEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     // Should record PipeTaskMeta, for the terminateEvent shall report 
progress to
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d84a1bbdcae..c78ea9703e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -90,6 +91,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
         0,
         null,
         null,
+        null,
         Long.MIN_VALUE,
         Long.MAX_VALUE);
   }
@@ -103,10 +105,11 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+    super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
startTime, endTime);
 
     this.resource = resource;
     tsFile = resource.getTsFile();
@@ -320,7 +323,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeTsFileInsertionEvent(
@@ -332,7 +336,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
         pipeName,
         creationTime,
         pipeTaskMeta,
-        pattern,
+        treePattern,
+        tablePattern,
         startTime,
         endTime);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index b03e7caf9fc..b6ce88572a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.realtime;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
@@ -45,8 +46,17 @@ public class PipeRealtimeEvent extends EnrichedEvent {
       final EnrichedEvent event,
       final TsFileEpoch tsFileEpoch,
       final Map<IDeviceID, String[]> device2Measurements,
-      final TreePattern pattern) {
-    this(event, tsFileEpoch, device2Measurements, null, pattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
+      final TreePattern treePattern,
+      final TablePattern tablePattern) {
+    this(
+        event,
+        tsFileEpoch,
+        device2Measurements,
+        null,
+        treePattern,
+        tablePattern,
+        Long.MIN_VALUE,
+        Long.MAX_VALUE);
   }
 
   public PipeRealtimeEvent(
@@ -54,7 +64,8 @@ public class PipeRealtimeEvent extends EnrichedEvent {
       final TsFileEpoch tsFileEpoch,
       final Map<IDeviceID, String[]> device2Measurements,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     // PipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeEvent
@@ -64,7 +75,8 @@ public class PipeRealtimeEvent extends EnrichedEvent {
         event != null ? event.getPipeName() : null,
         event != null ? event.getCreationTime() : 0,
         pipeTaskMeta,
-        pattern,
+        treePattern,
+        tablePattern,
         startTime,
         endTime);
 
@@ -166,16 +178,18 @@ public class PipeRealtimeEvent extends EnrichedEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new PipeRealtimeEvent(
         event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-            pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime),
+            pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
startTime, endTime),
         this.tsFileEpoch,
         this.device2Measurements,
         pipeTaskMeta,
-        pattern,
+        treePattern,
+        tablePattern,
         startTime,
         endTime);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 3c3023c58ee..3ef134a96cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -58,16 +58,16 @@ public class PipeRealtimeEventFactory {
   public static PipeRealtimeEvent createRealtimeEvent(
       final String dataRegionId, final boolean shouldPrintMessage) {
     return new PipeRealtimeEvent(
-        new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, 
null);
+        new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, 
null, null);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode 
node) {
     return new PipeRealtimeEvent(
-        new PipeSchemaRegionWritePlanEvent(node, node.isGeneratedByPipe()), 
null, null, null);
+        new PipeSchemaRegionWritePlanEvent(node, node.isGeneratedByPipe()), 
null, null, null, null);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(final 
ProgressReportEvent event) {
-    return new PipeRealtimeEvent(event, null, null, null);
+    return new PipeRealtimeEvent(event, null, null, null, null);
   }
 
   private PipeRealtimeEventFactory() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 82bb5e2f5e2..cfe8b639fa2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -617,7 +617,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             creationTime,
             pipeTaskMeta,
             treePattern,
-            // TODO: consider tablePattern
+            tablePattern,
             historicalDataExtractionStartTime,
             historicalDataExtractionEndTime);
     if (sloppyPattern || isDbNameCoveredByPattern) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index e80101933d8..2a8eeda9e0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -105,13 +105,13 @@ public class PipeDataRegionAssigner implements Closeable {
                   counter = 0;
                 }
 
-                // TODO: consider table pattern
                 final ProgressReportEvent reportEvent =
                     new ProgressReportEvent(
                         extractor.getPipeName(),
                         extractor.getCreationTime(),
                         extractor.getPipeTaskMeta(),
                         extractor.getTreePattern(),
+                        extractor.getTablePattern(),
                         extractor.getRealtimeDataExtractionStartTime(),
                         extractor.getRealtimeDataExtractionEndTime());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
@@ -131,6 +131,7 @@ public class PipeDataRegionAssigner implements Closeable {
                       extractor.getCreationTime(),
                       extractor.getPipeTaskMeta(),
                       extractor.getTreePattern(),
+                      extractor.getTablePattern(),
                       extractor.getRealtimeDataExtractionStartTime(),
                       extractor.getRealtimeDataExtractionEndTime());
               final EnrichedEvent innerEvent = copiedEvent.getEvent();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
index e4244a1335e..d0ee25838b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -71,7 +71,8 @@ public class TsFileEpochManager {
         epoch,
         resource.getDevices().stream()
             .collect(Collectors.toMap(Functions.identity(), device -> 
EMPTY_MEASUREMENT_ARRAY)),
-        event.getPipePattern());
+        event.getTreePattern(),
+        event.getTablePattern());
   }
 
   public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
@@ -85,7 +86,8 @@ public class TsFileEpochManager {
         node instanceof InsertRowsNode
             ? getDevice2MeasurementsMapFromInsertRowsNode((InsertRowsNode) 
node)
             : Collections.singletonMap(node.getDeviceID(), 
node.getMeasurements()),
-        event.getPipePattern());
+        event.getTreePattern(),
+        event.getTablePattern());
   }
 
   private Map<IDeviceID, String[]> getDevice2MeasurementsMapFromInsertRowsNode(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 89c96595253..487fabfda38 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -60,7 +60,7 @@ public class CachedSchemaPatternMatcherTest {
         TsFileEpoch tsFileEpoch,
         Map<IDeviceID, String[]> device2Measurements,
         TreePattern pattern) {
-      super(event, tsFileEpoch, device2Measurements, pattern);
+      super(event, tsFileEpoch, device2Measurements, pattern, null);
     }
 
     @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index da427705714..60c8a3135d3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -61,6 +62,7 @@ public abstract class EnrichedEvent implements Event {
   protected int rebootTimes = 0;
 
   protected final TreePattern treePattern;
+  protected final TablePattern tablePattern;
 
   protected final long startTime;
   protected final long endTime;
@@ -76,6 +78,7 @@ public abstract class EnrichedEvent implements Event {
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     referenceCount = new AtomicInteger(0);
@@ -84,6 +87,7 @@ public abstract class EnrichedEvent implements Event {
     this.creationTime = creationTime;
     this.pipeTaskMeta = pipeTaskMeta;
     this.treePattern = treePattern;
+    this.tablePattern = tablePattern;
     this.startTime = startTime;
     this.endTime = endTime;
     isPatternParsed = this.treePattern == null || this.treePattern.isRoot();
@@ -310,10 +314,14 @@ public abstract class EnrichedEvent implements Event {
     return treePattern != null ? treePattern.getPattern() : null;
   }
 
-  public final TreePattern getPipePattern() {
+  public final TreePattern getTreePattern() {
     return treePattern;
   }
 
+  public final TablePattern getTablePattern() {
+    return tablePattern;
+  }
+
   public final long getStartTime() {
     return startTime;
   }
@@ -350,7 +358,8 @@ public abstract class EnrichedEvent implements Event {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime);
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index f94e96588d7..94a3d11c668 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import 
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
 
@@ -37,9 +38,17 @@ public abstract class PipeSnapshotEvent extends 
EnrichedEvent implements Seriali
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final PipeSnapshotResourceManager resourceManager) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    super(
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        treePattern,
+        tablePattern,
+        Long.MIN_VALUE,
+        Long.MAX_VALUE);
     this.resourceManager = resourceManager;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index 220974bc71a..dc3240793ba 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 
 public abstract class PipeWritePlanEvent extends EnrichedEvent implements 
SerializableEvent {
@@ -33,9 +34,17 @@ public abstract class PipeWritePlanEvent extends 
EnrichedEvent implements Serial
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final boolean isGeneratedByPipe) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    super(
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        treePattern,
+        tablePattern,
+        Long.MIN_VALUE,
+        Long.MAX_VALUE);
     this.isGeneratedByPipe = isGeneratedByPipe;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 97c62cfaaf6..5a7dbfe7399 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 
 /**
@@ -37,9 +38,10 @@ public class ProgressReportEvent extends EnrichedEvent {
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
-    super(pipeName, creationTime, pipeTaskMeta, treePattern, startTime, 
endTime);
+    super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
startTime, endTime);
   }
 
   @Override
@@ -67,11 +69,12 @@ public class ProgressReportEvent extends EnrichedEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final TreePattern pattern,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
       final long startTime,
       final long endTime) {
     return new ProgressReportEvent(
-        pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+        pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, 
startTime, endTime);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
index ca58ceedf83..1f9f4250fe5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
@@ -165,6 +165,7 @@ public abstract class IoTDBNonDataRegionExtractor extends 
IoTDBExtractor {
                       creationTime,
                       pipeTaskMeta,
                       pipePattern,
+                      null,
                       Long.MIN_VALUE,
                       Long.MAX_VALUE);
 
@@ -192,7 +193,13 @@ public abstract class IoTDBNonDataRegionExtractor extends 
IoTDBExtractor {
         || (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
       final ProgressReportEvent event =
           new ProgressReportEvent(
-              pipeName, creationTime, pipeTaskMeta, pipePattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
+              pipeName,
+              creationTime,
+              pipeTaskMeta,
+              pipePattern,
+              null,
+              Long.MIN_VALUE,
+              Long.MAX_VALUE);
       event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 
1));
       
event.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
       return event;
@@ -201,7 +208,13 @@ public abstract class IoTDBNonDataRegionExtractor extends 
IoTDBExtractor {
     realtimeEvent =
         (PipeWritePlanEvent)
             realtimeEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                pipeName, creationTime, pipeTaskMeta, pipePattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
+                pipeName,
+                creationTime,
+                pipeTaskMeta,
+                pipePattern,
+                null,
+                Long.MIN_VALUE,
+                Long.MAX_VALUE);
     realtimeEvent.bindProgressIndex(new 
MetaProgressIndex(iterator.getNextIndex() - 1));
     
realtimeEvent.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
     return realtimeEvent;

Reply via email to