This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 64dd5f97c26 Pipe: Convert tsfile to tablets on data sync receiver when
data type / aligned type mismatch detected (#13239) (#13245)
64dd5f97c26 is described below
commit 64dd5f97c260550d6cc3d7ef6b15ab11e7452844
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Aug 21 11:08:08 2024 +0800
Pipe: Convert tsfile to tablets on data sync receiver when data type /
aligned type mismatch detected (#13239) (#13245)
(cherry picked from commit 1ee4744718fa8832c57e76b0ad4187a34930837c)
---
.../scan/TsFileInsertionScanDataContainer.java | 28 +++++++++++
...peStatementDataTypeConvertExecutionVisitor.java | 54 ++++++++++++++++++++--
2 files changed, 79 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index e2995f61fdc..61206afe0ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -143,6 +143,34 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
};
}
+ public Iterable<Tablet> toTablets() {
+ return () ->
+ new Iterator<Tablet>() {
+ @Override
+ public boolean hasNext() {
+ return Objects.nonNull(chunkReader);
+ }
+
+ @Override
+ public Tablet next() {
+ if (!hasNext()) {
+ close();
+ throw new NoSuchElementException();
+ }
+
+ final Tablet tablet = getNextTablet();
+ final boolean hasNext = hasNext();
+ try {
+ return tablet;
+ } finally {
+ if (!hasNext) {
+ close();
+ }
+ }
+ }
+ };
+ }
+
private Tablet getNextTablet() {
try {
final Tablet tablet =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 69f456552f7..9f669a75b8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -34,9 +37,12 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -78,9 +84,51 @@ public class PipeStatementDataTypeConvertExecutionVisitor
@Override
public Optional<TSStatus> visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
- // TODO: judge if the exception is caused by data type mismatch
- // TODO: convert the data type of the statement
- return visitStatement(loadTsFileStatement, status);
+ if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) {
+ return Optional.empty();
+ }
+
+ LOGGER.warn(
+ "Data type mismatch detected (TSStatus: {}) for LoadTsFileStatement:
{}. Start data type conversion.",
+ status,
+ loadTsFileStatement);
+
+ for (final File file : loadTsFileStatement.getTsFiles()) {
+ try (final TsFileInsertionScanDataContainer container =
+ new TsFileInsertionScanDataContainer(
+ file, new IoTDBPipePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null)) {
+ for (final Tablet tablet : container.toTablets()) {
+ final PipeConvertedInsertTabletStatement statement =
+ new PipeConvertedInsertTabletStatement(
+ PipeTransferTabletRawReq.toTPipeTransferRawReq(tablet, false)
+ .constructStatement());
+ TSStatus result = statementExecutor.execute(statement);
+
+ // Retry once if the write process is rejected
+ if (result.getCode() ==
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ result = statementExecutor.execute(statement);
+ }
+
+ if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
+ return Optional.empty();
+ }
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to convert data type for LoadTsFileStatement: {}.",
loadTsFileStatement, e);
+ return Optional.empty();
+ }
+ }
+
+ if (loadTsFileStatement.isDeleteAfterLoad()) {
+ loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+ }
+
+ LOGGER.warn(
+ "Data type conversion for LoadTsFileStatement {} is successful.",
loadTsFileStatement);
+
+ return Optional.of(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@Override