This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 7b8a1479225 Subscription: let subscription module fully manage the
parsing process of the insert node event (#15012) (#15043)
7b8a1479225 is described below
commit 7b8a14792258acee8a1e6dc91da0afe0eb55b2a6
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 10 10:31:27 2025 +0800
Subscription: let subscription module fully manage the parsing process of
the insert node event (#15012) (#15043)
---
.../db/pipe/agent/task/connection/PipeEventCollector.java | 14 +++++++++-----
.../db/pipe/agent/task/stage/PipeTaskProcessorStage.java | 4 ++--
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index bb8ebeb5fc8..86a4d00741e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -54,7 +54,7 @@ public class PipeEventCollector implements EventCollector {
private final boolean forceTabletFormat;
- private final boolean skipParseTsFile;
+ private final boolean skipParsing;
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
private boolean hasNoGeneratedEvent = true;
@@ -65,12 +65,12 @@ public class PipeEventCollector implements EventCollector {
final long creationTime,
final int regionId,
final boolean forceTabletFormat,
- final boolean skipParseTsFile) {
+ final boolean skipParsing) {
this.pendingQueue = pendingQueue;
this.creationTime = creationTime;
this.regionId = regionId;
this.forceTabletFormat = forceTabletFormat;
- this.skipParseTsFile = skipParseTsFile;
+ this.skipParsing = skipParsing;
}
@Override
@@ -99,7 +99,11 @@ public class PipeEventCollector implements EventCollector {
}
private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent sourceEvent) {
- // TODO: let subscription module fully manage the parsing process of the insert node event
+ if (skipParsing) {
+ collectEvent(sourceEvent);
+ return;
+ }
+
if (sourceEvent.shouldParseTimeOrPattern()) {
for (final PipeRawTabletInsertionEvent parsedEvent :
sourceEvent.toRawTabletInsertionEvents()) {
@@ -125,7 +129,7 @@ public class PipeEventCollector implements EventCollector {
return;
}
- if (skipParseTsFile) {
+ if (skipParsing) {
collectEvent(sourceEvent);
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 5201a19d8bc..ddc194716d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -69,7 +69,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
final PipeProcessorSubtaskExecutor executor,
final PipeTaskMeta pipeTaskMeta,
final boolean forceTabletFormat,
- final boolean skipParseTsFile) {
+ final boolean skipParsing) {
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskProcessorRuntimeEnvironment(
@@ -105,7 +105,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
creationTime,
regionId,
forceTabletFormat,
- skipParseTsFile);
+ skipParsing);
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,