This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e7f3e58961 Pipe: Fixed the audit db filter on config receiver && 
Added the judgments for table model audit DB && Optimized the logger for 
receiver status (#17219)
4e7f3e58961 is described below

commit 4e7f3e589616bcf21349394f28e56b830fdb3ee7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 25 21:44:26 2026 +0800

    Pipe: Fixed the audit db filter on config receiver && Added the judgments 
for table model audit DB && Optimized the logger for receiver status (#17219)
    
    * fix
    
    * table-audit
    
    * defend
    
    * except
    
    * fix
---
 .../runtime/PipeLeaderChangeHandler.java           |  5 +-
 .../pipe/source/ConfigRegionListeningFilter.java   | 84 +++++++++++++++-------
 .../pipe/source/IoTDBConfigRegionSource.java       |  3 +-
 .../manager/schema/ClusterSchemaManager.java       | 14 ++--
 .../schema/CNPhysicalPlanGenerator.java            | 10 ++-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |  3 +
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  5 +-
 .../analyze/cache/partition/PartitionCache.java    |  3 +
 .../plan/planner/distribution/SourceRewriter.java  |  8 ++-
 .../db/storageengine/dataregion/DataRegion.java    |  4 ++
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 10 +--
 11 files changed, 105 insertions(+), 44 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
index a5c4ae5a3b5..9d6b1431f1c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
@@ -78,7 +79,9 @@ public class PipeLeaderChangeHandler implements 
IClusterStatusSubscriber {
                   || (!databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
                       && 
!databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")
                       && !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
-                      && 
!databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + "."))) {
+                      && 
!databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + ".")
+                      && !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
+                      && 
!databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + "."))) {
                 // null or -1 means empty origin leader
                 final int oldLeaderNodeId = (pair.left == null ? -1 : 
pair.left.getLeaderId());
                 final int newLeaderNodeId = (pair.right == null ? -1 : 
pair.right.getLeaderId());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
index b189f338a91..5911ee8fc89 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
@@ -24,10 +24,13 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTableOrViewPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
@@ -252,34 +255,61 @@ public class ConfigRegionListeningFilter {
   static boolean shouldPlanBeListened(final ConfigPhysicalPlan plan) {
     final ConfigPhysicalPlanType type = plan.getType();
 
-    // Do not transfer roll back set template plan
-    if (type.equals(ConfigPhysicalPlanType.CommitSetSchemaTemplate)
-        && ((CommitSetSchemaTemplatePlan) plan).isRollback()) {
-      return false;
+    switch (type) {
+      // Do not transfer roll back set template plan
+      case CommitSetSchemaTemplate:
+        return !((CommitSetSchemaTemplatePlan) plan).isRollback();
+      // system / audit DB
+      case DeleteDatabase:
+        return !((DeleteDatabasePlan) 
plan).getName().equals(SchemaConstant.AUDIT_DATABASE)
+            && !((DeleteDatabasePlan) 
plan).getName().equals(SchemaConstant.SYSTEM_DATABASE)
+            && !((DeleteDatabasePlan) 
plan).getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
+      case CreateDatabase:
+      case AlterDatabase:
+        return !(((DatabaseSchemaPlan) plan)
+                .getSchema()
+                .getName()
+                .equals(SchemaConstant.SYSTEM_DATABASE)
+            && !((DatabaseSchemaPlan) plan)
+                .getSchema()
+                .getName()
+                .equals(SchemaConstant.AUDIT_DATABASE)
+            && !((DatabaseSchemaPlan) plan)
+                .getSchema()
+                .getName()
+                .equals(Audit.TABLE_MODEL_AUDIT_DATABASE));
+      // Table under audit db
+      case PipeCreateTableOrView:
+        return !((PipeCreateTableOrViewPlan) plan)
+            .getDatabase()
+            .equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
+      case CommitCreateTable:
+      case AddTableColumn:
+      case AddViewColumn:
+      case SetTableProperties:
+      case SetViewProperties:
+      case SetTableComment:
+      case SetViewComment:
+      case SetTableColumnComment:
+      case RenameTable:
+      case RenameView:
+      case RenameTableColumn:
+      case RenameViewColumn:
+      case AlterColumnDataType:
+      case CommitDeleteTable:
+      case CommitDeleteView:
+      case CommitDeleteColumn:
+      case CommitDeleteViewColumn:
+      case PipeDeleteDevices:
+        return !((AbstractTablePlan) 
plan).getDatabase().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
+      // PipeEnriched & UnsetTemplate are not listened directly,
+      // but their inner plan or converted plan are listened.
+      case PipeEnriched:
+      case UnsetTemplate:
+        return true;
+      default:
+        return OPTION_PLAN_MAP.values().stream().anyMatch(types -> 
types.contains(type));
     }
-
-    // system / audit DB
-    if (type.equals(ConfigPhysicalPlanType.DeleteDatabase)
-            && (((DeleteDatabasePlan) 
plan).getName().equals(SchemaConstant.AUDIT_DATABASE)
-                || ((DeleteDatabasePlan) 
plan).getName().equals(SchemaConstant.SYSTEM_DATABASE))
-        || (type.equals(ConfigPhysicalPlanType.CreateDatabase)
-                || type.equals(ConfigPhysicalPlanType.AlterDatabase))
-            && (((DatabaseSchemaPlan) plan)
-                    .getSchema()
-                    .getName()
-                    .equals(SchemaConstant.SYSTEM_DATABASE)
-                || ((DatabaseSchemaPlan) plan)
-                    .getSchema()
-                    .getName()
-                    .equals(SchemaConstant.AUDIT_DATABASE))) {
-      return false;
-    }
-
-    // PipeEnriched & UnsetTemplate are not listened directly,
-    // but their inner plan or converted plan are listened.
-    return type.equals(ConfigPhysicalPlanType.PipeEnriched)
-        || type.equals(ConfigPhysicalPlanType.UnsetTemplate)
-        || OPTION_PLAN_MAP.values().stream().anyMatch(types -> 
types.contains(type));
   }
 
   public static Set<ConfigPhysicalPlanType> parseListeningPlanTypeSet(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index eca4943c46f..07ebb638313 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -344,7 +344,8 @@ public class IoTDBConfigRegionSource extends 
IoTDBNonDataRegionSource {
       final IoTDBTreePatternOperations treePattern,
       final TablePattern tablePattern) {
     final Boolean isTableDatabasePlan = isTableDatabasePlan(plan);
-    return listenedTypeSet.contains(plan.getType())
+    return ConfigRegionListeningFilter.shouldPlanBeListened(plan)
+        && listenedTypeSet.contains(plan.getType())
         && (Objects.isNull(isTableDatabasePlan)
             || Boolean.TRUE.equals(isTableDatabasePlan)
                 && tablePattern.isTableModelDataAllowedToBeCaptured()
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 490370e38c3..1ea8528213b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.commons.schema.table.NonCommittableTsTable;
 import org.apache.iotdb.commons.schema.table.TableNodeStatus;
 import org.apache.iotdb.commons.schema.table.TreeViewSchema;
@@ -180,7 +181,8 @@ public class ClusterSchemaManager {
       clusterSchemaInfo.isDatabaseNameValid(
           schema.getName(), schema.isSetIsTableModel() && 
schema.isIsTableModel());
       if (!schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
-          && !schema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
+          && !schema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+          && !schema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
         clusterSchemaInfo.checkDatabaseLimit();
       }
       // Cache DatabaseSchema
@@ -488,7 +490,8 @@ public class ClusterSchemaManager {
     for (final TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
       if (!isDatabaseExist(databaseSchema.getName())
           || databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
-          || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
+          || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+          || 
databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
         // filter the pre deleted database and the system database
         databaseNum--;
       }
@@ -498,7 +501,8 @@ public class ClusterSchemaManager {
         new AdjustMaxRegionGroupNumPlan();
     for (final TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
       if (databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
-          || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
+          || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+          || 
databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
         // filter the system database
         continue;
       }
@@ -828,7 +832,9 @@ public class ClusterSchemaManager {
     TSStatus errorResp = null;
     final boolean isSystemDatabase =
         databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE);
-    final boolean isAuditDatabase = 
databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE);
+    final boolean isAuditDatabase =
+        databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+            || 
databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
 
     if (databaseSchema.getTTL() < 0) {
       errorResp =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
index f36bd5fffbf..a7354ee8c9c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.template.Template;
 import org.apache.iotdb.commons.utils.AuthUtils;
@@ -481,8 +482,10 @@ public class CNPhysicalPlanGenerator
             }
             stack.push(new Pair<>(databaseMNode, true));
             name = databaseMNode.getName();
-            for (final TsTable table : tableSet) {
-              planDeque.add(new PipeCreateTableOrViewPlan(name, table));
+            if (!name.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
+              for (final TsTable table : tableSet) {
+                planDeque.add(new PipeCreateTableOrViewPlan(name, table));
+              }
             }
             tableSet.clear();
             break;
@@ -552,7 +555,8 @@ public class CNPhysicalPlanGenerator
 
     final TDatabaseSchema schema = 
databaseMNode.getAsMNode().getDatabaseSchema();
     if (!schema.getName().equals(SchemaConstant.AUDIT_DATABASE)
-        && !schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)) {
+        && !schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
+        && !schema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
       final DatabaseSchemaPlan createDBPlan =
           new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, 
schema);
       planDeque.add(createDBPlan);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 921661d13de..7b74b9dddc5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2;
@@ -191,6 +192,8 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                     && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE 
+ ".")
                     && !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
                     && !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE 
+ ".")
+                    && !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
+                    && 
!databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + ".")
                     && !Objects.isNull(currentPipeTaskMeta)
                     && !(PipeTaskAgent.isHistoryOnlyPipe(
                             currentPipeStaticMeta.getSourceParameters())
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f880bbdb85c..867d20e078a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
@@ -313,7 +314,9 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                     && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
                     && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE 
+ ".")
                     && !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
-                    && !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE 
+ ".")) {
+                    && !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE 
+ ".")
+                    && !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
+                    && 
!databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + ".")) {
                   // Pipe only collect user's data, filter out metric database 
here.
                   consensusGroupIdToTaskMetaMap.put(
                       regionGroupId.getId(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 3c76bd08c1d..330ff8b172f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -300,6 +301,8 @@ public class PartitionCache {
             databaseNamesNeedCreated.add(SchemaConstant.SYSTEM_DATABASE);
           } else if (PathUtils.isStartWith(deviceID, 
SchemaConstant.AUDIT_DATABASE)) {
             databaseNamesNeedCreated.add(SchemaConstant.AUDIT_DATABASE);
+          } else if (PathUtils.isStartWith(deviceID, 
Audit.TABLE_MODEL_AUDIT_DATABASE)) {
+            databaseNamesNeedCreated.add(Audit.TABLE_MODEL_AUDIT_DATABASE);
           } else {
             final PartialPath databaseNameNeedCreated =
                 MetaUtils.getDatabasePathByLevel(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index bc6475b2323..4e69469a497 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -655,12 +656,13 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
           .getSchemaPartitionInfo()
           .getSchemaPartitionMap()
           .forEach(
-              (storageGroup, deviceGroup) -> {
-                if (storageGroup.equals(SchemaConstant.SYSTEM_DATABASE)) {
+              (database, deviceGroup) -> {
+                if (database.equals(SchemaConstant.SYSTEM_DATABASE)) {
                   deviceGroup.forEach(
                       (deviceGroupId, schemaRegionReplicaSet) ->
                           regionsOfSystemDatabase.add(schemaRegionReplicaSet));
-                } else if (storageGroup.equals(SchemaConstant.AUDIT_DATABASE)) 
{
+                } else if (database.equals(SchemaConstant.AUDIT_DATABASE)
+                    || database.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
                   deviceGroup.forEach(
                       (deviceGroupId, schemaRegionReplicaSet) ->
                           regionsOfAuditDatabase.add(schemaRegionReplicaSet));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 20c78fd0b8c..20d53439b95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
 import org.apache.iotdb.commons.schema.table.InformationSchema;
 import org.apache.iotdb.commons.schema.table.TsFileTableSchemaUtil;
 import org.apache.iotdb.commons.schema.table.TsTable;
@@ -3799,6 +3800,9 @@ public class DataRegion implements IDataRegionForQuery {
     if (databaseName.startsWith(SchemaConstant.AUDIT_DATABASE)) {
       return Optional.empty();
     }
+    if (databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
+      return Optional.empty();
+    }
     int lastIndex = databaseName.lastIndexOf("-");
     if (lastIndex == -1) {
       lastIndex = databaseName.length();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 9343e6fbf1d..ce4d783d9c3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -135,8 +135,9 @@ public class PipeReceiverStatusHandler {
         {
           PipeLogger.log(
               LOGGER::info,
-              "Temporary unavailable exception: will retry forever. status: 
%s",
-              status);
+              "Temporary unavailable exception: will retry forever. status: 
%s, message: %s",
+              status,
+              exceptionMessage);
           throw new PipeRuntimeSinkNonReportTimeConfigurableException(
               exceptionMessage, Long.MAX_VALUE);
         }
@@ -243,9 +244,10 @@ public class PipeReceiverStatusHandler {
     if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
       PipeLogger.log(
           LOGGER::warn,
-          "%s: will retry forever. status: %s",
+          "%s: will retry forever. status: %s, message: %s",
           getNoPermission(noPermission),
-          status);
+          status,
+          exceptionMessage);
     } else {
       LOGGER.warn(
           "{}: will retry for at least {} seconds. status: {}",

Reply via email to