This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7b8a1479225 Subscription: let subscription module fully manage the 
parsing process of the insert node event (#15012) (#15043)
7b8a1479225 is described below

commit 7b8a14792258acee8a1e6dc91da0afe0eb55b2a6
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 10 10:31:27 2025 +0800

    Subscription: let subscription module fully manage the parsing process of 
the insert node event (#15012) (#15043)
---
 .../db/pipe/agent/task/connection/PipeEventCollector.java  | 14 +++++++++-----
 .../db/pipe/agent/task/stage/PipeTaskProcessorStage.java   |  4 ++--
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index bb8ebeb5fc8..86a4d00741e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -54,7 +54,7 @@ public class PipeEventCollector implements EventCollector {
 
   private final boolean forceTabletFormat;
 
-  private final boolean skipParseTsFile;
+  private final boolean skipParsing;
 
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
   private boolean hasNoGeneratedEvent = true;
@@ -65,12 +65,12 @@ public class PipeEventCollector implements EventCollector {
       final long creationTime,
       final int regionId,
       final boolean forceTabletFormat,
-      final boolean skipParseTsFile) {
+      final boolean skipParsing) {
     this.pendingQueue = pendingQueue;
     this.creationTime = creationTime;
     this.regionId = regionId;
     this.forceTabletFormat = forceTabletFormat;
-    this.skipParseTsFile = skipParseTsFile;
+    this.skipParsing = skipParsing;
   }
 
   @Override
@@ -99,7 +99,11 @@ public class PipeEventCollector implements EventCollector {
   }
 
   private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent 
sourceEvent) {
-    // TODO: let subscription module fully manage the parsing process of the 
insert node event
+    if (skipParsing) {
+      collectEvent(sourceEvent);
+      return;
+    }
+
     if (sourceEvent.shouldParseTimeOrPattern()) {
       for (final PipeRawTabletInsertionEvent parsedEvent :
           sourceEvent.toRawTabletInsertionEvents()) {
@@ -125,7 +129,7 @@ public class PipeEventCollector implements EventCollector {
       return;
     }
 
-    if (skipParseTsFile) {
+    if (skipParsing) {
       collectEvent(sourceEvent);
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 5201a19d8bc..ddc194716d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -69,7 +69,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       final PipeProcessorSubtaskExecutor executor,
       final PipeTaskMeta pipeTaskMeta,
       final boolean forceTabletFormat,
-      final boolean skipParseTsFile) {
+      final boolean skipParsing) {
     final PipeProcessorRuntimeConfiguration runtimeConfiguration =
         new PipeTaskRuntimeConfiguration(
             new PipeTaskProcessorRuntimeEnvironment(
@@ -105,7 +105,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             creationTime,
             regionId,
             forceTabletFormat,
-            skipParseTsFile);
+            skipParsing);
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,

Reply via email to