This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 68daea60b0a Pipe/Subscription: Filter out non-working DR PipeTasks in CN & Reduce model judgement cost in PipeHistoricalDataRegionTsFileAndDeletionExtractor (#14059)
68daea60b0a is described below

commit 68daea60b0a4f7c8141fbe17f198a9bfdace873c
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Nov 26 11:50:11 2024 +0800

    Pipe/Subscription: Filter out non-working DR PipeTasks in CN & Reduce model judgement cost in PipeHistoricalDataRegionTsFileAndDeletionExtractor (#14059)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 79 ++++++++++++++++++++--
 ...bstractOperateSubscriptionAndPipeProcedure.java |  4 +-
 .../dataregion/DataRegionListeningFilter.java      | 28 ++++++++
 ...oricalDataRegionTsFileAndDeletionExtractor.java | 69 +++++++------------
 4 files changed, 130 insertions(+), 50 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index ec0a33ee677..6aa530b2860 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
@@ -30,6 +32,9 @@ import 
org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -390,9 +395,8 @@ public abstract class AbstractOperatePipeProcedureV2
       throws IOException {
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
-      pipeMetaBinaryList.add(pipeMeta.serialize());
+      
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
     }
-
     return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
   }
 
@@ -407,10 +411,9 @@ public abstract class AbstractOperatePipeProcedureV2
   public static Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
       ConfigNodeProcedureEnv env, AtomicReference<PipeTaskInfo> pipeTaskInfo) 
throws IOException {
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
-    for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
-      pipeMetaBinaryList.add(pipeMeta.serialize());
+    for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
+      
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
     }
-
     return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
   }
 
