This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch convert-pipe-tsfile-1.3.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9ee1427330328043a914f28db95cf0534ab4a8ef Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Aug 21 10:14:39 2024 +0800 Pipe: Convert tsfile to tablets on data sync receiver when data type / aligned type mismatch detected (#13239) (cherry picked from commit 1ee4744718fa8832c57e76b0ad4187a34930837c) --- .../scan/TsFileInsertionScanDataContainer.java | 28 +++++++++++ ...peStatementDataTypeConvertExecutionVisitor.java | 54 ++++++++++++++++++++-- 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index e2995f61fdc..61206afe0ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -143,6 +143,34 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain }; } + public Iterable<Tablet> toTablets() { + return () -> + new Iterator<Tablet>() { + @Override + public boolean hasNext() { + return Objects.nonNull(chunkReader); + } + + @Override + public Tablet next() { + if (!hasNext()) { + close(); + throw new NoSuchElementException(); + } + + final Tablet tablet = getNextTablet(); + final boolean hasNext = hasNext(); + try { + return tablet; + } finally { + if (!hasNext) { + close(); + } + } + } + }; + } + private Tablet getNextTablet() { try { final Tablet tablet = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java index 69f456552f7..9f669a75b8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java @@ -20,7 +20,10 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern; import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; +import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement; import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; @@ -34,9 +37,12 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.commons.io.FileUtils; +import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.Optional; import java.util.stream.Collectors; @@ -78,9 +84,51 @@ public class PipeStatementDataTypeConvertExecutionVisitor @Override public Optional<TSStatus> visitLoadFile( final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { - // TODO: judge if the exception is caused by data type mismatch - // TODO: convert the data type of the statement - return visitStatement(loadTsFileStatement, status); + if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) { + return Optional.empty(); + } + + LOGGER.warn( + "Data type mismatch detected (TSStatus: {}) for LoadTsFileStatement: {}. Start data type conversion.", + status, + loadTsFileStatement); + + for (final File file : loadTsFileStatement.getTsFiles()) { + try (final TsFileInsertionScanDataContainer container = + new TsFileInsertionScanDataContainer( + file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { + for (final Tablet tablet : container.toTablets()) { + final PipeConvertedInsertTabletStatement statement = + new PipeConvertedInsertTabletStatement( + PipeTransferTabletRawReq.toTPipeTransferRawReq(tablet, false) + .constructStatement()); + TSStatus result = statementExecutor.execute(statement); + + // Retry once if the write process is rejected + if (result.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { + result = statementExecutor.execute(statement); + } + + if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) { + return Optional.empty(); + } + } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); + } + } + + if (loadTsFileStatement.isDeleteAfterLoad()) { + loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + } + + LOGGER.warn( + "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement); + + return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } @Override
