This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e31c84ff99 Pipe: Fixed the IndexOutOfBound exception & Trimmed the un-transferred schema region & Refactor for meta transfer (#15337)
9e31c84ff99 is described below

commit 9e31c84ff9916a9154051c5af44a44b4548e12a9
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 17 12:30:08 2025 +0800

    Pipe: Fixed the IndexOutOfBound exception & Trimmed the un-transferred schema region & Refactor for meta transfer (#15337)
---
 .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java   |  4 ++--
 .../db/pipe/agent/task/builder/PipeDataNodeBuilder.java   |  4 ++--
 .../request/PipeTransferSchemaSnapshotSealReq.java        |  5 ++---
 .../protocol/thrift/sync/IoTDBSchemaRegionConnector.java  |  8 ++++----
 .../common/schema/PipeSchemaRegionSnapshotEvent.java      |  4 ++--
 .../schemaregion/SchemaRegionListeningFilter.java         | 15 +++++++++++++++
 .../commons/pipe/datastructure/pattern/TablePattern.java  |  2 ++
 7 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 7e2191f8293..96fdbe36209 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -188,8 +188,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
           SchemaEngine.getInstance()
                   .getAllSchemaRegionIds()
                   .contains(new SchemaRegionId(consensusGroupId))
-              && 
!SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters)
-                  .isEmpty();
+              && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
+                  consensusGroupId, extractorParameters);
 
       // Advance the extractor parameters parsing logic to avoid creating 
un-relevant pipeTasks
       if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index 3d639215258..5ed04fd2d17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -71,8 +71,8 @@ public class PipeDataNodeBuilder {
                     extractorParameters, dataRegionId);
         final boolean needConstructSchemaRegionTask =
             schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
-                && 
!SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters)
-                    .isEmpty();
+                && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
+                    consensusGroupId, extractorParameters);
 
         // Advance the extractor parameters parsing logic to avoid creating 
un-relevant pipeTasks
         if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java
index be1e608efe3..7e4fbc24b13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
-import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import java.io.IOException;
@@ -66,10 +65,10 @@ public class PipeTransferSchemaSnapshotSealReq extends PipeTransferFileSealReqV2
     parameters.put(DATABASE_PATTERN, tablePatternDatabase);
     parameters.put(ColumnHeaderConstant.TABLE_NAME, tablePatternTable);
     if (isTreeCaptured) {
-      parameters.put(IClientSession.SqlDialect.TREE.toString(), "");
+      parameters.put(TREE, "");
     }
     if (isTableCaptured) {
-      parameters.put(IClientSession.SqlDialect.TABLE.toString(), "");
+      parameters.put(TABLE, "");
     }
     parameters.put(ColumnHeaderConstant.DATABASE, databaseName);
     parameters.put(ColumnHeaderConstant.TYPE, typeString);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index e235f225ec3..7ce697e905a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -209,8 +209,8 @@ public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
           String.format(
-              "Network error when seal snapshot file %s and %s, because %s.",
-              mTreeSnapshotFile, tagLogSnapshotFile, e.getMessage()),
+              "Network error when seal snapshot file %s, %s and %s, because 
%s.",
+              mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, 
e.getMessage()),
           e);
     }
 
@@ -220,8 +220,8 @@ public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector {
       receiverStatusHandler.handle(
           resp.getStatus(),
           String.format(
-              "Seal file %s and %s error, result status %s.",
-              mTreeSnapshotFile, tagLogSnapshotFile, resp.getStatus()),
+              "Seal file %s, %s and %s error, result status %s.",
+              mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, 
resp.getStatus()),
           snapshotEvent.toString());
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index a2b11f26a7a..325262ca8cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -210,8 +210,8 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent
       final long endTime) {
     return new PipeSchemaRegionSnapshotEvent(
         mTreeSnapshotPath,
-        treePattern.isTreeModelDataAllowedToBeCaptured() ? tagLogSnapshotPath 
: null,
-        tablePattern.isTableModelDataAllowedToBeCaptured() ? 
attributeSnapshotPath : null,
+        tagLogSnapshotPath,
+        attributeSnapshotPath,
         databaseName,
         pipeName,
         creationTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
index 86e985f330c..09e09f9846e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
@@ -19,13 +19,16 @@
 
 package org.apache.iotdb.db.pipe.extractor.schemaregion;
 
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
+import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import java.util.Arrays;
@@ -109,6 +112,18 @@ public class SchemaRegionListeningFilter {
     }
   }
 
+  public static boolean shouldSchemaRegionBeListened(
+      final int consensusGroupId, final PipeParameters parameters) throws 
IllegalPathException {
+    final boolean isTableModel =
+        PathUtils.isTableModelDatabase(
+            SchemaEngine.getInstance()
+                .getSchemaRegion(new SchemaRegionId(consensusGroupId))
+                .getDatabaseFullPath());
+    return (TreePattern.isTreeModelDataAllowToBeCaptured(parameters) && 
!isTableModel
+            || TablePattern.isTableModelDataAllowToBeCaptured(parameters) && 
isTableModel)
+        && !parseListeningPlanTypeSet(parameters).isEmpty();
+  }
+
   public static Set<PlanNodeType> parseListeningPlanTypeSet(final 
PipeParameters parameters)
       throws IllegalPathException {
     final Set<PlanNodeType> planTypes = new HashSet<>();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
index 1fe957a2454..34848ac4665 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
@@ -151,6 +151,8 @@ public class TablePattern {
   @Override
   public String toString() {
     return "TablePattern{"
+        + "isTableModelDataAllowedToBeCaptured"
+        + isTableModelDataAllowedToBeCaptured
         + "databasePattern="
         + databasePattern
         + ", tablePattern="

Reply via email to