This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch load-conversion-consider-is-generated-by-pipe-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7e94328a18d7249bef8fcccca5d7133419b41a22 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jan 8 15:02:29 2025 +0800 Load: Consider isGeneratedByPipe mark when executing tsfile-tablet conversion in analysis stage (#14651) (cherry picked from commit 90cdf3bbc94224723ba00b0f92b0007d38a84000) --- .../queryengine/plan/analyze/AnalyzeVisitor.java | 6 +++ .../plan/analyze/LoadTsFileAnalyzer.java | 2 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 2 +- .../plan/statement/crud/LoadTsFileStatement.java | 9 +++++ .../converter/LoadTsFileDataTypeConverter.java | 44 +++++++++++++--------- 5 files changed, 44 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 69a38b20a81..fefba280c4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2983,6 +2983,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> @Override public Analysis visitPipeEnrichedStatement( PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) { + // The LoadTsFileStatement is a special case, it needs isGeneratedByPipe information + // in the analyzer to execute the tsfile-tablet conversion in some cases. + if (pipeEnrichedStatement.getInnerStatement() instanceof LoadTsFileStatement) { + ((LoadTsFileStatement) pipeEnrichedStatement.getInnerStatement()).markIsGeneratedByPipe(); + } + Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, context); // statement may be changed because of logical view diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java index 384d0e30ede..26e7146c7a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java @@ -229,7 +229,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable { private void executeDataTypeConversionOnTypeMismatch( final Analysis analysis, final VerifyMetadataTypeMismatchException e) { final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter = - new LoadTsFileDataTypeConverter(); + new LoadTsFileDataTypeConverter(loadTsFileStatement.isGeneratedByPipe()); final TSStatus status = loadTsFileStatement.isConvertOnTypeMismatch() ? loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement).orElse(null) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 0a46eefa4e0..72146c9d2de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -525,7 +525,7 @@ public class LoadTsFileScheduler implements IScheduler { private void convertFailedTsFilesToTabletsAndRetry() { final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter = - new LoadTsFileDataTypeConverter(); + new LoadTsFileDataTypeConverter(isGeneratedByPipe); for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) { final LoadSingleTsFileNode failedNode = tsFileNodeList.get(failedLoadTsFileIndex); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 889ca2c202c..d1a346b14cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -47,6 +47,7 @@ public class LoadTsFileStatement extends Statement { private boolean deleteAfterLoad = false; private boolean convertOnTypeMismatch = true; private boolean autoCreateDatabase = true; + private boolean isGeneratedByPipe = false; private Map<String, String> loadAttributes; @@ -168,6 +169,14 @@ public class LoadTsFileStatement extends Statement { return autoCreateDatabase; } + public void markIsGeneratedByPipe() { + isGeneratedByPipe = true; + } + + public boolean isGeneratedByPipe() { + return isGeneratedByPipe; + } + public List<File> getTsFiles() { return tsFiles; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java index 05fe606931e..ee2a8fe2547 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java @@ -25,7 +25,9 @@ import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -39,28 +41,22 @@ public class LoadTsFileDataTypeConverter { private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); - private final LoadTreeStatementDataTypeConvertExecutionVisitor - treeStatementDataTypeConvertExecutionVisitor = - new LoadTreeStatementDataTypeConvertExecutionVisitor( - statement -> - Coordinator.getInstance() - .executeForTreeModel( - statement, - SESSION_MANAGER.requestQueryId(), - SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), - "", - ClusterPartitionFetcher.getInstance(), - ClusterSchemaFetcher.getInstance(), - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), - false) - .status); - public static final LoadConvertedInsertTabletStatementTSStatusVisitor STATEMENT_STATUS_VISITOR = new LoadConvertedInsertTabletStatementTSStatusVisitor(); public static final LoadConvertedInsertTabletStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR = new LoadConvertedInsertTabletStatementExceptionVisitor(); - public Optional<TSStatus> convertForTreeModel(LoadTsFileStatement loadTsFileTreeStatement) { + private final boolean isGeneratedByPipe; + + private final LoadTreeStatementDataTypeConvertExecutionVisitor + treeStatementDataTypeConvertExecutionVisitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel); + + public LoadTsFileDataTypeConverter(final boolean isGeneratedByPipe) { + this.isGeneratedByPipe = isGeneratedByPipe; + } + + public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement loadTsFileTreeStatement) { try { return loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, null); } catch (Exception e) { @@ -71,6 +67,20 @@ public class LoadTsFileDataTypeConverter { } } + private TSStatus executeForTreeModel(final Statement statement) { + return Coordinator.getInstance() + .executeForTreeModel( + isGeneratedByPipe ? new PipeEnrichedStatement(statement) : statement, + SESSION_MANAGER.requestQueryId(), + SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), + "", + ClusterPartitionFetcher.getInstance(), + ClusterSchemaFetcher.getInstance(), + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false) + .status; + } + public boolean isSuccessful(final TSStatus status) { return status != null && (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
