This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 174b2cb968f IoTConsensusV2: Transfer table deletion without any parse or filter (#14988)
174b2cb968f is described below

commit 174b2cb968f398491cb571d4373847de7d9e2abf
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Feb 28 19:05:33 2025 +0800

    IoTConsensusV2: Transfer table deletion without any parse or filter (#14988)
    
    * transfer table deletion for iotv2
    
    * minor fix
---
 .../db/pipe/agent/task/connection/PipeEventCollector.java   | 13 ++++++++++++-
 .../db/pipe/agent/task/stage/PipeTaskProcessorStage.java    |  5 ++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index d8bd7fdce2b..f6a6f07c066 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -57,6 +57,8 @@ public class PipeEventCollector implements EventCollector {
 
   private final boolean skipParseTsFile;
 
+  private final boolean isUsedForConsensusPipe;
+
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
   private boolean hasNoGeneratedEvent = true;
   private boolean isFailedToIncreaseReferenceCount = false;
@@ -66,12 +68,14 @@ public class PipeEventCollector implements EventCollector {
       final long creationTime,
       final int regionId,
       final boolean forceTabletFormat,
-      final boolean skipParseTsFile) {
+      final boolean skipParseTsFile,
+      final boolean isUsedInConsensusPipe) {
     this.pendingQueue = pendingQueue;
     this.creationTime = creationTime;
     this.regionId = regionId;
     this.forceTabletFormat = forceTabletFormat;
     this.skipParseTsFile = skipParseTsFile;
+    this.isUsedForConsensusPipe = isUsedInConsensusPipe;
   }
 
   @Override
@@ -153,6 +157,13 @@ public class PipeEventCollector implements EventCollector {
   }
 
   private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent) {
+    // For IoTConsensusV2, there is no need to parse. So we can directly transfer deleteDataEvent
+    if (isUsedForConsensusPipe) {
+      hasNoGeneratedEvent = false;
+      collectEvent(deleteDataEvent);
+      return;
+    }
+
     // Only used by events containing delete data node, no need to bind progress index here since
     // delete data event does not have progress index currently
     (deleteDataEvent.getDeleteDataNode() instanceof DeleteDataNode
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 5201a19d8bc..4242dbcb8b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
 import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.stage.PipeTaskStage;
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
@@ -99,13 +100,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     // removed, the new subtask will have the same pipeName and regionId as the
     // old one, so we need creationTime to make their hash code different in the map.
     final String taskId = pipeName + "_" + regionId + "_" + creationTime;
+    final boolean isUsedForConsensusPipe = pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
     final PipeEventCollector pipeConnectorOutputEventCollector =
         new PipeEventCollector(
             pipeConnectorOutputPendingQueue,
             creationTime,
             regionId,
             forceTabletFormat,
-            skipParseTsFile);
+            skipParseTsFile,
+            isUsedForConsensusPipe);
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,

Reply via email to