This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e9584eced72 Pipe: Fixed the remaining correctness bug for PR 17661 &&
the replication configuration of IoTDBPipeReceiverAutoCreateDisabledIT (#17678)
e9584eced72 is described below
commit e9584eced726a39733b68d42bc5bb02857d91fad
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 15 16:38:04 2026 +0800
Pipe: Fixed the remaining correctness bug for PR 17661 && the replication
configuration of IoTDBPipeReceiverAutoCreateDisabledIT (#17678)
* Update IoTDBPipeReceiverAutoCreateDisabledIT.java
* Fix
* Update IoTDBPipeReceiverAutoCreateDisabledIT.java
* Update IoTDBDataNodeReceiver.java
* Update IoTDBDataNodeReceiver.java
* Fix
---
.../IoTDBPipeReceiverAutoCreateDisabledIT.java | 17 +++++++-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 48 +++++++++++++++-------
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 4 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 2 +-
.../thrift/sync/IoTDBDataRegionSyncSink.java | 2 +-
5 files changed, 52 insertions(+), 21 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
index 6d4f2b80b33..72646f81b86 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -57,7 +57,17 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends
AbstractPipeDualTreeM
@Override
protected void setupConfig() {
super.setupConfig();
-
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(false);
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(false)
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1);
}
@Test
@@ -69,10 +79,12 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends
AbstractPipeDualTreeM
final String createPipeSql =
String.format(
- "create pipe test with source
('inclusion'='all','source.realtime.mode'='stream','source.realtime.enable'='true')
"
+ "create pipe test with source ('inclusion'='all') "
+ "with sink ('sink'='iotdb-thrift-sink',
'sink.node-urls'='%s');",
receiverEnv.getDataNodeWrapper(0).getIpAndPortString());
final String createDatabaseSql = "create database root.test.sg;";
+ final String createSecondDatabaseSql =
+ "create database
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1;";
final String createFirstTimeSeriesSql =
"create timeseries
root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`.`~!@#$%^&*()_+=:'\"/|[]{}` float;";
final String insertFirstSql =
@@ -95,6 +107,7 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends
AbstractPipeDualTreeM
statement.execute(createFirstTimeSeriesSql);
statement.execute(insertFirstSql);
final QueryResult firstQueryResult = queryForResult(statement,
firstSelectSql);
+ statement.execute(createSecondDatabaseSql);
statement.execute(createSecondTimeSeriesSql);
statement.execute(insertSecondSql);
final QueryResult secondQueryResult = queryForResult(statement,
secondSelectSql);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index dd6e684031c..dbf77339c0f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -871,9 +871,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
// Judge which model the statement belongs to
final boolean isTableModelStatement;
final String databaseName;
- if (statement instanceof LoadTsFileStatement
- && ((LoadTsFileStatement) statement).getDatabase() != null) {
- isTableModelStatement = true;
+ if (statement instanceof LoadTsFileStatement) {
+ // Pipe receiver always constructs a tree-model LoadTsFileStatement. Its database field is
+ // only an explicit database hint for table data or pipe-generated tree-model loads, so it
+ // must not be used to route execution into the table-model pipeline.
+ isTableModelStatement = false;
databaseName = ((LoadTsFileStatement) statement).getDatabase();
} else if (statement instanceof InsertBaseStatement
&& ((InsertBaseStatement) statement).isWriteToTable()) {
@@ -925,18 +927,34 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
// Try to convert data type if the statement is a tree model statement
// and the status code is not success
- return shouldConvertDataTypeOnTypeMismatch
- && ((statement instanceof InsertBaseStatement
- && ((InsertBaseStatement)
statement).hasFailedMeasurements())
- || (status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
- && status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()))
- ? (isTableModelStatement
- ? statement
- .accept(
- tableStatementDataTypeConvertExecutionVisitor, new
Pair<>(status, databaseName))
- .orElse(status)
- : statement.accept(treeStatementDataTypeConvertExecutionVisitor,
status).orElse(status))
- : status;
+ if (!shouldConvertDataTypeOnTypeMismatch
+ || !((statement instanceof InsertBaseStatement
+ && ((InsertBaseStatement) statement).hasFailedMeasurements())
+ || (status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ && status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()))) {
+ return status;
+ }
+
+ if (statement instanceof LoadTsFileStatement
+ && shouldUseTableModelVisitorForLoadStatement((LoadTsFileStatement)
statement)) {
+ return statement
+ .accept(tableStatementDataTypeConvertExecutionVisitor, new
Pair<>(status, databaseName))
+ .orElse(status);
+ }
+
+ return isTableModelStatement
+ ? statement
+ .accept(tableStatementDataTypeConvertExecutionVisitor, new
Pair<>(status, databaseName))
+ .orElse(status)
+ : statement.accept(treeStatementDataTypeConvertExecutionVisitor,
status).orElse(status);
+ }
+
+ private boolean shouldUseTableModelVisitorForLoadStatement(
+ final LoadTsFileStatement loadTsFileStatement) {
+ final List<Boolean> isTableModel = loadTsFileStatement.getIsTableModel();
+ return Objects.nonNull(isTableModel)
+ && !isTableModel.isEmpty()
+ && isTableModel.stream().allMatch(Boolean.TRUE::equals);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 7f904324bbb..64124c56256 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -341,7 +341,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
tsFile.length(),
pipeTsFileInsertionEvent.isTableModelEvent()
? pipeTsFileInsertionEvent.getTableModelDatabaseName()
- : null))) {
+ : pipeTsFileInsertionEvent.getTreeModelDatabaseName()))) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
@@ -362,7 +362,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
tsFile.length(),
pipeTsFileInsertionEvent.isTableModelEvent()
? pipeTsFileInsertionEvent.getTableModelDatabaseName()
- : null))) {
+ : pipeTsFileInsertionEvent.getTreeModelDatabaseName()))) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 554c6c43dfb..97e9309a435 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -423,7 +423,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
&& clientManager.supportModsIfIsDataNodeReceiver(),
pipeTsFileInsertionEvent.isTableModelEvent()
? pipeTsFileInsertionEvent.getTableModelDatabaseName()
- : null);
+ : pipeTsFileInsertionEvent.getTreeModelDatabaseName());
transfer(pipeTransferTsFileHandler);
return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index e8c4420861c..4309e5dae4e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -499,7 +499,7 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
pipeTsFileInsertionEvent.isWithMod() ?
pipeTsFileInsertionEvent.getModFile() : null,
pipeTsFileInsertionEvent.isTableModelEvent()
? pipeTsFileInsertionEvent.getTableModelDatabaseName()
- : null);
+ : pipeTsFileInsertionEvent.getTreeModelDatabaseName());
} finally {
pipeTsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionSyncSink.class.getName(), false);