This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 174b2cb968f IoTConsensusV2: Transfer table deletion without any parse
or filter (#14988)
174b2cb968f is described below
commit 174b2cb968f398491cb571d4373847de7d9e2abf
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Feb 28 19:05:33 2025 +0800
IoTConsensusV2: Transfer table deletion without any parse or filter (#14988)
* transfer table deletion for iotv2
* minor fix
---
.../db/pipe/agent/task/connection/PipeEventCollector.java | 13 ++++++++++++-
.../db/pipe/agent/task/stage/PipeTaskProcessorStage.java | 5 ++++-
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index d8bd7fdce2b..f6a6f07c066 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -57,6 +57,8 @@ public class PipeEventCollector implements EventCollector {
private final boolean skipParseTsFile;
+ private final boolean isUsedForConsensusPipe;
+
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
private boolean hasNoGeneratedEvent = true;
private boolean isFailedToIncreaseReferenceCount = false;
@@ -66,12 +68,14 @@ public class PipeEventCollector implements EventCollector {
final long creationTime,
final int regionId,
final boolean forceTabletFormat,
- final boolean skipParseTsFile) {
+ final boolean skipParseTsFile,
+ final boolean isUsedInConsensusPipe) {
this.pendingQueue = pendingQueue;
this.creationTime = creationTime;
this.regionId = regionId;
this.forceTabletFormat = forceTabletFormat;
this.skipParseTsFile = skipParseTsFile;
+ this.isUsedForConsensusPipe = isUsedInConsensusPipe;
}
@Override
@@ -153,6 +157,13 @@ public class PipeEventCollector implements EventCollector {
}
private void parseAndCollectEvent(final PipeDeleteDataNodeEvent
deleteDataEvent) {
+ // For IoTConsensusV2, there is no need to parse. So we can directly transfer deleteDataEvent
+ if (isUsedForConsensusPipe) {
+ hasNoGeneratedEvent = false;
+ collectEvent(deleteDataEvent);
+ return;
+ }
+
// Only used by events containing delete data node, no need to bind progress index here since
// delete data event does not have progress index currently
(deleteDataEvent.getDeleteDataNode() instanceof DeleteDataNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 5201a19d8bc..4242dbcb8b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.stage.PipeTaskStage;
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
@@ -99,13 +100,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// removed, the new subtask will have the same pipeName and regionId as the
// old one, so we need creationTime to make their hash code different in the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
+ final boolean isUsedForConsensusPipe =
pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
final PipeEventCollector pipeConnectorOutputEventCollector =
new PipeEventCollector(
pipeConnectorOutputPendingQueue,
creationTime,
regionId,
forceTabletFormat,
- skipParseTsFile);
+ skipParseTsFile,
+ isUsedForConsensusPipe);
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,