This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch api-13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit af1eb3623e7667ce81e750aa9502a4613d03bbe9 Author: Caideyipi <[email protected]> AuthorDate: Mon Dec 22 10:45:20 2025 +0800 patch --- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 2 +- .../common/tablet/PipeRawTabletEventConverter.java | 22 ---------------------- .../common/tablet/PipeRawTabletInsertionEvent.java | 6 +++++- .../tablet/TabletInsertionDataContainer.java | 10 +++++++--- .../event/TsFileInsertionDataContainerTest.java | 4 ---- 5 files changed, 13 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 391875b674c..ade4d67b2dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -313,7 +313,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent @Override public Iterable<TabletInsertionEvent> processTabletWithCollect( BiConsumer<Tablet, TabletCollector> consumer) { - return initEventParsers().stream() + return initDataContainers().stream() .map( tabletInsertionEventParser -> tabletInsertionEventParser.processTabletWithCollect(consumer)) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java index 4f387e71883..829c5304dab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.event.common.tablet; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.pipe.api.collector.DataCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -33,31 +32,10 @@ public abstract class PipeRawTabletEventConverter implements DataCollector { protected boolean isAligned = false; protected final PipeTaskMeta pipeTaskMeta; // Used to report progress protected final EnrichedEvent sourceEvent; // Used to report progress - protected final String sourceEventDataBaseName; - protected final Boolean isTableModel; public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) { this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; - if (sourceEvent instanceof PipeInsertionEvent) { - sourceEventDataBaseName = - ((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion(); - isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent(); - } else { - sourceEventDataBaseName = null; - isTableModel = null; - } - } - - public PipeRawTabletEventConverter( - PipeTaskMeta pipeTaskMeta, - EnrichedEvent sourceEvent, - String sourceEventDataBase, - Boolean isTableModel) { - this.pipeTaskMeta = pipeTaskMeta; - this.sourceEvent = sourceEvent; - this.sourceEventDataBaseName = sourceEventDataBase; - this.isTableModel = isTableModel; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 514128980bf..1377c415319 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -295,7 +295,11 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent @Override public Iterable<TabletInsertionEvent> processTabletWithCollect( BiConsumer<Tablet, TabletCollector> consumer) { - return initEventParser().processTabletWithCollect(consumer); + if (dataContainer == null) { + dataContainer = + new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern); + } + return dataContainer.processTabletWithCollect(consumer); } /////////////////////////// convertToTablet /////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 8143ab81d1b..133dbb5bff8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -690,6 +690,13 @@ public class TabletInsertionDataContainer { return rowCollector.convertToTabletInsertionEvents(shouldReport); } + public List<TabletInsertionEvent> processTabletWithCollect( + BiConsumer<Tablet, TabletCollector> consumer) { + final PipeTabletCollector tabletCollector = new PipeTabletCollector(pipeTaskMeta, sourceEvent); + consumer.accept(convertToTablet(), tabletCollector); + return tabletCollector.convertToTabletInsertionEvents(shouldReport); + } + //////////////////////////// convertToTablet //////////////////////////// public Tablet convertToTablet() { @@ -707,7 +714,4 @@ public class TabletInsertionDataContainer { return tablet; } - - public abstract List<TabletInsertionEvent> processTabletWithCollect( - final BiConsumer<Tablet, TabletCollector> consumer); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index 709b1e8f78a..df7844bdd95 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -22,13 +22,9 @@ package org.apache.iotdb.db.pipe.event; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; -import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
