This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9e31c84ff99 Pipe: Fixed the IndexOutOfBound exception & Trimmed the
un-transferred schema region & Refactor for meta transfer (#15337)
9e31c84ff99 is described below
commit 9e31c84ff9916a9154051c5af44a44b4548e12a9
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 17 12:30:08 2025 +0800
Pipe: Fixed the IndexOutOfBound exception & Trimmed the un-transferred
schema region & Refactor for meta transfer (#15337)
---
.../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 ++--
.../db/pipe/agent/task/builder/PipeDataNodeBuilder.java | 4 ++--
.../request/PipeTransferSchemaSnapshotSealReq.java | 5 ++---
.../protocol/thrift/sync/IoTDBSchemaRegionConnector.java | 8 ++++----
.../common/schema/PipeSchemaRegionSnapshotEvent.java | 4 ++--
.../schemaregion/SchemaRegionListeningFilter.java | 15 +++++++++++++++
.../commons/pipe/datastructure/pattern/TablePattern.java | 2 ++
7 files changed, 29 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 7e2191f8293..96fdbe36209 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -188,8 +188,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
SchemaEngine.getInstance()
.getAllSchemaRegionIds()
.contains(new SchemaRegionId(consensusGroupId))
- && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters)
- .isEmpty();
+ && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
+ consensusGroupId, extractorParameters);
// Advance the extractor parameters parsing logic to avoid creating un-relevant pipeTasks
if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index 3d639215258..5ed04fd2d17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -71,8 +71,8 @@ public class PipeDataNodeBuilder {
extractorParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
- && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters)
- .isEmpty();
+ && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
+ consensusGroupId, extractorParameters);
// Advance the extractor parameters parsing logic to avoid creating un-relevant pipeTasks
if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java
index be1e608efe3..7e4fbc24b13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaSnapshotSealReq.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
-import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import java.io.IOException;
@@ -66,10 +65,10 @@ public class PipeTransferSchemaSnapshotSealReq extends PipeTransferFileSealReqV2
parameters.put(DATABASE_PATTERN, tablePatternDatabase);
parameters.put(ColumnHeaderConstant.TABLE_NAME, tablePatternTable);
if (isTreeCaptured) {
- parameters.put(IClientSession.SqlDialect.TREE.toString(), "");
+ parameters.put(TREE, "");
}
if (isTableCaptured) {
- parameters.put(IClientSession.SqlDialect.TABLE.toString(), "");
+ parameters.put(TABLE, "");
}
parameters.put(ColumnHeaderConstant.DATABASE, databaseName);
parameters.put(ColumnHeaderConstant.TYPE, typeString);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index e235f225ec3..7ce697e905a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -209,8 +209,8 @@ public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
String.format(
- "Network error when seal snapshot file %s and %s, because %s.",
- mTreeSnapshotFile, tagLogSnapshotFile, e.getMessage()),
+ "Network error when seal snapshot file %s, %s and %s, because %s.",
+ mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, e.getMessage()),
e);
}
@@ -220,8 +220,8 @@ public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector {
receiverStatusHandler.handle(
resp.getStatus(),
String.format(
- "Seal file %s and %s error, result status %s.",
- mTreeSnapshotFile, tagLogSnapshotFile, resp.getStatus()),
+ "Seal file %s, %s and %s error, result status %s.",
+ mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, resp.getStatus()),
snapshotEvent.toString());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index a2b11f26a7a..325262ca8cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -210,8 +210,8 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent
final long endTime) {
return new PipeSchemaRegionSnapshotEvent(
mTreeSnapshotPath,
- treePattern.isTreeModelDataAllowedToBeCaptured() ? tagLogSnapshotPath : null,
- tablePattern.isTableModelDataAllowedToBeCaptured() ? attributeSnapshotPath : null,
+ tagLogSnapshotPath,
+ attributeSnapshotPath,
databaseName,
pipeName,
creationTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
index 86e985f330c..09e09f9846e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
@@ -19,13 +19,16 @@
package org.apache.iotdb.db.pipe.extractor.schemaregion;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
+import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import java.util.Arrays;
@@ -109,6 +112,18 @@ public class SchemaRegionListeningFilter {
}
}
+ public static boolean shouldSchemaRegionBeListened(
+ final int consensusGroupId, final PipeParameters parameters) throws IllegalPathException {
+ final boolean isTableModel =
+ PathUtils.isTableModelDatabase(
+ SchemaEngine.getInstance()
+ .getSchemaRegion(new SchemaRegionId(consensusGroupId))
+ .getDatabaseFullPath());
+ return (TreePattern.isTreeModelDataAllowToBeCaptured(parameters) && !isTableModel
+ || TablePattern.isTableModelDataAllowToBeCaptured(parameters) && isTableModel)
+ && !parseListeningPlanTypeSet(parameters).isEmpty();
+ }
+
public static Set<PlanNodeType> parseListeningPlanTypeSet(final PipeParameters parameters)
throws IllegalPathException {
final Set<PlanNodeType> planTypes = new HashSet<>();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
index 1fe957a2454..34848ac4665 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
@@ -151,6 +151,8 @@ public class TablePattern {
@Override
public String toString() {
return "TablePattern{"
+ + "isTableModelDataAllowedToBeCaptured"
+ + isTableModelDataAllowedToBeCaptured
+ "databasePattern="
+ databasePattern
+ ", tablePattern="