This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/2.0.5 by this push:
new dedaa8f14af Load: Added "skipFailedTableSchemaCheck" parameter (#16522) (#16525)
dedaa8f14af is described below
commit dedaa8f14afc5a87c0a199dff3ee7a3460a63e79
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 29 20:05:40 2025 +0800
Load: Added "skipFailedTableSchemaCheck" parameter (#16522) (#16525)
* fix
fix
* tested
---
.../analyze/load/LoadTsFileTableSchemaCache.java | 14 +++++++++++++-
.../fetcher/TableHeaderSchemaValidator.java | 22 +++++++++++-----------
.../apache/iotdb/commons/conf/CommonConfig.java | 13 +++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 4 ++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++++
5 files changed, 46 insertions(+), 12 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index 7ea5154714c..4b30421268c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -145,7 +146,18 @@ public class LoadTsFileTableSchemaCache {
e);
}
- createTableAndDatabaseIfNecessary(device.getTableName());
+ try {
+ createTableAndDatabaseIfNecessary(device.getTableName());
+ } catch (final Exception e) {
+ if (PipeConfig.getInstance().isSkipFailedTableSchemaCheck()) {
+ LOGGER.info(
+ "Failed to check table schema, will skip because skipFailedTableSchemaCheck is set to true, message: {}",
+ e.getMessage());
+ } else {
+ throw e;
+ }
+ }
+
// TODO: add permission check and record auth cost
addDevice(device);
if (shouldFlushDevices()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
index 286a9d2dd70..5b8c546abb2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
@@ -134,37 +134,37 @@ public class TableHeaderSchemaValidator {
DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table);
// If table with this name already exists and isStrictTagColumn is true, make sure the
// existing
- // id columns are the prefix of the incoming id columns, or vice versa
+ // id columns are the prefix of the incoming tag columns, or vice versa
if (isStrictTagColumn) {
final List<TsTableColumnSchema> realTagColumns =
table.getTagColumnSchemaList();
final List<ColumnSchema> incomingTagColumns =
tableSchema.getIdColumns();
if (realTagColumns.size() <= incomingTagColumns.size()) {
- // When incoming table has more ID columns, the existing id columns
- // should be the prefix of the incoming id columns (or equal)
+ // When incoming table has more TAG columns, the existing tag columns
+ // should be the prefix of the incoming tag columns (or equal)
for (int indexReal = 0; indexReal < realTagColumns.size();
indexReal++) {
final String tagName =
realTagColumns.get(indexReal).getColumnName();
final int indexIncoming =
tableSchema.getIndexAmongIdColumns(tagName);
if (indexIncoming != indexReal) {
throw new LoadAnalyzeTableColumnDisorderException(
String.format(
- "Can not create table because incoming table has no less id columns than existing table, "
- + "and the existing id columns are not the prefix of the incoming id columns. "
- + "Existing id column: %s, index in existing table: %s, index in incoming table: %s",
+ "Can not create table because incoming table has no less tag columns than existing table, "
+ + "and the existing tag columns are not the prefix of the incoming tag columns. "
+ + "Existing tag column: %s, index in existing table: %s, index in incoming table: %s",
tagName, indexReal, indexIncoming));
}
}
} else {
- // When existing table has more ID columns, the incoming id columns
- // should be the prefix of the existing id columns
+ // When existing table has more TAG columns, the incoming tag columns
+ // should be the prefix of the existing tag columns
for (int indexIncoming = 0; indexIncoming <
incomingTagColumns.size(); indexIncoming++) {
final String tagName =
incomingTagColumns.get(indexIncoming).getName();
final int indexReal = table.getTagColumnOrdinal(tagName);
if (indexReal != indexIncoming) {
throw new LoadAnalyzeTableColumnDisorderException(
String.format(
- "Can not create table because existing table has more id columns than incoming table, "
- + "and the incoming id columns are not the prefix of the existing id columns. "
- + "Incoming id column: %s, index in existing table: %s, index in incoming table: %s",
+ "Can not create table because existing table has more tag columns than incoming table, "
+ + "and the incoming tag columns are not the prefix of the existing tag columns. "
+ + "Incoming tag column: %s, index in existing table: %s, index in incoming table: %s",
tagName, indexReal, indexIncoming));
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3031518edc3..205963a9dff 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -202,6 +202,7 @@ public class CommonConfig {
// Sequentially poll the tsFile by default
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
+ private boolean skipFailedTableSchemaCheck = true;
/** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
@@ -1423,6 +1424,18 @@ public class CommonConfig {
pipeRealTimeQueueMaxWaitingTsFileSize);
}
+ public boolean isSkipFailedTableSchemaCheck() {
+ return skipFailedTableSchemaCheck;
+ }
+
+ public void setSkipFailedTableSchemaCheck(boolean skipFailedTableSchemaCheck) {
+ if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
+ return;
+ }
+ this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
+ logger.info("skipFailedTableSchemaCheck is set to {}.", skipFailedTableSchemaCheck);
+ }
+
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
return;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 7d1064c7462..7d248eb37fd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -119,6 +119,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
}
+ public boolean isSkipFailedTableSchemaCheck() {
+ return COMMON_CONFIG.isSkipFailedTableSchemaCheck();
+ }
+
/////////////////////////////// Subtask Executor ///////////////////////////////
public int getPipeSubtaskExecutorMaxThreadNum() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 895b940ebf8..faad93bd7b2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -278,6 +278,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_realTime_queue_max_waiting_tsFile_size",
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
+ config.setSkipFailedTableSchemaCheck(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "skip_failed_table_schema_check",
+ String.valueOf(config.isSkipFailedTableSchemaCheck()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(