This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 677f9b8e164 Load: Added "skipFailedTableSchemaCheck" parameter (#16522)
677f9b8e164 is described below
commit 677f9b8e16443d07dca66d888b9a862ed20165d5
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 29 20:00:50 2025 +0800
Load: Added "skipFailedTableSchemaCheck" parameter (#16522)
* fix
fix
* tested
---
.../analyze/load/LoadTsFileTableSchemaCache.java | 14 +++++++++++++-
.../fetcher/TableHeaderSchemaValidator.java | 22 +++++++++++-----------
.../apache/iotdb/commons/conf/CommonConfig.java | 13 +++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 4 ++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++++
5 files changed, 46 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index abf0b0afe5b..18b5d4cbc32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -146,7 +147,18 @@ public class LoadTsFileTableSchemaCache {
e);
}
- createTableAndDatabaseIfNecessary(device.getTableName());
+ try {
+ createTableAndDatabaseIfNecessary(device.getTableName());
+ } catch (final Exception e) {
+ if (PipeConfig.getInstance().isSkipFailedTableSchemaCheck()) {
+ LOGGER.info(
+ "Failed to check table schema, will skip because skipFailedTableSchemaCheck is set to true, message: {}",
+ e.getMessage());
+ } else {
+ throw e;
+ }
+ }
+
// TODO: add permission check and record auth cost
addDevice(device);
if (shouldFlushDevices()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
index 29ff418d4b5..032b0d91cfd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
@@ -132,37 +132,37 @@ public class TableHeaderSchemaValidator {
DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table);
// If table with this name already exists and isStrictTagColumn is true, make sure the
// existing
- // id columns are the prefix of the incoming id columns, or vice versa
+ // id columns are the prefix of the incoming tag columns, or vice versa
if (isStrictTagColumn) {
final List<TsTableColumnSchema> realTagColumns =
table.getTagColumnSchemaList();
final List<ColumnSchema> incomingTagColumns =
tableSchema.getTagColumns();
if (realTagColumns.size() <= incomingTagColumns.size()) {
- // When incoming table has more ID columns, the existing id columns
- // should be the prefix of the incoming id columns (or equal)
+ // When incoming table has more TAG columns, the existing tag columns
+ // should be the prefix of the incoming tag columns (or equal)
for (int indexReal = 0; indexReal < realTagColumns.size();
indexReal++) {
final String tagName =
realTagColumns.get(indexReal).getColumnName();
final int indexIncoming =
tableSchema.getIndexAmongTagColumns(tagName);
if (indexIncoming != indexReal) {
throw new LoadAnalyzeTableColumnDisorderException(
String.format(
- "Can not create table because incoming table has no less id columns than existing table, "
- + "and the existing id columns are not the prefix of the incoming id columns. "
- + "Existing id column: %s, index in existing table: %s, index in incoming table: %s",
+ "Can not create table because incoming table has no less tag columns than existing table, "
+ + "and the existing tag columns are not the prefix of the incoming tag columns. "
+ + "Existing tag column: %s, index in existing table: %s, index in incoming table: %s",
tagName, indexReal, indexIncoming));
}
}
} else {
- // When existing table has more ID columns, the incoming id columns
- // should be the prefix of the existing id columns
+ // When existing table has more TAG columns, the incoming tag columns
+ // should be the prefix of the existing tag columns
for (int indexIncoming = 0; indexIncoming <
incomingTagColumns.size(); indexIncoming++) {
final String tagName =
incomingTagColumns.get(indexIncoming).getName();
final int indexReal = table.getTagColumnOrdinal(tagName);
if (indexReal != indexIncoming) {
throw new LoadAnalyzeTableColumnDisorderException(
String.format(
- "Can not create table because existing table has more id columns than incoming table, "
- + "and the incoming id columns are not the prefix of the existing id columns. "
- + "Incoming id column: %s, index in existing table: %s, index in incoming table: %s",
+ "Can not create table because existing table has more tag columns than incoming table, "
+ + "and the incoming tag columns are not the prefix of the existing tag columns. "
+ + "Incoming tag column: %s, index in existing table: %s, index in incoming table: %s",
tagName, indexReal, indexIncoming));
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a960584c66c..17b0ca57746 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -209,6 +209,7 @@ public class CommonConfig {
// Sequentially poll the tsFile by default
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
+ private boolean skipFailedTableSchemaCheck = true;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
@@ -1522,6 +1523,18 @@ public class CommonConfig {
pipeRealTimeQueueMaxWaitingTsFileSize);
}
+ public boolean isSkipFailedTableSchemaCheck() {
+ return skipFailedTableSchemaCheck;
+ }
+
+ public void setSkipFailedTableSchemaCheck(boolean
skipFailedTableSchemaCheck) {
+ if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
+ return;
+ }
+ this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
+ logger.info("skipFailedTableSchemaCheck is set to {}.",
skipFailedTableSchemaCheck);
+ }
+
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
return;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 329d8815ac8..37d9af0f3e6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -119,6 +119,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
}
+ public boolean isSkipFailedTableSchemaCheck() {
+ return COMMON_CONFIG.isSkipFailedTableSchemaCheck();
+ }
+
/////////////////////////////// Subtask Executor
///////////////////////////////
public int getPipeSubtaskExecutorMaxThreadNum() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 1beb4c67e25..2a170b6b8d1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -278,6 +278,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_realTime_queue_max_waiting_tsFile_size",
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
+ config.setSkipFailedTableSchemaCheck(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "skip_failed_table_schema_check",
+ String.valueOf(config.isSkipFailedTableSchemaCheck()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(