This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch convert-on-type-mismatch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/convert-on-type-mismatch by
this push:
new 7f06e3b74fa Update PipeStatementDataTypeConvertExecutionVisitor.java
7f06e3b74fa is described below
commit 7f06e3b74fa60e4979612e93a2191e2c21b3cdfe
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Aug 7 18:37:03 2024 +0800
Update PipeStatementDataTypeConvertExecutionVisitor.java
---
...peStatementDataTypeConvertExecutionVisitor.java | 106 ++++++++++++++++-----
1 file changed, 83 insertions(+), 23 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index fc6e8339b3d..7db2812efc1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
+import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -27,12 +29,14 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Optional;
-import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* This visitor transforms the data type of the statement when the statement is executed and an
@@ -41,12 +45,29 @@ import java.util.function.Consumer;
public class PipeStatementDataTypeConvertExecutionVisitor
extends StatementVisitor<Optional<TSStatus>, Exception> {
- private final Consumer<Statement> statementExecutor;
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeStatementDataTypeConvertExecutionVisitor.class);
+
+ @FunctionalInterface
+ public interface StatementExecutor {
+ TSStatus execute(final Statement statement);
+ }
+
+ private final StatementExecutor statementExecutor;
- public PipeStatementDataTypeConvertExecutionVisitor(final Consumer<Statement> statementExecutor) {
+ public PipeStatementDataTypeConvertExecutionVisitor(final StatementExecutor statementExecutor) {
this.statementExecutor = statementExecutor;
}
+ private Optional<TSStatus> tryExecute(final Statement statement) {
+ try {
+ return Optional.of(statementExecutor.execute(statement));
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to execute statement after data type conversion.", e);
+ return Optional.empty();
+ }
+ }
+
@Override
public Optional<TSStatus> visitNode(
final StatementNode statementNode, final Exception exception) {
@@ -56,42 +77,81 @@ public class PipeStatementDataTypeConvertExecutionVisitor
@Override
public Optional<TSStatus> visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final Exception exception) {
+ // TODO: judge if the exception is caused by data type mismatch
+ // TODO: convert the data type of the statement
return visitStatement(loadTsFileStatement, exception);
}
@Override
- public Optional<TSStatus> visitInsert(
- InsertStatement insertStatement, final Exception exception) {
- return visitStatement(insertStatement, exception);
+ public Optional<TSStatus> visitInsertRow(
+ final InsertRowStatement insertRowStatement, final Exception exception) {
+ // TODO: judge if the exception is caused by data type mismatch
+
+ return tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement));
}
@Override
- public Optional<TSStatus> visitInsertTablet(
- InsertTabletStatement insertTabletStatement, final Exception exception) {
- return visitStatement(insertTabletStatement, exception);
+ public Optional<TSStatus> visitInsertRows(
+ final InsertRowsStatement insertRowsStatement, final Exception exception) {
+ // TODO: judge if the exception is caused by data type mismatch
+
+ if (insertRowsStatement.getInsertRowStatementList() == null
+ || insertRowsStatement.getInsertRowStatementList().isEmpty()) {
+ return Optional.empty();
+ }
+
+ final InsertRowsStatement convertedInsertRowsStatement = new InsertRowsStatement();
+ convertedInsertRowsStatement.setInsertRowStatementList(
+ insertRowsStatement.getInsertRowStatementList().stream()
+ .map(PipeConvertedInsertRowStatement::new)
+ .collect(Collectors.toList()));
+ return tryExecute(convertedInsertRowsStatement);
}
@Override
- public Optional<TSStatus> visitInsertRow(
- InsertRowStatement insertRowStatement, final Exception exception) {
- return visitStatement(insertRowStatement, exception);
+ public Optional<TSStatus> visitInsertRowsOfOneDevice(
+ final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement,
+ final Exception exception) {
+ // TODO: judge if the exception is caused by data type mismatch
+
+ if (insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
+ || insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
+ return Optional.empty();
+ }
+
+ final InsertRowsOfOneDeviceStatement convertedInsertRowsOfOneDeviceStatement =
+ new InsertRowsOfOneDeviceStatement();
+ convertedInsertRowsOfOneDeviceStatement.setInsertRowStatementList(
+ insertRowsOfOneDeviceStatement.getInsertRowStatementList().stream()
+ .map(PipeConvertedInsertRowStatement::new)
+ .collect(Collectors.toList()));
+ return tryExecute(convertedInsertRowsOfOneDeviceStatement);
}
@Override
- public Optional<TSStatus> visitInsertRows(
- InsertRowsStatement insertRowsStatement, final Exception exception) {
- return visitStatement(insertRowsStatement, exception);
+ public Optional<TSStatus> visitInsertTablet(
+ final InsertTabletStatement insertTabletStatement, final Exception exception) {
+ // TODO: judge if the exception is caused by data type mismatch
+
+ return tryExecute(new PipeConvertedInsertTabletStatement(insertTabletStatement));
}
@Override
public Optional<TSStatus> visitInsertMultiTablets(
- InsertMultiTabletsStatement insertMultiTabletsStatement, final Exception exception) {
- return visitStatement(insertMultiTabletsStatement, exception);
- }
+ final InsertMultiTabletsStatement insertMultiTabletsStatement, final Exception exception) {
+ // TODO: judge if the exception is caused by data type mismatch
- @Override
- public Optional<TSStatus> visitInsertRowsOfOneDevice(
- InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, final Exception exception) {
- return visitStatement(insertRowsOfOneDeviceStatement, exception);
+ if (insertMultiTabletsStatement.getInsertTabletStatementList() == null
+ || insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
+ return Optional.empty();
+ }
+
+ final InsertMultiTabletsStatement convertedInsertMultiTabletsStatement =
+ new InsertMultiTabletsStatement();
+ convertedInsertMultiTabletsStatement.setInsertTabletStatementList(
+ insertMultiTabletsStatement.getInsertTabletStatementList().stream()
+ .map(PipeConvertedInsertTabletStatement::new)
+ .collect(Collectors.toList()));
+ return tryExecute(convertedInsertMultiTabletsStatement);
}
}