This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new abcc51ca992 Fix pipe receiver type conversion load path (#17849) 
(#17873)
abcc51ca992 is described below

commit abcc51ca99272eefc3a492ce5c85f75061691ac4
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 16:49:32 2026 +0800

    Fix pipe receiver type conversion load path (#17849) (#17873)
    
    Backport receiver-side tree-model changes to dev/1.3.
    
    The 2.x table-model load analyzer changes and table/dual ITs are not 
applicable to dev/1.3.
    
    (cherry picked from commit 90055d55b6e1166580824e5cbed77a7c253ef514)
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 33 ++++++++++++++---
 .../protocol/thrift/IoTDBDataNodeReceiverTest.java | 43 +++++++++++++++++++---
 2 files changed, 65 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 1ec4d6d53b7..8a5c4ad060f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -501,7 +501,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         dataBaseName,
         LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
         shouldConvertDataTypeOnTypeMismatch,
-        validateTsFile,
+        validateTsFile || shouldConvertDataTypeOnTypeMismatch,
         null,
         shouldMarkAsPipeRequest);
   }
@@ -509,16 +509,23 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus loadTsFileSync(final String dataBaseName, final String 
fileAbsolutePath)
       throws FileNotFoundException {
     return executeStatementAndClassifyExceptions(
-        buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, 
validateTsFile.get()));
+        buildLoadTsFileStatementForSync(
+            dataBaseName,
+            fileAbsolutePath,
+            validateTsFile.get(),
+            shouldConvertDataTypeOnTypeMismatch));
   }
 
   static LoadTsFileStatement buildLoadTsFileStatementForSync(
-      final String dataBaseName, final String fileAbsolutePath, final boolean 
validateTsFile)
+      final String dataBaseName,
+      final String fileAbsolutePath,
+      final boolean validateTsFile,
+      final boolean shouldConvertDataTypeOnTypeMismatch)
       throws FileNotFoundException {
     final LoadTsFileStatement statement = 
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
     statement.setDeleteAfterLoad(true);
-    statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(validateTsFile);
+    statement.setConvertOnTypeMismatch(shouldConvertDataTypeOnTypeMismatch);
+    statement.setVerifySchema(validateTsFile || 
shouldConvertDataTypeOnTypeMismatch);
     statement.setAutoCreateDatabase(
         IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
     statement.setDatabase(dataBaseName);
@@ -769,9 +776,23 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
+    // Execute insert statements through the conversion wrapper first to avoid 
writing a partial
+    // row/tablet before the type mismatch is converted.
+    if (shouldConvertDataTypeOnTypeMismatch && statement instanceof 
InsertBaseStatement) {
+      final Optional<TSStatus> convertedStatus =
+          statement.accept(
+              statementDataTypeConvertExecutionVisitor,
+              new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+      if (convertedStatus.isPresent()) {
+        return convertedStatus.get();
+      }
+    }
+
     final TSStatus status = executeStatement(statement);
 
-    // Try to convert the data type and the status code is not success
+    // Try to convert data type if the status code is not success. Insert 
statements normally return
+    // above after the first converted execution. The retry path is kept for 
load and fallback
+    // cases.
     return shouldConvertDataTypeOnTypeMismatch
             && ((statement instanceof InsertBaseStatement
                     && ((InsertBaseStatement) 
statement).hasFailedMeasurements())
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index f41c44763f9..1e279a18feb 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -39,7 +39,7 @@ public class IoTDBDataNodeReceiverTest {
     try {
       final LoadTsFileStatement statement =
           IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
-              "root.test.sg_0", tsFile.toString(), true);
+              "root.test.sg_0", tsFile.toString(), true, true);
 
       Assert.assertEquals("root.test.sg_0", statement.getDatabase());
       Assert.assertEquals(2, statement.getDatabaseLevel());
@@ -54,16 +54,17 @@ public class IoTDBDataNodeReceiverTest {
     try {
       final Map<String, String> attributes =
           IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
-              "root.test.sg_0", true, true, true);
+              "root.test.sg_0", true, false, true);
 
       Assert.assertEquals(
           "root.test.sg_0", 
attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
       Assert.assertEquals("2", 
attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
 
       final LoadTsFileStatement statement = 
LoadTsFileStatement.createUnchecked(tsFile.toString());
-      ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, 
true);
+      ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, 
false);
       Assert.assertEquals("root.test.sg_0", statement.getDatabase());
       Assert.assertEquals(2, statement.getDatabaseLevel());
+      Assert.assertTrue(statement.isVerifySchema());
     } finally {
       Files.deleteIfExists(tsFile);
     }
@@ -75,7 +76,8 @@ public class IoTDBDataNodeReceiverTest {
     final Path tsFile = 
Files.createTempFile("pipe-load-default-database-level", ".tsfile");
     try {
       final LoadTsFileStatement statement =
-          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, 
tsFile.toString(), true);
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              null, tsFile.toString(), true, true);
 
       Assert.assertNull(statement.getDatabase());
       Assert.assertEquals(
@@ -92,7 +94,7 @@ public class IoTDBDataNodeReceiverTest {
     try {
       final LoadTsFileStatement statement =
           IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
-              "root.test.sg_0", tsFile.toString(), true);
+              "root.test.sg_0", tsFile.toString(), true, true);
       final long receiverId = System.nanoTime();
       final Exception exception = new RuntimeException("repeated receiver 
exception " + receiverId);
 
@@ -107,4 +109,35 @@ public class IoTDBDataNodeReceiverTest {
       Files.deleteIfExists(tsFile);
     }
   }
+
+  @Test
+  public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() 
throws Exception {
+    final Path tsFile = 
Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tsFile.toString(), false, true);
+
+      Assert.assertTrue(statement.isConvertOnTypeMismatch());
+      Assert.assertTrue(statement.isVerifySchema());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+
+  @Test
+  public void 
testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType()
+      throws Exception {
+    final Path tsFile = 
Files.createTempFile("pipe-load-no-convert-no-verify-schema", ".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tsFile.toString(), false, false);
+
+      Assert.assertFalse(statement.isConvertOnTypeMismatch());
+      Assert.assertFalse(statement.isVerifySchema());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
 }

Reply via email to