This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch tsfile-tablet-converter
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tsfile-tablet-converter by this push:
     new f72adfa4b79 fix
f72adfa4b79 is described below

commit f72adfa4b794ee5399c6a032878c968c5d71d384
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Aug 19 20:41:46 2024 +0800

    fix
---
 .../scan/TsFileInsertionScanDataContainer.java     | 28 ++++++++++++++++
 ...peStatementDataTypeConvertExecutionVisitor.java | 38 +++++++++++++++++++++-
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 830059fe058..896dc756fe1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -136,6 +136,34 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
         };
   }
 
+  public Iterable<Tablet> toTablets() { // Exposes the tablets of this container as an Iterable.
+    return () -> // The Iterable is a lambda: each iterator() call builds the Iterator below.
+        new Iterator<Tablet>() {
+          @Override
+          public boolean hasNext() {
+            return Objects.nonNull(chunkReader); // Reports more data while chunkReader is non-null.
+          }
+
+          @Override
+          public Tablet next() {
+            if (!hasNext()) { // Exhausted: close first, then signal the misuse to the caller.
+              close(); // Iterator declares no close(), so this resolves to the enclosing class.
+              throw new NoSuchElementException();
+            }
+
+            final Tablet tablet = getNextTablet(); // NOTE(review): presumably clears chunkReader at end of data - confirm.
+            final boolean hasNext = hasNext(); // Sampled after the read, before the tablet is returned.
+            try {
+              return tablet;
+            } finally {
+              if (!hasNext) { // That was the last tablet, so close eagerly.
+                close();
+              }
+            }
+          }
+        };
+  }
+
   private Tablet getNextTablet() {
     try {
       final Tablet tablet =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 69f456552f7..b7b82660fc8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
 import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
 import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -34,9 +37,12 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -80,7 +86,37 @@ public class PipeStatementDataTypeConvertExecutionVisitor
       final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
     // TODO: judge if the exception is caused by data type mismatch
     // TODO: convert the data type of the statement
-    return visitStatement(loadTsFileStatement, status);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionScanDataContainer container =
+          new TsFileInsertionScanDataContainer(
+              file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
+        for (final Tablet tablet : container.toTablets()) {
+          final TSStatus result =
+              statementExecutor.execute(
+                  new PipeConvertedInsertTabletStatement(
+                      PipeTransferTabletRawReq.toTPipeTransferRawReq(tablet, false)
+                          .constructStatement()));
+          // TODO: handle memory not enough
+          if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
+            return Optional.empty();
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Failed to scan TsFile for data type conversion. LoadTsFileStatement: {}",
+            loadTsFileStatement,
+            e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override

Reply via email to