This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-api
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-api by this push:
new 425a7466c2d may-complete-n
425a7466c2d is described below
commit 425a7466c2d6e90b7a31027cfba40e7e478c2799
Author: Caideyipi <[email protected]>
AuthorDate: Fri Dec 19 11:21:15 2025 +0800
may-complete-n
---
.../org/apache/iotdb/pipe/api/collector/RowCollector.java | 2 +-
.../pipe/api/event/dml/insertion/TabletInsertionEvent.java | 11 +++++++++++
.../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 12 ++++++++++++
.../event/common/tablet/PipeRawTabletInsertionEvent.java | 7 +++++++
.../db/pipe/event/common/tablet/PipeTabletCollector.java | 2 --
.../common/tablet/parser/TabletInsertionEventParser.java | 4 ++++
.../parser/TabletInsertionEventTablePatternParser.java | 10 ++++++++++
.../tablet/parser/TabletInsertionEventTreePatternParser.java | 10 ++++++++++
8 files changed, 55 insertions(+), 3 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
index 0518f8b2040..a04eae69044 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
@@ -27,7 +27,7 @@ import java.util.function.BiConsumer;
/**
* Used to collect rows generated by {@link
TabletInsertionEvent#processRowByRow(BiConsumer)},
- * {@link TabletInsertionEvent#processTablet(BiConsumer)}.
+ * {@link TabletInsertionEvent#processTabletWithTabletCollection(BiConsumer)}.
*/
public interface RowCollector {
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 6e1575a464f..57234f1ea87 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.pipe.api.event.dml.insertion;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.tsfile.write.record.Tablet;
@@ -45,4 +46,14 @@ public interface TabletInsertionEvent extends Event {
* contains the results collected by the {@link RowCollector}
*/
Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer);
+
+ /**
+ * The consumer processes the Tablet directly and collects the results by
{@link
+ * org.apache.iotdb.pipe.api.collector.TabletCollector}.
+ *
+ * @return {@code Iterable<TabletInsertionEvent>} a list of new {@link
TabletInsertionEvent}
+ * that contains the results collected by the {@link TabletCollector}
+ */
+ Iterable<TabletInsertionEvent> processTabletWithCollect(
+ BiConsumer<Tablet, TabletCollector> consumer);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index f5ae873bbac..5a95b5328ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -52,6 +52,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -425,6 +426,17 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
.collect(Collectors.toList());
}
+ @Override
+ public Iterable<TabletInsertionEvent> processTabletWithCollect(
+ BiConsumer<Tablet, TabletCollector> consumer) {
+ return initEventParsers().stream()
+ .map(
+ tabletInsertionEventParser ->
+ tabletInsertionEventParser.processTabletWithCollect(consumer))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
/////////////////////////// convertToTablet ///////////////////////////
public boolean isAligned(final int i) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index adcef5128f5..3c06c653ed8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -415,6 +416,12 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
return initEventParser().processTablet(consumer);
}
+ @Override
+ public Iterable<TabletInsertionEvent> processTabletWithCollect(
+ BiConsumer<Tablet, TabletCollector> consumer) {
+ return initEventParser().processTabletWithCollect(consumer);
+ }
+
/////////////////////////// convertToTablet ///////////////////////////
public boolean isAligned() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index ccad1e94f5d..beacc54e705 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.tsfile.write.record.Tablet;
-import java.io.IOException;
-
public class PipeTabletCollector extends PipeRawTabletEventConverter
implements TabletCollector {
public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent
sourceEvent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index e10051325b3..d4531d78db0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.enums.ColumnCategory;
@@ -636,5 +637,8 @@ public abstract class TabletInsertionEventParser {
public abstract List<TabletInsertionEvent> processTablet(
final BiConsumer<Tablet, RowCollector> consumer);
+ public abstract List<TabletInsertionEvent> processTabletWithCollect(
+ final BiConsumer<Tablet, TabletCollector> consumer);
+
public abstract Tablet convertToTablet();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
index de68404fb2a..522450cdef8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -113,6 +114,15 @@ public class TabletInsertionEventTablePatternParser
extends TabletInsertionEvent
return Collections.emptyList();
}
+ @Override
+ public List<TabletInsertionEvent> processTabletWithCollect(
+ BiConsumer<Tablet, TabletCollector> consumer) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.warn("TablePatternParser does not support tablet processing");
+ }
+ return Collections.emptyList();
+ }
+
//////////////////////////// convertToTablet ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
index 68fb0e50b95..fb8287d4704 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletCollector;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -154,6 +156,14 @@ public class TabletInsertionEventTreePatternParser extends
TabletInsertionEventP
return rowCollector.convertToTabletInsertionEvents(shouldReport);
}
+ @Override
+ public List<TabletInsertionEvent> processTabletWithCollect(
+ BiConsumer<Tablet, TabletCollector> consumer) {
+ final PipeTabletCollector tabletCollector = new
PipeTabletCollector(pipeTaskMeta, sourceEvent);
+ consumer.accept(convertToTablet(), tabletCollector);
+ return tabletCollector.convertToTabletInsertionEvents(shouldReport);
+ }
+
//////////////////////////// convertToTablet ////////////////////////////
@Override