This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-load-tsfile-script
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6cb065ef10287b31c0ef50983fac4736356c089
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 12 15:11:05 2024 +0800

    refactor
---
 .../queryengine/load/LoadTsFileMemoryManager.java  | 10 ++---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   | 17 +++++---
 .../plan/analyze/LoadTsfileAnalyzer.java           | 47 ++++++++++++----------
 .../plan/planner/LocalExecutionPlanner.java        | 16 +++++---
 4 files changed, 53 insertions(+), 37 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
index eff5f790bdb..f034a4627b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
@@ -64,12 +64,12 @@ public class LoadTsFileMemoryManager {
 
     throw new LoadRuntimeOutOfMemoryException(
         String.format(
-            "forceAllocate: failed to allocate memory from query engine after %d retries, "
-                + "total query memory %s, available memory for load %s bytes, "
-                + "used memory size %d bytes, requested memory size %d bytes",
+            "forceAllocate: failed to allocate memory from query engine after %s retries, "
+                + "total query memory %s bytes, current available memory for load %s bytes, "
+                + "current load used memory size %s bytes, load requested memory size %s bytes",
             MEMORY_ALLOCATE_MAX_RETRIES,
-            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(),
-            LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(),
+            QUERY_ENGINE_MEMORY_MANAGER.getAllocateMemoryForOperators(),
+            QUERY_ENGINE_MEMORY_MANAGER.getFreeMemoryForLoadTsFile(),
             usedMemorySizeInBytes.get(),
             sizeInBytes));
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 038514ee02f..e67de934eac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2825,12 +2825,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
   @Override
   public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
-    LoadTsfileAnalyzer loadTsfileAnalyzer =
-        new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher);
-    try {
+    try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
+        new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)) {
       return loadTsfileAnalyzer.analyzeFileByFile();
-    } finally {
-      loadTsfileAnalyzer.close();
+    } catch (final Exception e) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to execute load tsfile statement %s. Detail: %s",
+              loadTsFileStatement, e.getMessage());
+      logger.warn(exceptionMessage, e);
+      final Analysis analysis = new Analysis();
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
+      return analysis;
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 91ca66fc0d5..41690bd67c9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -94,7 +93,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class LoadTsfileAnalyzer {
+public class LoadTsfileAnalyzer implements AutoCloseable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsfileAnalyzer.class);
 
@@ -139,9 +138,10 @@ public class LoadTsfileAnalyzer {
   public Analysis analyzeFileByFile() {
     context.setQueryType(QueryType.WRITE);
 
+    Analysis analysis = new Analysis();
+
     // check if the system is read only
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
-      Analysis analysis = new Analysis();
       analysis.setFinishQueryAfterAnalyze(true);
       analysis.setFailStatus(
           RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY, LoadReadOnlyException.MESSAGE));
@@ -171,25 +171,17 @@ public class LoadTsfileAnalyzer {
               "Load - Analysis Stage: {}/{} tsfiles have been analyzed, progress: {}%",
               i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum));
         }
-      } catch (IllegalArgumentException e) {
-        LOGGER.warn(
-            "Parse file {} to resource error, this TsFile maybe empty.", tsFile.getPath(), e);
-        throw new SemanticException(
-            String.format("TsFile %s is empty or incomplete.", tsFile.getPath()));
       } catch (AuthException e) {
         return createFailAnalysisForAuthException(e);
-      } catch (BufferUnderflowException e) {
-        LOGGER.warn(
-            "The file {} is not a valid tsfile. Please check the input file.", tsFile.getPath(), e);
-        throw new SemanticException(
-            String.format(
-                "The file %s is not a valid tsfile. Please check the input file.",
-                tsFile.getPath()));
       } catch (Exception e) {
-        LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e);
-        throw new SemanticException(
+        final String exceptionMessage =
             String.format(
-                "Parse file %s to resource error, because %s", tsFile.getPath(), e.getMessage()));
+                "The file %s is not a valid tsfile. Please check the input file. Detail: %s",
+                tsFile.getPath(), e.getMessage());
+        LOGGER.warn(exceptionMessage, e);
+        analysis.setFinishQueryAfterAnalyze(true);
+        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
+        return analysis;
       }
     }
 
@@ -197,16 +189,30 @@ public class LoadTsfileAnalyzer {
       schemaAutoCreatorAndVerifier.flush();
     } catch (AuthException e) {
       return createFailAnalysisForAuthException(e);
+    } catch (Exception e) {
+      final String exceptionMessage =
+          String.format(
+              "Auto create or verify schema error when executing statement %s. Detail: %s.",
+              loadTsFileStatement, e.getMessage());
+      LOGGER.warn(exceptionMessage, e);
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(
+              TSStatusCode.LOAD_FILE_ERROR,
+              String.format(
+                  "Auto create or verify schema error when executing statement %s.",
+                  loadTsFileStatement)));
+      return analysis;
     }
 
     LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
 
     // data partition will be queried in the scheduler
-    final Analysis analysis = new Analysis();
     analysis.setStatement(loadTsFileStatement);
     return analysis;
   }
 
+  @Override
   public void close() {
     schemaAutoCreatorAndVerifier.close();
   }
@@ -360,8 +366,7 @@ public class LoadTsfileAnalyzer {
      * This can only be invoked after all timeseries in the current tsfile have been processed.
      * Otherwise, the isAligned status may be wrong.
      */
-    public void flushAndClearDeviceIsAlignedCacheIfNecessary()
-        throws SemanticException, AuthException {
+    public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws SemanticException {
       // avoid OOM when loading a tsfile with too many timeseries
       // or loading too many tsfiles at the same time
       schemaCache.clearDeviceIsAlignedCacheIfNecessary();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 4789834f35e..4db63ed1d49 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -52,12 +52,12 @@ public class LocalExecutionPlanner {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class);
   private static final long ALLOCATE_MEMORY_FOR_OPERATORS;
-  private static final long MAX_REST_MEMORY_FOR_LOAD;
+  private static final long MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
 
   static {
     IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
     ALLOCATE_MEMORY_FOR_OPERATORS = CONFIG.getAllocateMemoryForOperators();
-    MAX_REST_MEMORY_FOR_LOAD =
+    MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD =
         (long)
             ((ALLOCATE_MEMORY_FOR_OPERATORS) * (1.0 - CONFIG.getMaxAllocateMemoryRatioForLoad()));
   }
@@ -69,6 +69,10 @@ public class LocalExecutionPlanner {
     return freeMemoryForOperators;
   }
 
+  public long getFreeMemoryForLoadTsFile() {
+    return freeMemoryForOperators - MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
+  }
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -211,7 +215,7 @@ public class LocalExecutionPlanner {
   }
 
   public synchronized boolean forceAllocateFreeMemoryForOperators(long memoryInBytes) {
-    if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
+    if (freeMemoryForOperators - memoryInBytes <= MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) {
       return false;
     } else {
       freeMemoryForOperators -= memoryInBytes;
@@ -220,9 +224,9 @@ public class LocalExecutionPlanner {
   }
 
   public synchronized long tryAllocateFreeMemoryForOperators(long memoryInBytes) {
-    if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
-      long result = freeMemoryForOperators - MAX_REST_MEMORY_FOR_LOAD;
-      freeMemoryForOperators = MAX_REST_MEMORY_FOR_LOAD;
+    if (freeMemoryForOperators - memoryInBytes <= MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) {
+      long result = freeMemoryForOperators - MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
+      freeMemoryForOperators = MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
       return result;
     } else {
       freeMemoryForOperators -= memoryInBytes;

Reply via email to