This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch tsfile-tablet-converter
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tsfile-tablet-converter by
this push:
new f72adfa4b79 fix
f72adfa4b79 is described below
commit f72adfa4b794ee5399c6a032878c968c5d71d384
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Aug 19 20:41:46 2024 +0800
fix
---
.../scan/TsFileInsertionScanDataContainer.java | 28 ++++++++++++++++
...peStatementDataTypeConvertExecutionVisitor.java | 38 +++++++++++++++++++++-
2 files changed, 65 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 830059fe058..896dc756fe1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -136,6 +136,34 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
};
}
+ public Iterable<Tablet> toTablets() {
+ return () ->
+ new Iterator<Tablet>() {
+ @Override
+ public boolean hasNext() {
+ return Objects.nonNull(chunkReader);
+ }
+
+ @Override
+ public Tablet next() {
+ if (!hasNext()) {
+ close();
+ throw new NoSuchElementException();
+ }
+
+ final Tablet tablet = getNextTablet();
+ final boolean hasNext = hasNext();
+ try {
+ return tablet;
+ } finally {
+ if (!hasNext) {
+ close();
+ }
+ }
+ }
+ };
+ }
+
private Tablet getNextTablet() {
try {
final Tablet tablet =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 69f456552f7..b7b82660fc8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -34,9 +37,12 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -80,7 +86,37 @@ public class PipeStatementDataTypeConvertExecutionVisitor
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
// TODO: judge if the exception is caused by data type mismatch
// TODO: convert the data type of the statement
- return visitStatement(loadTsFileStatement, status);
+
+ for (final File file : loadTsFileStatement.getTsFiles()) {
+ try (final TsFileInsertionScanDataContainer container =
+ new TsFileInsertionScanDataContainer(
+ file, new IoTDBPipePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null)) {
+ for (final Tablet tablet : container.toTablets()) {
+ final TSStatus result =
+ statementExecutor.execute(
+ new PipeConvertedInsertTabletStatement(
+ PipeTransferTabletRawReq.toTPipeTransferRawReq(tablet,
false)
+ .constructStatement()));
+ // TODO: handle memory not enough
+ if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
+ return Optional.empty();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to scan TsFile for data type conversion.
LoadTsFileStatement: {}",
+ loadTsFileStatement,
+ e);
+ return Optional.empty();
+ }
+ }
+
+ if (loadTsFileStatement.isDeleteAfterLoad()) {
+ loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+ }
+
+ return Optional.of(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@Override