This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 677f9b8e164 Load: Added "skipFailedTableSchemaCheck" parameter (#16522)
677f9b8e164 is described below

commit 677f9b8e16443d07dca66d888b9a862ed20165d5
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 29 20:00:50 2025 +0800

    Load: Added "skipFailedTableSchemaCheck" parameter (#16522)
    
    * fix
    
    fix
    
    * tested
---
 .../analyze/load/LoadTsFileTableSchemaCache.java   | 14 +++++++++++++-
 .../fetcher/TableHeaderSchemaValidator.java        | 22 +++++++++++-----------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 13 +++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  4 ++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  5 +++++
 5 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index abf0b0afe5b..18b5d4cbc32 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -146,7 +147,18 @@ public class LoadTsFileTableSchemaCache {
           e);
     }
 
-    createTableAndDatabaseIfNecessary(device.getTableName());
+    try {
+      createTableAndDatabaseIfNecessary(device.getTableName());
+    } catch (final Exception e) {
+      if (PipeConfig.getInstance().isSkipFailedTableSchemaCheck()) {
+        LOGGER.info(
+            "Failed to check table schema, will skip because skipFailedTableSchemaCheck is set to true, message: {}",
+            e.getMessage());
+      } else {
+        throw e;
+      }
+    }
+
     // TODO: add permission check and record auth cost
     addDevice(device);
     if (shouldFlushDevices()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
index 29ff418d4b5..032b0d91cfd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
@@ -132,37 +132,37 @@ public class TableHeaderSchemaValidator {
       DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table);
       // If table with this name already exists and isStrictTagColumn is true, make sure the
       // existing
-      // id columns are the prefix of the incoming id columns, or vice versa
+      // id columns are the prefix of the incoming tag columns, or vice versa
       if (isStrictTagColumn) {
         final List<TsTableColumnSchema> realTagColumns = table.getTagColumnSchemaList();
         final List<ColumnSchema> incomingTagColumns = tableSchema.getTagColumns();
         if (realTagColumns.size() <= incomingTagColumns.size()) {
-          // When incoming table has more ID columns, the existing id columns
-          // should be the prefix of the incoming id columns (or equal)
+          // When incoming table has more TAG columns, the existing tag columns
+          // should be the prefix of the incoming tag columns (or equal)
           for (int indexReal = 0; indexReal < realTagColumns.size(); indexReal++) {
             final String tagName = realTagColumns.get(indexReal).getColumnName();
             final int indexIncoming = tableSchema.getIndexAmongTagColumns(tagName);
             if (indexIncoming != indexReal) {
               throw new LoadAnalyzeTableColumnDisorderException(
                   String.format(
-                      "Can not create table because incoming table has no less id columns than existing table, "
-                          + "and the existing id columns are not the prefix of the incoming id columns. "
-                          + "Existing id column: %s, index in existing table: %s, index in incoming table: %s",
+                      "Can not create table because incoming table has no less tag columns than existing table, "
+                          + "and the existing tag columns are not the prefix of the incoming tag columns. "
+                          + "Existing tag column: %s, index in existing table: %s, index in incoming table: %s",
                       tagName, indexReal, indexIncoming));
             }
           }
         } else {
-          // When existing table has more ID columns, the incoming id columns
-          // should be the prefix of the existing id columns
+          // When existing table has more TAG columns, the incoming tag columns
+          // should be the prefix of the existing tag columns
           for (int indexIncoming = 0; indexIncoming < incomingTagColumns.size(); indexIncoming++) {
             final String tagName = incomingTagColumns.get(indexIncoming).getName();
             final int indexReal = table.getTagColumnOrdinal(tagName);
             if (indexReal != indexIncoming) {
               throw new LoadAnalyzeTableColumnDisorderException(
                   String.format(
-                      "Can not create table because existing table has more id columns than incoming table, "
-                          + "and the incoming id columns are not the prefix of the existing id columns. "
-                          + "Incoming id column: %s, index in existing table: %s, index in incoming table: %s",
+                      "Can not create table because existing table has more tag columns than incoming table, "
+                          + "and the incoming tag columns are not the prefix of the existing tag columns. "
+                          + "Incoming tag column: %s, index in existing table: %s, index in incoming table: %s",
                       tagName, indexReal, indexIncoming));
             }
           }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a960584c66c..17b0ca57746 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -209,6 +209,7 @@ public class CommonConfig {
   // Sequentially poll the tsFile by default
   private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
   private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
+  private boolean skipFailedTableSchemaCheck = true;
 
   /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
@@ -1522,6 +1523,18 @@ public class CommonConfig {
         pipeRealTimeQueueMaxWaitingTsFileSize);
   }
 
+  public boolean isSkipFailedTableSchemaCheck() {
+    return skipFailedTableSchemaCheck;
+  }
+
+  public void setSkipFailedTableSchemaCheck(boolean skipFailedTableSchemaCheck) {
+    if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
+      return;
+    }
+    this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
+    logger.info("skipFailedTableSchemaCheck is set to {}.", skipFailedTableSchemaCheck);
+  }
+
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
     if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
       return;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 329d8815ac8..37d9af0f3e6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -119,6 +119,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
   }
 
+  public boolean isSkipFailedTableSchemaCheck() {
+    return COMMON_CONFIG.isSkipFailedTableSchemaCheck();
+  }
+
   /////////////////////////////// Subtask Executor ///////////////////////////////
 
   public int getPipeSubtaskExecutorMaxThreadNum() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 1beb4c67e25..2a170b6b8d1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -278,6 +278,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_realTime_queue_max_waiting_tsFile_size",
                 String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
+    config.setSkipFailedTableSchemaCheck(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "skip_failed_table_schema_check",
+                String.valueOf(config.isSkipFailedTableSchemaCheck()))));
     config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
         Integer.parseInt(
             properties.getProperty(

Reply via email to