This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a3ca29e6a3 Pipe: Fix schema events can not report & Fix delete data 
events in data regions may fail to mark at schema metrics (#12722)
0a3ca29e6a3 is described below

commit 0a3ca29e6a38c914c4cc2c7221067639d150de20
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 12 21:17:17 2024 +0800

    Pipe: Fix schema events can not report & Fix delete data events in data 
regions may fail to mark at schema metrics (#12722)
---
 .../apache/iotdb/db/pipe/task/connection/PipeEventCollector.java | 9 ++++++++-
 .../db/pipe/task/subtask/connector/PipeConnectorSubtask.java     | 8 +++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 1df20db17f6..3055568aadf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -71,7 +72,11 @@ public class PipeEventCollector implements EventCollector {
         parseAndCollectEvent((PipeRawTabletInsertionEvent) event);
       } else if (event instanceof PipeTsFileInsertionEvent) {
         parseAndCollectEvent((PipeTsFileInsertionEvent) event);
-      } else if (event instanceof PipeSchemaRegionWritePlanEvent) {
+      } else if (event instanceof PipeSchemaRegionWritePlanEvent
+          && ((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
+              == PlanNodeType.DELETE_DATA) {
+        // This is only for delete data node in data region since plan nodes 
in schema regions are
+        // already parsed in schema region extractor
         parseAndCollectEvent((PipeSchemaRegionWritePlanEvent) event);
       } else if (!(event instanceof ProgressReportEvent)) {
         collectEvent(event);
@@ -128,6 +133,8 @@ public class PipeEventCollector implements EventCollector {
   }
 
   private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent 
deleteDataEvent) {
+    // Only used by events containing delete data node, no need to bind 
progress index here since
+    // delete data event does not have progress index currently
     IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR
         .process(deleteDataEvent.getPlanNode(), (IoTDBPipePattern) 
deleteDataEvent.getPipePattern())
         .map(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 40ac29eaa81..c2f7adaf8dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeSchemaRegionConnectorMetrics;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -115,7 +116,12 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
         PipeDataRegionConnectorMetrics.getInstance().markTsFileEvent(taskID);
       } else if (event instanceof PipeSchemaRegionWritePlanEvent) {
         outputPipeConnector.transfer(event);
-        PipeSchemaRegionConnectorMetrics.getInstance().markSchemaEvent(taskID);
+        if (((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
+            != PlanNodeType.DELETE_DATA) {
+          // Only plan nodes in schema region will be marked, delete data node 
is currently not
+          // taken into account
+          
PipeSchemaRegionConnectorMetrics.getInstance().markSchemaEvent(taskID);
+        }
       } else if (event instanceof PipeHeartbeatEvent) {
         transferHeartbeatEvent((PipeHeartbeatEvent) event);
       } else {

Reply via email to