This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch api-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit af1eb3623e7667ce81e750aa9502a4613d03bbe9
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 10:45:20 2025 +0800

    patch
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  2 +-
 .../common/tablet/PipeRawTabletEventConverter.java | 22 ----------------------
 .../common/tablet/PipeRawTabletInsertionEvent.java |  6 +++++-
 .../tablet/TabletInsertionDataContainer.java       | 10 +++++++---
 .../event/TsFileInsertionDataContainerTest.java    |  4 ----
 5 files changed, 13 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 391875b674c..ade4d67b2dd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -313,7 +313,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
   @Override
   public Iterable<TabletInsertionEvent> processTabletWithCollect(
       BiConsumer<Tablet, TabletCollector> consumer) {
-    return initEventParsers().stream()
+    return initDataContainers().stream()
         .map(
             tabletInsertionEventParser ->
                 tabletInsertionEventParser.processTabletWithCollect(consumer))
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
index 4f387e71883..829c5304dab 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import org.apache.iotdb.pipe.api.collector.DataCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
@@ -33,31 +32,10 @@ public abstract class PipeRawTabletEventConverter implements DataCollector {
   protected boolean isAligned = false;
   protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
   protected final EnrichedEvent sourceEvent; // Used to report progress
-  protected final String sourceEventDataBaseName;
-  protected final Boolean isTableModel;
 
   public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
     this.pipeTaskMeta = pipeTaskMeta;
     this.sourceEvent = sourceEvent;
-    if (sourceEvent instanceof PipeInsertionEvent) {
-      sourceEventDataBaseName =
-          ((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion();
-      isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent();
-    } else {
-      sourceEventDataBaseName = null;
-      isTableModel = null;
-    }
-  }
-
-  public PipeRawTabletEventConverter(
-      PipeTaskMeta pipeTaskMeta,
-      EnrichedEvent sourceEvent,
-      String sourceEventDataBase,
-      Boolean isTableModel) {
-    this.pipeTaskMeta = pipeTaskMeta;
-    this.sourceEvent = sourceEvent;
-    this.sourceEventDataBaseName = sourceEventDataBase;
-    this.isTableModel = isTableModel;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 514128980bf..1377c415319 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -295,7 +295,11 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
   @Override
   public Iterable<TabletInsertionEvent> processTabletWithCollect(
       BiConsumer<Tablet, TabletCollector> consumer) {
-    return initEventParser().processTabletWithCollect(consumer);
+    if (dataContainer == null) {
+      dataContainer =
+          new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern);
+    }
+    return dataContainer.processTabletWithCollect(consumer);
   }
 
   /////////////////////////// convertToTablet ///////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 8143ab81d1b..133dbb5bff8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -690,6 +690,13 @@ public class TabletInsertionDataContainer {
     return rowCollector.convertToTabletInsertionEvents(shouldReport);
   }
 
+  public List<TabletInsertionEvent> processTabletWithCollect(
+      BiConsumer<Tablet, TabletCollector> consumer) {
+    final PipeTabletCollector tabletCollector = new PipeTabletCollector(pipeTaskMeta, sourceEvent);
+    consumer.accept(convertToTablet(), tabletCollector);
+    return tabletCollector.convertToTabletInsertionEvents(shouldReport);
+  }
+
   ////////////////////////////  convertToTablet  ////////////////////////////
 
   public Tablet convertToTablet() {
@@ -707,7 +714,4 @@ public class TabletInsertionDataContainer {
 
     return tablet;
   }
-
-  public abstract List<TabletInsertionEvent> processTabletWithCollect(
-      final BiConsumer<Tablet, TabletCollector> consumer);
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 709b1e8f78a..df7844bdd95 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -22,13 +22,9 @@ package org.apache.iotdb.db.pipe.event;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
 import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;

Reply via email to