This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4e7f3e58961 Pipe: Fixed the audit db filter on config receiver &&
Added the judgments for table model audit DB && Optimized the logger for
receiver status (#17219)
4e7f3e58961 is described below
commit 4e7f3e589616bcf21349394f28e56b830fdb3ee7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 25 21:44:26 2026 +0800
Pipe: Fixed the audit db filter on config receiver && Added the judgments
for table model audit DB && Optimized the logger for receiver status (#17219)
* fix
* table-audit
* defend
* except
* fix
---
.../runtime/PipeLeaderChangeHandler.java | 5 +-
.../pipe/source/ConfigRegionListeningFilter.java | 84 +++++++++++++++-------
.../pipe/source/IoTDBConfigRegionSource.java | 3 +-
.../manager/schema/ClusterSchemaManager.java | 14 ++--
.../schema/CNPhysicalPlanGenerator.java | 10 ++-
.../impl/pipe/task/AlterPipeProcedureV2.java | 3 +
.../impl/pipe/task/CreatePipeProcedureV2.java | 5 +-
.../analyze/cache/partition/PartitionCache.java | 3 +
.../plan/planner/distribution/SourceRewriter.java | 8 ++-
.../db/storageengine/dataregion/DataRegion.java | 4 ++
.../pipe/receiver/PipeReceiverStatusHandler.java | 10 +--
11 files changed, 105 insertions(+), 44 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
index a5c4ae5a3b5..9d6b1431f1c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
@@ -78,7 +79,9 @@ public class PipeLeaderChangeHandler implements
IClusterStatusSubscriber {
|| (!databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&&
!databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")
&& !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
- &&
!databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + "."))) {
+ &&
!databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + ".")
+ && !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
+ &&
!databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + "."))) {
// null or -1 means empty origin leader
final int oldLeaderNodeId = (pair.left == null ? -1 :
pair.left.getLeaderId());
final int newLeaderNodeId = (pair.right == null ? -1 :
pair.right.getLeaderId());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
index b189f338a91..5911ee8fc89 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
@@ -24,10 +24,13 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTableOrViewPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -252,34 +255,61 @@ public class ConfigRegionListeningFilter {
static boolean shouldPlanBeListened(final ConfigPhysicalPlan plan) {
final ConfigPhysicalPlanType type = plan.getType();
- // Do not transfer roll back set template plan
- if (type.equals(ConfigPhysicalPlanType.CommitSetSchemaTemplate)
- && ((CommitSetSchemaTemplatePlan) plan).isRollback()) {
- return false;
+ switch (type) {
+ // Do not transfer roll back set template plan
+ case CommitSetSchemaTemplate:
+ return !((CommitSetSchemaTemplatePlan) plan).isRollback();
+ // system / audit DB
+ case DeleteDatabase:
+ return !((DeleteDatabasePlan)
plan).getName().equals(SchemaConstant.AUDIT_DATABASE)
+ && !((DeleteDatabasePlan)
plan).getName().equals(SchemaConstant.SYSTEM_DATABASE)
+ && !((DeleteDatabasePlan)
plan).getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
+ case CreateDatabase:
+ case AlterDatabase:
+ return !(((DatabaseSchemaPlan) plan)
+ .getSchema()
+ .getName()
+ .equals(SchemaConstant.SYSTEM_DATABASE)
+ && !((DatabaseSchemaPlan) plan)
+ .getSchema()
+ .getName()
+ .equals(SchemaConstant.AUDIT_DATABASE)
+ && !((DatabaseSchemaPlan) plan)
+ .getSchema()
+ .getName()
+ .equals(Audit.TABLE_MODEL_AUDIT_DATABASE));
+ // Table under audit db
+ case PipeCreateTableOrView:
+ return !((PipeCreateTableOrViewPlan) plan)
+ .getDatabase()
+ .equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
+ case CommitCreateTable:
+ case AddTableColumn:
+ case AddViewColumn:
+ case SetTableProperties:
+ case SetViewProperties:
+ case SetTableComment:
+ case SetViewComment:
+ case SetTableColumnComment:
+ case RenameTable:
+ case RenameView:
+ case RenameTableColumn:
+ case RenameViewColumn:
+ case AlterColumnDataType:
+ case CommitDeleteTable:
+ case CommitDeleteView:
+ case CommitDeleteColumn:
+ case CommitDeleteViewColumn:
+ case PipeDeleteDevices:
+ return !((AbstractTablePlan)
plan).getDatabase().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
+ // PipeEnriched & UnsetTemplate are not listened directly,
+ // but their inner plan or converted plan are listened.
+ case PipeEnriched:
+ case UnsetTemplate:
+ return true;
+ default:
+ return OPTION_PLAN_MAP.values().stream().anyMatch(types ->
types.contains(type));
}
-
- // system / audit DB
- if (type.equals(ConfigPhysicalPlanType.DeleteDatabase)
- && (((DeleteDatabasePlan)
plan).getName().equals(SchemaConstant.AUDIT_DATABASE)
- || ((DeleteDatabasePlan)
plan).getName().equals(SchemaConstant.SYSTEM_DATABASE))
- || (type.equals(ConfigPhysicalPlanType.CreateDatabase)
- || type.equals(ConfigPhysicalPlanType.AlterDatabase))
- && (((DatabaseSchemaPlan) plan)
- .getSchema()
- .getName()
- .equals(SchemaConstant.SYSTEM_DATABASE)
- || ((DatabaseSchemaPlan) plan)
- .getSchema()
- .getName()
- .equals(SchemaConstant.AUDIT_DATABASE))) {
- return false;
- }
-
- // PipeEnriched & UnsetTemplate are not listened directly,
- // but their inner plan or converted plan are listened.
- return type.equals(ConfigPhysicalPlanType.PipeEnriched)
- || type.equals(ConfigPhysicalPlanType.UnsetTemplate)
- || OPTION_PLAN_MAP.values().stream().anyMatch(types ->
types.contains(type));
}
public static Set<ConfigPhysicalPlanType> parseListeningPlanTypeSet(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index eca4943c46f..07ebb638313 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -344,7 +344,8 @@ public class IoTDBConfigRegionSource extends
IoTDBNonDataRegionSource {
final IoTDBTreePatternOperations treePattern,
final TablePattern tablePattern) {
final Boolean isTableDatabasePlan = isTableDatabasePlan(plan);
- return listenedTypeSet.contains(plan.getType())
+ return ConfigRegionListeningFilter.shouldPlanBeListened(plan)
+ && listenedTypeSet.contains(plan.getType())
&& (Objects.isNull(isTableDatabasePlan)
|| Boolean.TRUE.equals(isTableDatabasePlan)
&& tablePattern.isTableModelDataAllowedToBeCaptured()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 490370e38c3..1ea8528213b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.commons.schema.table.NonCommittableTsTable;
import org.apache.iotdb.commons.schema.table.TableNodeStatus;
import org.apache.iotdb.commons.schema.table.TreeViewSchema;
@@ -180,7 +181,8 @@ public class ClusterSchemaManager {
clusterSchemaInfo.isDatabaseNameValid(
schema.getName(), schema.isSetIsTableModel() &&
schema.isIsTableModel());
if (!schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
- && !schema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
+ && !schema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+ && !schema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
clusterSchemaInfo.checkDatabaseLimit();
}
// Cache DatabaseSchema
@@ -488,7 +490,8 @@ public class ClusterSchemaManager {
for (final TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
if (!isDatabaseExist(databaseSchema.getName())
|| databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
- || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
+ || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+ ||
databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
// filter the pre deleted database and the system database
databaseNum--;
}
@@ -498,7 +501,8 @@ public class ClusterSchemaManager {
new AdjustMaxRegionGroupNumPlan();
for (final TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
if (databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
- || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
+ || databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+ ||
databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
// filter the system database
continue;
}
@@ -828,7 +832,9 @@ public class ClusterSchemaManager {
TSStatus errorResp = null;
final boolean isSystemDatabase =
databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE);
- final boolean isAuditDatabase =
databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE);
+ final boolean isAuditDatabase =
+ databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
+ ||
databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
if (databaseSchema.getTTL() < 0) {
errorResp =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
index f36bd5fffbf..a7354ee8c9c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.template.Template;
import org.apache.iotdb.commons.utils.AuthUtils;
@@ -481,8 +482,10 @@ public class CNPhysicalPlanGenerator
}
stack.push(new Pair<>(databaseMNode, true));
name = databaseMNode.getName();
- for (final TsTable table : tableSet) {
- planDeque.add(new PipeCreateTableOrViewPlan(name, table));
+ if (!name.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
+ for (final TsTable table : tableSet) {
+ planDeque.add(new PipeCreateTableOrViewPlan(name, table));
+ }
}
tableSet.clear();
break;
@@ -552,7 +555,8 @@ public class CNPhysicalPlanGenerator
final TDatabaseSchema schema =
databaseMNode.getAsMNode().getDatabaseSchema();
if (!schema.getName().equals(SchemaConstant.AUDIT_DATABASE)
- && !schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)) {
+ && !schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
+ && !schema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
final DatabaseSchemaPlan createDBPlan =
new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase,
schema);
planDeque.add(createDBPlan);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 921661d13de..7b74b9dddc5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2;
@@ -191,6 +192,8 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE
+ ".")
&& !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
&& !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE
+ ".")
+ && !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
+ &&
!databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + ".")
&& !Objects.isNull(currentPipeTaskMeta)
&& !(PipeTaskAgent.isHistoryOnlyPipe(
currentPipeStaticMeta.getSourceParameters())
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f880bbdb85c..867d20e078a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
@@ -313,7 +314,9 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE
+ ".")
&& !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
- && !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE
+ ".")) {
+ && !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE
+ ".")
+ && !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
+ &&
!databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + ".")) {
// Pipe only collect user's data, filter out metric database
here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 3c76bd08c1d..330ff8b172f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -300,6 +301,8 @@ public class PartitionCache {
databaseNamesNeedCreated.add(SchemaConstant.SYSTEM_DATABASE);
} else if (PathUtils.isStartWith(deviceID,
SchemaConstant.AUDIT_DATABASE)) {
databaseNamesNeedCreated.add(SchemaConstant.AUDIT_DATABASE);
+ } else if (PathUtils.isStartWith(deviceID,
Audit.TABLE_MODEL_AUDIT_DATABASE)) {
+ databaseNamesNeedCreated.add(Audit.TABLE_MODEL_AUDIT_DATABASE);
} else {
final PartialPath databaseNameNeedCreated =
MetaUtils.getDatabasePathByLevel(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index bc6475b2323..4e69469a497 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -655,12 +656,13 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
.getSchemaPartitionInfo()
.getSchemaPartitionMap()
.forEach(
- (storageGroup, deviceGroup) -> {
- if (storageGroup.equals(SchemaConstant.SYSTEM_DATABASE)) {
+ (database, deviceGroup) -> {
+ if (database.equals(SchemaConstant.SYSTEM_DATABASE)) {
deviceGroup.forEach(
(deviceGroupId, schemaRegionReplicaSet) ->
regionsOfSystemDatabase.add(schemaRegionReplicaSet));
- } else if (storageGroup.equals(SchemaConstant.AUDIT_DATABASE))
{
+ } else if (database.equals(SchemaConstant.AUDIT_DATABASE)
+ || database.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
deviceGroup.forEach(
(deviceGroupId, schemaRegionReplicaSet) ->
regionsOfAuditDatabase.add(schemaRegionReplicaSet));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 20c78fd0b8c..20d53439b95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.commons.schema.table.InformationSchema;
import org.apache.iotdb.commons.schema.table.TsFileTableSchemaUtil;
import org.apache.iotdb.commons.schema.table.TsTable;
@@ -3799,6 +3800,9 @@ public class DataRegion implements IDataRegionForQuery {
if (databaseName.startsWith(SchemaConstant.AUDIT_DATABASE)) {
return Optional.empty();
}
+ if (databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
+ return Optional.empty();
+ }
int lastIndex = databaseName.lastIndexOf("-");
if (lastIndex == -1) {
lastIndex = databaseName.length();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 9343e6fbf1d..ce4d783d9c3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -135,8 +135,9 @@ public class PipeReceiverStatusHandler {
{
PipeLogger.log(
LOGGER::info,
- "Temporary unavailable exception: will retry forever. status:
%s",
- status);
+ "Temporary unavailable exception: will retry forever. status:
%s, message: %s",
+ status,
+ exceptionMessage);
throw new PipeRuntimeSinkNonReportTimeConfigurableException(
exceptionMessage, Long.MAX_VALUE);
}
@@ -243,9 +244,10 @@ public class PipeReceiverStatusHandler {
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
PipeLogger.log(
LOGGER::warn,
- "%s: will retry forever. status: %s",
+ "%s: will retry forever. status: %s, message: %s",
getNoPermission(noPermission),
- status);
+ status,
+ exceptionMessage);
} else {
LOGGER.warn(
"{}: will retry for at least {} seconds. status: {}",