@@ -487,7 +490,9 @@ public abstract class AbstractOperatePipeProcedureV2
   protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(
       String pipeName, ConfigNodeProcedureEnv env) throws IOException {
     return env.pushSinglePipeMetaToDataNodes(
-        pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
+        copyAndFilterOutNonWorkingDataRegionPipeTasks(
+                pipeTaskInfo.get().getPipeMetaByPipeName(pipeName))
+            .serialize());
   }
 
   /**
@@ -502,6 +507,68 @@ public abstract class AbstractOperatePipeProcedureV2
     return env.dropSinglePipeOnDataNodes(pipeName);
   }
 
+  public static PipeMeta 
copyAndFilterOutNonWorkingDataRegionPipeTasks(PipeMeta originalPipeMeta)
+      throws IOException {
+    final PipeMeta copiedPipeMeta = originalPipeMeta.deepCopy4TaskAgent();
+
+    copiedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupId2TaskMetaMap()
+        .entrySet()
+        .removeIf(
+            consensusGroupId2TaskMeta -> {
+              final String database;
+              try {
+                database =
+                    ConfigNode.getInstance()
+                        .getConfigManager()
+                        .getPartitionManager()
+                        .getRegionDatabase(
+                            new TConsensusGroupId(
+                                // We assume that the consensus group id is a data region id.
+                                TConsensusGroupType.DataRegion,
+                                consensusGroupId2TaskMeta.getKey()));
+                if (database == null) {
+                  // If the consensus group id is not a data region id, we keep it.
+                  // If the consensus group id is a data region id, but the database is not found,
+                  // we keep it.
+                  return false;
+                }
+              } catch (final Exception ignore) {
+                // In case of any exception, we keep the consensus group id.
+                return false;
+              }
+
+              final boolean isTableModel;
+              try {
+                final TDatabaseSchema schema =
+                    ConfigNode.getInstance()
+                        .getConfigManager()
+                        .getClusterSchemaManager()
+                        .getDatabaseSchemaByName(database);
+                if (schema == null) {
+                  // If the database is not found, we keep it.
+                  return false;
+                }
+                isTableModel = schema.isIsTableModel();
+              } catch (final Exception ignore) {
+                // If the database is not found, we keep it.
+                return false;
+              }
+
+              try {
+                return !DataRegionListeningFilter.shouldDatabaseBeListened(
+                    copiedPipeMeta.getStaticMeta().getExtractorParameters(),
+                    isTableModel,
+                    database);
+              } catch (final Exception e) {
+                return false;
+              }
+            });
+
+    return copiedPipeMeta;
+  }
+
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 2b420136253..de90951262c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -36,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2.copyAndFilterOutNonWorkingDataRegionPipeTasks;
+
 public abstract class AbstractOperateSubscriptionAndPipeProcedure
     extends AbstractOperateSubscriptionProcedure {
   private static final Logger LOGGER =
@@ -135,7 +137,7 @@ public abstract class 
AbstractOperateSubscriptionAndPipeProcedure
         LOGGER.warn("Pipe {} not found in PipeTaskInfo, can not push its 
meta.", pipeName);
         continue;
       }
-      pipeMetaBinaryList.add(pipeMeta.serialize());
+      
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
     }
 
     return env.pushMultiPipeMetaToDataNodes(pipeMetaBinaryList);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java
index 18de48cfc32..3c0ab3594ad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java
@@ -61,6 +61,34 @@ public class DataRegionListeningFilter {
     }
   }
 
+  public static boolean shouldDatabaseBeListened(
+      final PipeParameters parameters, final boolean isTableModel, final 
String databaseRawName)
+      throws IllegalPathException {
+    final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+        parseInsertionDeletionListeningOptionPair(parameters);
+    final boolean hasSpecificListeningOption =
+        insertionDeletionListeningOptionPair.getLeft()
+            || insertionDeletionListeningOptionPair.getRight();
+    if (!hasSpecificListeningOption) {
+      return false;
+    }
+
+    if (isTableModel) {
+      final String databaseTableModel =
+          databaseRawName.startsWith("root.") ? databaseRawName.substring(5) : 
databaseRawName;
+      final TablePattern tablePattern =
+          TablePattern.parsePipePatternFromSourceParameters(parameters);
+      return tablePattern.isTableModelDataAllowedToBeCaptured()
+          && tablePattern.matchesDatabase(databaseTableModel);
+    } else {
+      final String databaseTreeModel =
+          databaseRawName.startsWith("root.") ? databaseRawName : "root." + 
databaseRawName;
+      final TreePattern treePattern = 
TreePattern.parsePipePatternFromSourceParameters(parameters);
+      return treePattern.isTreeModelDataAllowedToBeCaptured()
+          && treePattern.mayOverlapWithDb(databaseTreeModel);
+    }
+  }
+
   public static boolean shouldDataRegionBeListened(
       PipeParameters parameters, DataRegionId dataRegionId) throws 
IllegalPathException {
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index c10d2ebefc3..ae08fa8b0cb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -132,8 +132,10 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
 
   private TreePattern treePattern;
   private TablePattern tablePattern;
-  private boolean isDbNameCoveredByPattern = false;
+
   private boolean isModelDetected = false;
+  private boolean isTableModel;
+  private boolean isDbNameCoveredByPattern = false;
 
   private boolean isHistoricalExtractorEnabled = false;
   private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event 
time
@@ -153,8 +155,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
 
   private volatile boolean hasBeenStarted = false;
 
-  private final Map<TsFileResource, Boolean> tsfile2IsTableModelMap = new 
HashMap<>(0);
-
   private Queue<PersistentResource> pendingQueue;
 
   @Override
@@ -669,49 +669,34 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
     return deviceSet.stream()
         .anyMatch(
             deviceID -> {
-              if (deviceID instanceof PlainDeviceID
-                  || 
deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
-                  || deviceID.getTableName().equals(PATH_ROOT)) {
-                // In case of tree model deviceID
-                updateIsDbNameCoveredByPattern(resource, false);
-                if (treePattern.isTreeModelDataAllowedToBeCaptured()
-                    && treePattern.mayOverlapWithDevice(deviceID)) {
-                  tsfile2IsTableModelMap.computeIfAbsent(
-                      resource, (tsFileResource) -> Boolean.FALSE);
-                  return true;
-                }
-              } else {
-                // In case of table model deviceID
-                updateIsDbNameCoveredByPattern(resource, true);
-                if (tablePattern.isTableModelDataAllowedToBeCaptured()
-                    // The database name in resource is prefixed with "root."
-                    && 
tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
-                    && tablePattern.matchesTable(deviceID.getTableName())) {
-                  tsfile2IsTableModelMap.computeIfAbsent(
-                      resource, (tsFileResource) -> Boolean.TRUE);
-                  return true;
-                }
+              if (!isModelDetected) {
+                detectModel(resource, deviceID);
+                isModelDetected = true;
               }
-              return false;
+
+              return isTableModel
+                  ? (tablePattern.isTableModelDataAllowedToBeCaptured()
+                      // The database name in resource is prefixed with "root."
+                      && 
tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
+                      && tablePattern.matchesTable(deviceID.getTableName()))
+                  : (treePattern.isTreeModelDataAllowedToBeCaptured()
+                      && treePattern.mayOverlapWithDevice(deviceID));
             });
   }
 
-  private void updateIsDbNameCoveredByPattern(
-      final TsFileResource resource, final boolean isTableModel) {
-    if (isModelDetected) {
-      return;
-    }
+  private void detectModel(final TsFileResource resource, final IDeviceID 
deviceID) {
+    this.isTableModel =
+        !(deviceID instanceof PlainDeviceID
+            || 
deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
+            || deviceID.getTableName().equals(PATH_ROOT));
 
     final String databaseName = resource.getDatabaseName();
-    if (Objects.nonNull(databaseName)) {
-      isDbNameCoveredByPattern =
-          isTableModel
-              ? tablePattern.isTableModelDataAllowedToBeCaptured()
-                  && tablePattern.coversDb(databaseName.substring(5))
-              : treePattern.isTreeModelDataAllowedToBeCaptured()
-                  && treePattern.coversDb(databaseName);
-      isModelDetected = true;
-    }
+    isDbNameCoveredByPattern =
+        isTableModel
+            ? tablePattern.isTableModelDataAllowedToBeCaptured()
+                && tablePattern.coversDb(databaseName.substring(5))
+            : treePattern.isTreeModelDataAllowedToBeCaptured()
+                && treePattern.coversDb(databaseName);
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource 
resource) {
@@ -808,7 +793,7 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
   private Event supplyTsFileEvent(TsFileResource resource) {
     final PipeTsFileInsertionEvent event =
         new PipeTsFileInsertionEvent(
-            tsfile2IsTableModelMap.remove(resource),
+            isModelDetected ? isTableModel : null,
             resource.getDatabaseName(),
             resource,
             shouldTransferModFile,
@@ -909,8 +894,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
 
   @Override
   public synchronized void close() {
-    tsfile2IsTableModelMap.clear();
-
     if (Objects.nonNull(pendingQueue)) {
       pendingQueue.forEach(
           resource -> {

Reply via email to