This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 68daea60b0a Pipe/Subscription: Filter out non-working DR PipeTasks in
CN & Reduce model judgement cost in
PipeHistoricalDataRegionTsFileAndDeletionExtractor (#14059)
68daea60b0a is described below
commit 68daea60b0a4f7c8141fbe17f198a9bfdace873c
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Nov 26 11:50:11 2024 +0800
Pipe/Subscription: Filter out non-working DR PipeTasks in CN & Reduce model
judgement cost in PipeHistoricalDataRegionTsFileAndDeletionExtractor (#14059)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 79 ++++++++++++++++++++--
...bstractOperateSubscriptionAndPipeProcedure.java | 4 +-
.../dataregion/DataRegionListeningFilter.java | 28 ++++++++
...oricalDataRegionTsFileAndDeletionExtractor.java | 69 +++++++------------
4 files changed, 130 insertions(+), 50 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index ec0a33ee677..6aa530b2860 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.confignode.procedure.impl.pipe;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
@@ -30,6 +32,9 @@ import
org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -390,9 +395,8 @@ public abstract class AbstractOperatePipeProcedureV2
throws IOException {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
- pipeMetaBinaryList.add(pipeMeta.serialize());
+
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}
-
return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
}
@@ -407,10 +411,9 @@ public abstract class AbstractOperatePipeProcedureV2
public static Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
ConfigNodeProcedureEnv env, AtomicReference<PipeTaskInfo> pipeTaskInfo)
throws IOException {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
- for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
- pipeMetaBinaryList.add(pipeMeta.serialize());
+ for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
+
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}
-
return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
}
@@ -487,7 +490,9 @@ public abstract class AbstractOperatePipeProcedureV2
protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(
String pipeName, ConfigNodeProcedureEnv env) throws IOException {
return env.pushSinglePipeMetaToDataNodes(
- pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
+ copyAndFilterOutNonWorkingDataRegionPipeTasks(
+ pipeTaskInfo.get().getPipeMetaByPipeName(pipeName))
+ .serialize());
}
/**
@@ -502,6 +507,68 @@ public abstract class AbstractOperatePipeProcedureV2
return env.dropSinglePipeOnDataNodes(pipeName);
}
+ public static PipeMeta
copyAndFilterOutNonWorkingDataRegionPipeTasks(PipeMeta originalPipeMeta)
+ throws IOException {
+ final PipeMeta copiedPipeMeta = originalPipeMeta.deepCopy4TaskAgent();
+
+ copiedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupId2TaskMetaMap()
+ .entrySet()
+ .removeIf(
+ consensusGroupId2TaskMeta -> {
+ final String database;
+ try {
+ database =
+ ConfigNode.getInstance()
+ .getConfigManager()
+ .getPartitionManager()
+ .getRegionDatabase(
+ new TConsensusGroupId(
+ // We assume that the consensus group id is a data region id.
+ TConsensusGroupType.DataRegion,
+ consensusGroupId2TaskMeta.getKey()));
+ if (database == null) {
+ // If the consensus group id is not a data region id, we keep it.
+ // If the consensus group id is a data region id, but the database is not found,
+ // we keep it.
+ return false;
+ }
+ } catch (final Exception ignore) {
+ // In case of any exception, we keep the consensus group id.
+ return false;
+ }
+
+ final boolean isTableModel;
+ try {
+ final TDatabaseSchema schema =
+ ConfigNode.getInstance()
+ .getConfigManager()
+ .getClusterSchemaManager()
+ .getDatabaseSchemaByName(database);
+ if (schema == null) {
+ // If the database is not found, we keep it.
+ return false;
+ }
+ isTableModel = schema.isIsTableModel();
+ } catch (final Exception ignore) {
+ // If the database is not found, we keep it.
+ return false;
+ }
+
+ try {
+ return !DataRegionListeningFilter.shouldDatabaseBeListened(
+ copiedPipeMeta.getStaticMeta().getExtractorParameters(),
+ isTableModel,
+ database);
+ } catch (final Exception e) {
+ return false;
+ }
+ });
+
+ return copiedPipeMeta;
+ }
+
@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 2b420136253..de90951262c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -36,6 +36,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import static
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2.copyAndFilterOutNonWorkingDataRegionPipeTasks;
+
public abstract class AbstractOperateSubscriptionAndPipeProcedure
extends AbstractOperateSubscriptionProcedure {
private static final Logger LOGGER =
@@ -135,7 +137,7 @@ public abstract class
AbstractOperateSubscriptionAndPipeProcedure
LOGGER.warn("Pipe {} not found in PipeTaskInfo, can not push its
meta.", pipeName);
continue;
}
- pipeMetaBinaryList.add(pipeMeta.serialize());
+
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}
return env.pushMultiPipeMetaToDataNodes(pipeMetaBinaryList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java
index 18de48cfc32..3c0ab3594ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java
@@ -61,6 +61,34 @@ public class DataRegionListeningFilter {
}
}
+ public static boolean shouldDatabaseBeListened(
+ final PipeParameters parameters, final boolean isTableModel, final
String databaseRawName)
+ throws IllegalPathException {
+ final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+ parseInsertionDeletionListeningOptionPair(parameters);
+ final boolean hasSpecificListeningOption =
+ insertionDeletionListeningOptionPair.getLeft()
+ || insertionDeletionListeningOptionPair.getRight();
+ if (!hasSpecificListeningOption) {
+ return false;
+ }
+
+ if (isTableModel) {
+ final String databaseTableModel =
+ databaseRawName.startsWith("root.") ? databaseRawName.substring(5) :
databaseRawName;
+ final TablePattern tablePattern =
+ TablePattern.parsePipePatternFromSourceParameters(parameters);
+ return tablePattern.isTableModelDataAllowedToBeCaptured()
+ && tablePattern.matchesDatabase(databaseTableModel);
+ } else {
+ final String databaseTreeModel =
+ databaseRawName.startsWith("root.") ? databaseRawName : "root." +
databaseRawName;
+ final TreePattern treePattern =
TreePattern.parsePipePatternFromSourceParameters(parameters);
+ return treePattern.isTreeModelDataAllowedToBeCaptured()
+ && treePattern.mayOverlapWithDb(databaseTreeModel);
+ }
+ }
+
public static boolean shouldDataRegionBeListened(
PipeParameters parameters, DataRegionId dataRegionId) throws
IllegalPathException {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index c10d2ebefc3..ae08fa8b0cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -132,8 +132,10 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
private TreePattern treePattern;
private TablePattern tablePattern;
- private boolean isDbNameCoveredByPattern = false;
+
private boolean isModelDetected = false;
+ private boolean isTableModel;
+ private boolean isDbNameCoveredByPattern = false;
private boolean isHistoricalExtractorEnabled = false;
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event
time
@@ -153,8 +155,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
private volatile boolean hasBeenStarted = false;
- private final Map<TsFileResource, Boolean> tsfile2IsTableModelMap = new
HashMap<>(0);
-
private Queue<PersistentResource> pendingQueue;
@Override
@@ -669,49 +669,34 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
return deviceSet.stream()
.anyMatch(
deviceID -> {
- if (deviceID instanceof PlainDeviceID
- ||
deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
- || deviceID.getTableName().equals(PATH_ROOT)) {
- // In case of tree model deviceID
- updateIsDbNameCoveredByPattern(resource, false);
- if (treePattern.isTreeModelDataAllowedToBeCaptured()
- && treePattern.mayOverlapWithDevice(deviceID)) {
- tsfile2IsTableModelMap.computeIfAbsent(
- resource, (tsFileResource) -> Boolean.FALSE);
- return true;
- }
- } else {
- // In case of table model deviceID
- updateIsDbNameCoveredByPattern(resource, true);
- if (tablePattern.isTableModelDataAllowedToBeCaptured()
- // The database name in resource is prefixed with "root."
- &&
tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
- && tablePattern.matchesTable(deviceID.getTableName())) {
- tsfile2IsTableModelMap.computeIfAbsent(
- resource, (tsFileResource) -> Boolean.TRUE);
- return true;
- }
+ if (!isModelDetected) {
+ detectModel(resource, deviceID);
+ isModelDetected = true;
}
- return false;
+
+ return isTableModel
+ ? (tablePattern.isTableModelDataAllowedToBeCaptured()
+ // The database name in resource is prefixed with "root."
+ &&
tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
+ && tablePattern.matchesTable(deviceID.getTableName()))
+ : (treePattern.isTreeModelDataAllowedToBeCaptured()
+ && treePattern.mayOverlapWithDevice(deviceID));
});
}
- private void updateIsDbNameCoveredByPattern(
- final TsFileResource resource, final boolean isTableModel) {
- if (isModelDetected) {
- return;
- }
+ private void detectModel(final TsFileResource resource, final IDeviceID
deviceID) {
+ this.isTableModel =
+ !(deviceID instanceof PlainDeviceID
+ ||
deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
+ || deviceID.getTableName().equals(PATH_ROOT));
final String databaseName = resource.getDatabaseName();
- if (Objects.nonNull(databaseName)) {
- isDbNameCoveredByPattern =
- isTableModel
- ? tablePattern.isTableModelDataAllowedToBeCaptured()
- && tablePattern.coversDb(databaseName.substring(5))
- : treePattern.isTreeModelDataAllowedToBeCaptured()
- && treePattern.coversDb(databaseName);
- isModelDetected = true;
- }
+ isDbNameCoveredByPattern =
+ isTableModel
+ ? tablePattern.isTableModelDataAllowedToBeCaptured()
+ && tablePattern.coversDb(databaseName.substring(5))
+ : treePattern.isTreeModelDataAllowedToBeCaptured()
+ && treePattern.coversDb(databaseName);
}
private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource
resource) {
@@ -808,7 +793,7 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
private Event supplyTsFileEvent(TsFileResource resource) {
final PipeTsFileInsertionEvent event =
new PipeTsFileInsertionEvent(
- tsfile2IsTableModelMap.remove(resource),
+ isModelDetected ? isTableModel : null,
resource.getDatabaseName(),
resource,
shouldTransferModFile,
@@ -909,8 +894,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
@Override
public synchronized void close() {
- tsfile2IsTableModelMap.clear();
-
if (Objects.nonNull(pendingQueue)) {
pendingQueue.forEach(
resource -> {