This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-api
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-api by this push:
     new 425a7466c2d may-complete-n
425a7466c2d is described below

commit 425a7466c2d6e90b7a31027cfba40e7e478c2799
Author: Caideyipi <[email protected]>
AuthorDate: Fri Dec 19 11:21:15 2025 +0800

    may-complete-n
---
 .../org/apache/iotdb/pipe/api/collector/RowCollector.java    |  2 +-
 .../pipe/api/event/dml/insertion/TabletInsertionEvent.java   | 11 +++++++++++
 .../common/tablet/PipeInsertNodeTabletInsertionEvent.java    | 12 ++++++++++++
 .../event/common/tablet/PipeRawTabletInsertionEvent.java     |  7 +++++++
 .../db/pipe/event/common/tablet/PipeTabletCollector.java     |  2 --
 .../common/tablet/parser/TabletInsertionEventParser.java     |  4 ++++
 .../parser/TabletInsertionEventTablePatternParser.java       | 10 ++++++++++
 .../tablet/parser/TabletInsertionEventTreePatternParser.java | 10 ++++++++++
 8 files changed, 55 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
index 0518f8b2040..a04eae69044 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
@@ -27,7 +27,7 @@ import java.util.function.BiConsumer;
 
 /**
  * Used to collect rows generated by {@link 
TabletInsertionEvent#processRowByRow(BiConsumer)},
- * {@link TabletInsertionEvent#processTablet(BiConsumer)}.
+ * {@link TabletInsertionEvent#processTablet(BiConsumer)}.
  */
 public interface RowCollector {
 
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 6e1575a464f..57234f1ea87 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.pipe.api.event.dml.insertion;
 
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.apache.tsfile.write.record.Tablet;
@@ -45,4 +46,14 @@ public interface TabletInsertionEvent extends Event {
    *     contains the results collected by the {@link RowCollector}
    */
   Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer);
+
+  /**
+   * The consumer processes the Tablet directly and collects the results by 
{@link
+   * org.apache.iotdb.pipe.api.collector.TabletCollector}.
+   *
+   * @return {@code Iterable<TabletInsertionEvent>} a list of new {@link 
TabletInsertionEvent}
+   *     contains the results collected by the {@link TabletCollector}
+   */
+  Iterable<TabletInsertionEvent> processTabletWithCollect(
+      BiConsumer<Tablet, TabletCollector> consumer);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index f5ae873bbac..5a95b5328ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -52,6 +52,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -425,6 +426,17 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
         .collect(Collectors.toList());
   }
 
+  @Override
+  public Iterable<TabletInsertionEvent> processTabletWithCollect(
+      BiConsumer<Tablet, TabletCollector> consumer) {
+    // Pass the consumer to every parser returned by initEventParsers() and
+    // concatenate the event lists those parsers return into one list that is
+    // handed back to the caller.
+    return initEventParsers().stream()
+        .flatMap(parser -> parser.processTabletWithCollect(consumer).stream())
+        .collect(Collectors.toList());
+  }
+
   /////////////////////////// convertToTablet ///////////////////////////
 
   public boolean isAligned(final int i) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index adcef5128f5..3c06c653ed8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.utils.RamUsageEstimator;
@@ -415,6 +416,12 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
     return initEventParser().processTablet(consumer);
   }
 
+  @Override
+  public Iterable<TabletInsertionEvent> processTabletWithCollect(
+      BiConsumer<Tablet, TabletCollector> consumer) {
+    return initEventParser().processTabletWithCollect(consumer); // delegate to the parser
+  }
+
   /////////////////////////// convertToTablet ///////////////////////////
 
   public boolean isAligned() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index ccad1e94f5d..beacc54e705 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.pipe.api.collector.TabletCollector;
 
 import org.apache.tsfile.write.record.Tablet;
 
-import java.io.IOException;
-
 public class PipeTabletCollector extends PipeRawTabletEventConverter 
implements TabletCollector {
 
   public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent 
sourceEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index e10051325b3..d4531d78db0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.enums.ColumnCategory;
@@ -636,5 +637,8 @@ public abstract class TabletInsertionEventParser {
   public abstract List<TabletInsertionEvent> processTablet(
       final BiConsumer<Tablet, RowCollector> consumer);
 
+  public abstract List<TabletInsertionEvent> processTabletWithCollect(
+      final BiConsumer<Tablet, TabletCollector> consumer);
+
   public abstract Tablet convertToTablet();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
index de68404fb2a..522450cdef8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -113,6 +114,15 @@ public class TabletInsertionEventTablePatternParser 
extends TabletInsertionEvent
     return Collections.emptyList();
   }
 
+  @Override
+  public List<TabletInsertionEvent> processTabletWithCollect(
+      BiConsumer<Tablet, TabletCollector> consumer) {
+    // Tablet processing is unsupported for table patterns: the consumer is never invoked
+    // and no events are produced. Emitted at DEBUG only, so normal operation stays quiet.
+    LOGGER.debug("TablePatternParser does not support tablet processing");
+    return Collections.emptyList();
+  }
+
   ////////////////////////////  convertToTablet  ////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
index 68fb0e50b95..fb8287d4704 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletCollector;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.collector.TabletCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -154,6 +156,14 @@ public class TabletInsertionEventTreePatternParser extends 
TabletInsertionEventP
     return rowCollector.convertToTabletInsertionEvents(shouldReport);
   }
 
+  @Override
+  public List<TabletInsertionEvent> processTabletWithCollect(
+      BiConsumer<Tablet, TabletCollector> consumer) {
+    final PipeTabletCollector collector = new PipeTabletCollector(pipeTaskMeta, sourceEvent);
+    consumer.accept(convertToTablet(), collector); // consumer gets the converted tablet
+    return collector.convertToTabletInsertionEvents(shouldReport);
+  }
+
   ////////////////////////////  convertToTablet  ////////////////////////////
 
   @Override

Reply via email to