This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 79da22a7c96 Load: Consider isGeneratedByPipe mark when executing
tsfile-tablet conversion in analysis stage (#14651) (#14652)
79da22a7c96 is described below
commit 79da22a7c963a5caf6b45e6d348516be654bfa15
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jan 8 18:55:54 2025 +0800
Load: Consider isGeneratedByPipe mark when executing tsfile-tablet
conversion in analysis stage (#14651) (#14652)
(cherry picked from commit 90cdf3bbc94224723ba00b0f92b0007d38a84000)
---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 6 +++
.../plan/analyze/LoadTsFileAnalyzer.java | 2 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 2 +-
.../plan/statement/crud/LoadTsFileStatement.java | 9 +++++
.../converter/LoadTsFileDataTypeConverter.java | 44 +++++++++++++---------
5 files changed, 44 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 69a38b20a81..fefba280c4c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2983,6 +2983,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
@Override
public Analysis visitPipeEnrichedStatement(
PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
+ // The LoadTsFileStatement is a special case, it needs isGeneratedByPipe information
+ // in the analyzer to execute the tsfile-tablet conversion in some cases.
+ if (pipeEnrichedStatement.getInnerStatement() instanceof
LoadTsFileStatement) {
+ ((LoadTsFileStatement)
pipeEnrichedStatement.getInnerStatement()).markIsGeneratedByPipe();
+ }
+
Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this,
context);
// statement may be changed because of logical view
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 384d0e30ede..26e7146c7a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -229,7 +229,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private void executeDataTypeConversionOnTypeMismatch(
final Analysis analysis, final VerifyMetadataTypeMismatchException e) {
final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
- new LoadTsFileDataTypeConverter();
+ new
LoadTsFileDataTypeConverter(loadTsFileStatement.isGeneratedByPipe());
final TSStatus status =
loadTsFileStatement.isConvertOnTypeMismatch()
?
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement).orElse(null)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 0a46eefa4e0..72146c9d2de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -525,7 +525,7 @@ public class LoadTsFileScheduler implements IScheduler {
private void convertFailedTsFilesToTabletsAndRetry() {
final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
- new LoadTsFileDataTypeConverter();
+ new LoadTsFileDataTypeConverter(isGeneratedByPipe);
for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) {
final LoadSingleTsFileNode failedNode =
tsFileNodeList.get(failedLoadTsFileIndex);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 889ca2c202c..d1a346b14cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -47,6 +47,7 @@ public class LoadTsFileStatement extends Statement {
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
private boolean autoCreateDatabase = true;
+ private boolean isGeneratedByPipe = false;
private Map<String, String> loadAttributes;
@@ -168,6 +169,14 @@ public class LoadTsFileStatement extends Statement {
return autoCreateDatabase;
}
+ public void markIsGeneratedByPipe() {
+ isGeneratedByPipe = true;
+ }
+
+ public boolean isGeneratedByPipe() {
+ return isGeneratedByPipe;
+ }
+
public List<File> getTsFiles() {
return tsFiles;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 05fe606931e..ee2a8fe2547 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -39,28 +41,22 @@ public class LoadTsFileDataTypeConverter {
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
- private final LoadTreeStatementDataTypeConvertExecutionVisitor
- treeStatementDataTypeConvertExecutionVisitor =
- new LoadTreeStatementDataTypeConvertExecutionVisitor(
- statement ->
- Coordinator.getInstance()
- .executeForTreeModel(
- statement,
- SESSION_MANAGER.requestQueryId(),
-
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
- "",
- ClusterPartitionFetcher.getInstance(),
- ClusterSchemaFetcher.getInstance(),
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
- false)
- .status);
-
public static final LoadConvertedInsertTabletStatementTSStatusVisitor
STATEMENT_STATUS_VISITOR =
new LoadConvertedInsertTabletStatementTSStatusVisitor();
public static final LoadConvertedInsertTabletStatementExceptionVisitor
STATEMENT_EXCEPTION_VISITOR = new
LoadConvertedInsertTabletStatementExceptionVisitor();
- public Optional<TSStatus> convertForTreeModel(LoadTsFileStatement
loadTsFileTreeStatement) {
+ private final boolean isGeneratedByPipe;
+
+ private final LoadTreeStatementDataTypeConvertExecutionVisitor
+ treeStatementDataTypeConvertExecutionVisitor =
+ new
LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
+
+ public LoadTsFileDataTypeConverter(final boolean isGeneratedByPipe) {
+ this.isGeneratedByPipe = isGeneratedByPipe;
+ }
+
+ public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement
loadTsFileTreeStatement) {
try {
return
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor,
null);
} catch (Exception e) {
@@ -71,6 +67,20 @@ public class LoadTsFileDataTypeConverter {
}
}
+ private TSStatus executeForTreeModel(final Statement statement) {
+ return Coordinator.getInstance()
+ .executeForTreeModel(
+ isGeneratedByPipe ? new PipeEnrichedStatement(statement) :
statement,
+ SESSION_MANAGER.requestQueryId(),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false)
+ .status;
+ }
+
public boolean isSuccessful(final TSStatus status) {
return status != null
&& (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()