This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 64dd5f97c26 Pipe: Convert tsfile to tablets on data sync receiver when data type / aligned type mismatch detected (#13239) (#13245)
64dd5f97c26 is described below

commit 64dd5f97c260550d6cc3d7ef6b15ab11e7452844
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Aug 21 11:08:08 2024 +0800

    Pipe: Convert tsfile to tablets on data sync receiver when data type / aligned type mismatch detected (#13239) (#13245)
    
    (cherry picked from commit 1ee4744718fa8832c57e76b0ad4187a34930837c)
---
 .../scan/TsFileInsertionScanDataContainer.java     | 28 +++++++++++
 ...peStatementDataTypeConvertExecutionVisitor.java | 54 ++++++++++++++++++++--
 2 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index e2995f61fdc..61206afe0ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -143,6 +143,34 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
         };
   }
 
+  public Iterable<Tablet> toTablets() {
+    return () ->
+        new Iterator<Tablet>() {
+          @Override
+          public boolean hasNext() {
+            return Objects.nonNull(chunkReader);
+          }
+
+          @Override
+          public Tablet next() {
+            if (!hasNext()) {
+              close();
+              throw new NoSuchElementException();
+            }
+
+            final Tablet tablet = getNextTablet();
+            final boolean hasNext = hasNext();
+            try {
+              return tablet;
+            } finally {
+              if (!hasNext) {
+                close();
+              }
+            }
+          }
+        };
+  }
+
   private Tablet getNextTablet() {
     try {
       final Tablet tablet =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 69f456552f7..9f669a75b8b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
 import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
 import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -34,9 +37,12 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -78,9 +84,51 @@ public class PipeStatementDataTypeConvertExecutionVisitor
   @Override
   public Optional<TSStatus> visitLoadFile(
       final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
-    // TODO: judge if the exception is caused by data type mismatch
-    // TODO: convert the data type of the statement
-    return visitStatement(loadTsFileStatement, status);
+    if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) {
+      return Optional.empty();
+    }
+
+    LOGGER.warn(
+        "Data type mismatch detected (TSStatus: {}) for LoadTsFileStatement: {}. Start data type conversion.",
+        status,
+        loadTsFileStatement);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionScanDataContainer container =
+          new TsFileInsertionScanDataContainer(
+              file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
+        for (final Tablet tablet : container.toTablets()) {
+          final PipeConvertedInsertTabletStatement statement =
+              new PipeConvertedInsertTabletStatement(
+                  PipeTransferTabletRawReq.toTPipeTransferRawReq(tablet, false)
+                      .constructStatement());
+          TSStatus result = statementExecutor.execute(statement);
+
+          // Retry once if the write process is rejected
+          if (result.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+            result = statementExecutor.execute(statement);
+          }
+
+          if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
+            return Optional.empty();
+          }
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    LOGGER.warn(
+        "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override

Reply via email to