This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new be1fdde9891 Pipe: Fix ProgressReportEvent may be transferred / marked
as rate in config subtask (#13161)
be1fdde9891 is described below
commit be1fdde9891b46f5973501e6c6045514bb564dca
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 14 15:59:46 2024 +0800
Pipe: Fix ProgressReportEvent may be transferred / marked as rate in config
subtask (#13161)
---
.../confignode/manager/pipe/execution/PipeConfigNodeSubtask.java | 7 +++++--
.../org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java | 5 +++++
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 02cb5373359..64c164705e2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -181,10 +182,12 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
return false;
}
- outputPipeConnector.transfer(event);
+ if (!(event instanceof ProgressReportEvent)) {
+ outputPipeConnector.transfer(event);
+ PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
+ }
decreaseReferenceCountAndReleaseLastEvent(true);
- PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
} catch (final PipeException e) {
setLastExceptionEvent(event);
if (!isClosed.get()) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 700f8f16387..e3e580ba1dd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -88,4 +88,9 @@ public class ProgressReportEvent extends EnrichedEvent {
public boolean mayEventPathsOverlappedWithPattern() {
return true;
}
+
+ @Override
+ public String toString() {
+ return "ProgressReportEvent - " + super.toString();
+ }
}