This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch load-conversion-consider-is-generated-by-pipe-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7e94328a18d7249bef8fcccca5d7133419b41a22
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jan 8 15:02:29 2025 +0800

    Load: Consider isGeneratedByPipe mark when executing tsfile-tablet 
conversion in analysis stage (#14651)
    
    (cherry picked from commit 90cdf3bbc94224723ba00b0f92b0007d38a84000)
---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  6 +++
 .../plan/analyze/LoadTsFileAnalyzer.java           |  2 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  2 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |  9 +++++
 .../converter/LoadTsFileDataTypeConverter.java     | 44 +++++++++++++---------
 5 files changed, 44 insertions(+), 19 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 69a38b20a81..fefba280c4c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2983,6 +2983,12 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   @Override
   public Analysis visitPipeEnrichedStatement(
       PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
+    // The LoadTsFileStatement is a special case, it needs isGeneratedByPipe 
information
+    // in the analyzer to execute the tsfile-tablet conversion in some cases.
+    if (pipeEnrichedStatement.getInnerStatement() instanceof 
LoadTsFileStatement) {
+      ((LoadTsFileStatement) 
pipeEnrichedStatement.getInnerStatement()).markIsGeneratedByPipe();
+    }
+
     Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, 
context);
 
     // statement may be changed because of logical view
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 384d0e30ede..26e7146c7a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -229,7 +229,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   private void executeDataTypeConversionOnTypeMismatch(
       final Analysis analysis, final VerifyMetadataTypeMismatchException e) {
     final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
-        new LoadTsFileDataTypeConverter();
+        new 
LoadTsFileDataTypeConverter(loadTsFileStatement.isGeneratedByPipe());
     final TSStatus status =
         loadTsFileStatement.isConvertOnTypeMismatch()
             ? 
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement).orElse(null)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 0a46eefa4e0..72146c9d2de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -525,7 +525,7 @@ public class LoadTsFileScheduler implements IScheduler {
 
   private void convertFailedTsFilesToTabletsAndRetry() {
     final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
-        new LoadTsFileDataTypeConverter();
+        new LoadTsFileDataTypeConverter(isGeneratedByPipe);
 
     for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) {
       final LoadSingleTsFileNode failedNode = 
tsFileNodeList.get(failedLoadTsFileIndex);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 889ca2c202c..d1a346b14cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -47,6 +47,7 @@ public class LoadTsFileStatement extends Statement {
   private boolean deleteAfterLoad = false;
   private boolean convertOnTypeMismatch = true;
   private boolean autoCreateDatabase = true;
+  private boolean isGeneratedByPipe = false;
 
   private Map<String, String> loadAttributes;
 
@@ -168,6 +169,14 @@ public class LoadTsFileStatement extends Statement {
     return autoCreateDatabase;
   }
 
+  public void markIsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+  }
+
+  public boolean isGeneratedByPipe() {
+    return isGeneratedByPipe;
+  }
+
   public List<File> getTsFiles() {
     return tsFiles;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 05fe606931e..ee2a8fe2547 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -39,28 +41,22 @@ public class LoadTsFileDataTypeConverter {
 
   private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
 
-  private final LoadTreeStatementDataTypeConvertExecutionVisitor
-      treeStatementDataTypeConvertExecutionVisitor =
-          new LoadTreeStatementDataTypeConvertExecutionVisitor(
-              statement ->
-                  Coordinator.getInstance()
-                      .executeForTreeModel(
-                          statement,
-                          SESSION_MANAGER.requestQueryId(),
-                          
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
-                          "",
-                          ClusterPartitionFetcher.getInstance(),
-                          ClusterSchemaFetcher.getInstance(),
-                          
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
-                          false)
-                      .status);
-
   public static final LoadConvertedInsertTabletStatementTSStatusVisitor 
STATEMENT_STATUS_VISITOR =
       new LoadConvertedInsertTabletStatementTSStatusVisitor();
   public static final LoadConvertedInsertTabletStatementExceptionVisitor
       STATEMENT_EXCEPTION_VISITOR = new 
LoadConvertedInsertTabletStatementExceptionVisitor();
 
-  public Optional<TSStatus> convertForTreeModel(LoadTsFileStatement 
loadTsFileTreeStatement) {
+  private final boolean isGeneratedByPipe;
+
+  private final LoadTreeStatementDataTypeConvertExecutionVisitor
+      treeStatementDataTypeConvertExecutionVisitor =
+          new 
LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
+
+  public LoadTsFileDataTypeConverter(final boolean isGeneratedByPipe) {
+    this.isGeneratedByPipe = isGeneratedByPipe;
+  }
+
+  public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement 
loadTsFileTreeStatement) {
     try {
       return 
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, 
null);
     } catch (Exception e) {
@@ -71,6 +67,20 @@ public class LoadTsFileDataTypeConverter {
     }
   }
 
+  private TSStatus executeForTreeModel(final Statement statement) {
+    return Coordinator.getInstance()
+        .executeForTreeModel(
+            isGeneratedByPipe ? new PipeEnrichedStatement(statement) : 
statement,
+            SESSION_MANAGER.requestQueryId(),
+            SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+            "",
+            ClusterPartitionFetcher.getInstance(),
+            ClusterSchemaFetcher.getInstance(),
+            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+            false)
+        .status;
+  }
+
   public boolean isSuccessful(final TSStatus status) {
     return status != null
         && (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()

Reply via email to