This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0a3ca29e6a3 Pipe: Fix schema events can not report & Fix delete data
events in data regions may fail to mark at schema metrics (#12722)
0a3ca29e6a3 is described below
commit 0a3ca29e6a38c914c4cc2c7221067639d150de20
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 12 21:17:17 2024 +0800
Pipe: Fix schema events can not report & Fix delete data events in data
regions may fail to mark at schema metrics (#12722)
---
.../apache/iotdb/db/pipe/task/connection/PipeEventCollector.java | 9 ++++++++-
.../db/pipe/task/subtask/connector/PipeConnectorSubtask.java | 8 +++++++-
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 1df20db17f6..3055568aadf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -71,7 +72,11 @@ public class PipeEventCollector implements EventCollector {
parseAndCollectEvent((PipeRawTabletInsertionEvent) event);
} else if (event instanceof PipeTsFileInsertionEvent) {
parseAndCollectEvent((PipeTsFileInsertionEvent) event);
- } else if (event instanceof PipeSchemaRegionWritePlanEvent) {
+ } else if (event instanceof PipeSchemaRegionWritePlanEvent
+ && ((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
+ == PlanNodeType.DELETE_DATA) {
+ // This is only for delete data node in data region since plan nodes in schema regions are
+ // already parsed in schema region extractor
parseAndCollectEvent((PipeSchemaRegionWritePlanEvent) event);
} else if (!(event instanceof ProgressReportEvent)) {
collectEvent(event);
@@ -128,6 +133,8 @@ public class PipeEventCollector implements EventCollector {
}
private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent
deleteDataEvent) {
+ // Only used by events containing delete data node, no need to bind progress index here since
+ // delete data event does not have progress index currently
IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR
.process(deleteDataEvent.getPlanNode(), (IoTDBPipePattern)
deleteDataEvent.getPipePattern())
.map(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 40ac29eaa81..c2f7adaf8dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
import org.apache.iotdb.db.pipe.metric.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.pipe.metric.PipeSchemaRegionConnectorMetrics;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -115,7 +116,12 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
PipeDataRegionConnectorMetrics.getInstance().markTsFileEvent(taskID);
} else if (event instanceof PipeSchemaRegionWritePlanEvent) {
outputPipeConnector.transfer(event);
- PipeSchemaRegionConnectorMetrics.getInstance().markSchemaEvent(taskID);
+ if (((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
+ != PlanNodeType.DELETE_DATA) {
+ // Only plan nodes in schema region will be marked, delete data node is currently not
+ // taken into account
+
PipeSchemaRegionConnectorMetrics.getInstance().markSchemaEvent(taskID);
+ }
} else if (event instanceof PipeHeartbeatEvent) {
transferHeartbeatEvent((PipeHeartbeatEvent) event);
} else {