This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new abcc51ca992 Fix pipe receiver type conversion load path (#17849) (#17873)
abcc51ca992 is described below
commit abcc51ca99272eefc3a492ce5c85f75061691ac4
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 16:49:32 2026 +0800
Fix pipe receiver type conversion load path (#17849) (#17873)
Backport receiver-side tree-model changes to dev/1.3.
The 2.x table-model load analyzer changes and the table/dual integration
tests (ITs) are not applicable to dev/1.3.
(cherry picked from commit 90055d55b6e1166580824e5cbed77a7c253ef514)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 33 ++++++++++++++---
.../protocol/thrift/IoTDBDataNodeReceiverTest.java | 43 +++++++++++++++++++---
2 files changed, 65 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 1ec4d6d53b7..8a5c4ad060f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -501,7 +501,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
dataBaseName,
LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
shouldConvertDataTypeOnTypeMismatch,
- validateTsFile,
+ validateTsFile || shouldConvertDataTypeOnTypeMismatch,
null,
shouldMarkAsPipeRequest);
}
@@ -509,16 +509,23 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus loadTsFileSync(final String dataBaseName, final String
fileAbsolutePath)
throws FileNotFoundException {
return executeStatementAndClassifyExceptions(
- buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath,
validateTsFile.get()));
+ buildLoadTsFileStatementForSync(
+ dataBaseName,
+ fileAbsolutePath,
+ validateTsFile.get(),
+ shouldConvertDataTypeOnTypeMismatch));
}
static LoadTsFileStatement buildLoadTsFileStatementForSync(
- final String dataBaseName, final String fileAbsolutePath, final boolean
validateTsFile)
+ final String dataBaseName,
+ final String fileAbsolutePath,
+ final boolean validateTsFile,
+ final boolean shouldConvertDataTypeOnTypeMismatch)
throws FileNotFoundException {
final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
- statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(validateTsFile);
+ statement.setConvertOnTypeMismatch(shouldConvertDataTypeOnTypeMismatch);
+ statement.setVerifySchema(validateTsFile ||
shouldConvertDataTypeOnTypeMismatch);
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
statement.setDatabase(dataBaseName);
@@ -769,9 +776,23 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null
statement.");
}
+ // Execute insert statements through the conversion wrapper first to avoid
writing a partial
+ // row/tablet before the type mismatch is converted.
+ if (shouldConvertDataTypeOnTypeMismatch && statement instanceof
InsertBaseStatement) {
+ final Optional<TSStatus> convertedStatus =
+ statement.accept(
+ statementDataTypeConvertExecutionVisitor,
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ if (convertedStatus.isPresent()) {
+ return convertedStatus.get();
+ }
+ }
+
final TSStatus status = executeStatement(statement);
- // Try to convert the data type and the status code is not success
+ // Try to convert data type if the status code is not success. Insert
statements normally return
+ // above after the first converted execution. The retry path is kept for
load and fallback
+ // cases.
return shouldConvertDataTypeOnTypeMismatch
&& ((statement instanceof InsertBaseStatement
&& ((InsertBaseStatement)
statement).hasFailedMeasurements())
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index f41c44763f9..1e279a18feb 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -39,7 +39,7 @@ public class IoTDBDataNodeReceiverTest {
try {
final LoadTsFileStatement statement =
IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
- "root.test.sg_0", tsFile.toString(), true);
+ "root.test.sg_0", tsFile.toString(), true, true);
Assert.assertEquals("root.test.sg_0", statement.getDatabase());
Assert.assertEquals(2, statement.getDatabaseLevel());
@@ -54,16 +54,17 @@ public class IoTDBDataNodeReceiverTest {
try {
final Map<String, String> attributes =
IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
- "root.test.sg_0", true, true, true);
+ "root.test.sg_0", true, false, true);
Assert.assertEquals(
"root.test.sg_0",
attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
Assert.assertEquals("2",
attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(tsFile.toString());
- ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement,
true);
+ ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement,
false);
Assert.assertEquals("root.test.sg_0", statement.getDatabase());
Assert.assertEquals(2, statement.getDatabaseLevel());
+ Assert.assertTrue(statement.isVerifySchema());
} finally {
Files.deleteIfExists(tsFile);
}
@@ -75,7 +76,8 @@ public class IoTDBDataNodeReceiverTest {
final Path tsFile =
Files.createTempFile("pipe-load-default-database-level", ".tsfile");
try {
final LoadTsFileStatement statement =
- IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null,
tsFile.toString(), true);
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ null, tsFile.toString(), true, true);
Assert.assertNull(statement.getDatabase());
Assert.assertEquals(
@@ -92,7 +94,7 @@ public class IoTDBDataNodeReceiverTest {
try {
final LoadTsFileStatement statement =
IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
- "root.test.sg_0", tsFile.toString(), true);
+ "root.test.sg_0", tsFile.toString(), true, true);
final long receiverId = System.nanoTime();
final Exception exception = new RuntimeException("repeated receiver
exception " + receiverId);
@@ -107,4 +109,35 @@ public class IoTDBDataNodeReceiverTest {
Files.deleteIfExists(tsFile);
}
}
+
+ @Test
+ public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType()
throws Exception {
+ final Path tsFile =
Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), false, true);
+
+ Assert.assertTrue(statement.isConvertOnTypeMismatch());
+ Assert.assertTrue(statement.isVerifySchema());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void
testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType()
+ throws Exception {
+ final Path tsFile =
Files.createTempFile("pipe-load-no-convert-no-verify-schema", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), false, false);
+
+ Assert.assertFalse(statement.isConvertOnTypeMismatch());
+ Assert.assertFalse(statement.isVerifySchema());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
}