This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 764cedcbfed Pipe: Fixed the potentially missing database auto-create
in receiver (#16529)
764cedcbfed is described below
commit 764cedcbfede63bb95da91f5cbfdd8d1dd5601ce
Author: Caideyipi <[email protected]>
AuthorDate: Tue Sep 30 13:49:10 2025 +0800
Pipe: Fixed the potentially missing database auto-create in receiver
(#16529)
* fix
* fix
---
.../pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 12 +++---------
.../java/org/apache/iotdb/commons/conf/CommonConfig.java | 2 +-
2 files changed, 4 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index a65ec2b266d..16f463f5f6f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -97,6 +97,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -126,7 +127,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -174,8 +174,6 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private final PipeTransferSliceReqHandler sliceReqHandler = new
PipeTransferSliceReqHandler();
- private static final Set<String> ALREADY_CREATED_TABLE_MODEL_DATABASES =
- ConcurrentHashMap.newKeySet();
private final SqlParser tableSqlParser = new SqlParser();
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
@@ -581,7 +579,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(validateTsFile.get());
- statement.setAutoCreateDatabase(false);
+ statement.setAutoCreateDatabase(true);
statement.setDatabase(dataBaseName);
return executeStatementAndClassifyExceptions(statement);
@@ -966,8 +964,6 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
.status;
} catch (final Exception e) {
- ALREADY_CREATED_TABLE_MODEL_DATABASES.remove(databaseName);
-
final Throwable rootCause = getRootCause(e);
if (rootCause.getMessage() != null
&& rootCause
@@ -997,7 +993,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private void autoCreateDatabaseIfNecessary(final String database) {
- if (ALREADY_CREATED_TABLE_MODEL_DATABASES.contains(database)
+ if (DataNodeTableCache.getInstance().isDatabaseExist(database)
||
!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
return;
}
@@ -1026,8 +1022,6 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
throw new PipeException("Auto create database failed because: " +
e.getMessage());
}
-
- ALREADY_CREATED_TABLE_MODEL_DATABASES.add(database);
}
private TSStatus executeStatementForTreeModel(final Statement statement) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 17b0ca57746..2c6912bac38 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -209,7 +209,7 @@ public class CommonConfig {
// Sequentially poll the tsFile by default
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
- private boolean skipFailedTableSchemaCheck = true;
+ private boolean skipFailedTableSchemaCheck = false;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =