This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-load-tsfile-script
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e6cb065ef10287b31c0ef50983fac4736356c089 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jun 12 15:11:05 2024 +0800 refactor --- .../queryengine/load/LoadTsFileMemoryManager.java | 10 ++--- .../queryengine/plan/analyze/AnalyzeVisitor.java | 17 +++++--- .../plan/analyze/LoadTsfileAnalyzer.java | 47 ++++++++++++---------- .../plan/planner/LocalExecutionPlanner.java | 16 +++++--- 4 files changed, 53 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java index eff5f790bdb..f034a4627b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java @@ -64,12 +64,12 @@ public class LoadTsFileMemoryManager { throw new LoadRuntimeOutOfMemoryException( String.format( - "forceAllocate: failed to allocate memory from query engine after %d retries, " - + "total query memory %s, available memory for load %s bytes, " - + "used memory size %d bytes, requested memory size %d bytes", + "forceAllocate: failed to allocate memory from query engine after %s retries, " + + "total query memory %s bytes, current available memory for load %s bytes, " + + "current load used memory size %s bytes, load requested memory size %s bytes", MEMORY_ALLOCATE_MAX_RETRIES, - IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(), - LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(), + QUERY_ENGINE_MEMORY_MANAGER.getAllocateMemoryForOperators(), + QUERY_ENGINE_MEMORY_MANAGER.getFreeMemoryForLoadTsFile(), usedMemorySizeInBytes.get(), sizeInBytes)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 038514ee02f..e67de934eac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2825,12 +2825,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> @Override public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { - LoadTsfileAnalyzer loadTsfileAnalyzer = - new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher); - try { + try (final LoadTsfileAnalyzer loadTsfileAnalyzer = + new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)) { return loadTsfileAnalyzer.analyzeFileByFile(); - } finally { - loadTsfileAnalyzer.close(); + } catch (final Exception e) { + final String exceptionMessage = + String.format( + "Failed to execute load tsfile statement %s. 
Detail: %s", + loadTsFileStatement, e.getMessage()); + logger.warn(exceptionMessage, e); + final Analysis analysis = new Analysis(); + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage)); + return analysis; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java index 91ca66fc0d5..41690bd67c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java @@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.nio.BufferUnderflowException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -94,7 +93,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -public class LoadTsfileAnalyzer { +public class LoadTsfileAnalyzer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsfileAnalyzer.class); @@ -139,9 +138,10 @@ public class LoadTsfileAnalyzer { public Analysis analyzeFileByFile() { context.setQueryType(QueryType.WRITE); + Analysis analysis = new Analysis(); + // check if the system is read only if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { - Analysis analysis = new Analysis(); analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus( RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY, LoadReadOnlyException.MESSAGE)); @@ -171,25 +171,17 @@ public class LoadTsfileAnalyzer { "Load - Analysis Stage: {}/{} tsfiles have been analyzed, progress: {}%", i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum)); } - } catch (IllegalArgumentException e) { - LOGGER.warn( - 
"Parse file {} to resource error, this TsFile maybe empty.", tsFile.getPath(), e); - throw new SemanticException( - String.format("TsFile %s is empty or incomplete.", tsFile.getPath())); } catch (AuthException e) { return createFailAnalysisForAuthException(e); - } catch (BufferUnderflowException e) { - LOGGER.warn( - "The file {} is not a valid tsfile. Please check the input file.", tsFile.getPath(), e); - throw new SemanticException( - String.format( - "The file %s is not a valid tsfile. Please check the input file.", - tsFile.getPath())); } catch (Exception e) { - LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e); - throw new SemanticException( + final String exceptionMessage = String.format( - "Parse file %s to resource error, because %s", tsFile.getPath(), e.getMessage())); + "The file %s is not a valid tsfile. Please check the input file. Detail: %s", + tsFile.getPath(), e.getMessage()); + LOGGER.warn(exceptionMessage, e); + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage)); + return analysis; } } @@ -197,16 +189,30 @@ public class LoadTsfileAnalyzer { schemaAutoCreatorAndVerifier.flush(); } catch (AuthException e) { return createFailAnalysisForAuthException(e); + } catch (Exception e) { + final String exceptionMessage = + String.format( + "Auto create or verify schema error when executing statement %s. 
Detail: %s.", + loadTsFileStatement, e.getMessage()); + LOGGER.warn(exceptionMessage, e); + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus( + RpcUtils.getStatus( + TSStatusCode.LOAD_FILE_ERROR, + String.format( + "Auto create or verify schema error when executing statement %s.", + loadTsFileStatement))); + return analysis; } LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed."); // data partition will be queried in the scheduler - final Analysis analysis = new Analysis(); analysis.setStatement(loadTsFileStatement); return analysis; } + @Override public void close() { schemaAutoCreatorAndVerifier.close(); } @@ -360,8 +366,7 @@ public class LoadTsfileAnalyzer { * This can only be invoked after all timeseries in the current tsfile have been processed. * Otherwise, the isAligned status may be wrong. */ - public void flushAndClearDeviceIsAlignedCacheIfNecessary() - throws SemanticException, AuthException { + public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws SemanticException { // avoid OOM when loading a tsfile with too many timeseries // or loading too many tsfiles at the same time schemaCache.clearDeviceIsAlignedCacheIfNecessary(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 4789834f35e..4db63ed1d49 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -52,12 +52,12 @@ public class LocalExecutionPlanner { private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class); private static final long ALLOCATE_MEMORY_FOR_OPERATORS; - private static final long MAX_REST_MEMORY_FOR_LOAD; + private static final long 
MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD; static { IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); ALLOCATE_MEMORY_FOR_OPERATORS = CONFIG.getAllocateMemoryForOperators(); - MAX_REST_MEMORY_FOR_LOAD = + MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD = (long) ((ALLOCATE_MEMORY_FOR_OPERATORS) * (1.0 - CONFIG.getMaxAllocateMemoryRatioForLoad())); } @@ -69,6 +69,10 @@ public class LocalExecutionPlanner { return freeMemoryForOperators; } + public long getFreeMemoryForLoadTsFile() { + return freeMemoryForOperators - MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD; + } + public static LocalExecutionPlanner getInstance() { return InstanceHolder.INSTANCE; } @@ -211,7 +215,7 @@ public class LocalExecutionPlanner { } public synchronized boolean forceAllocateFreeMemoryForOperators(long memoryInBytes) { - if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) { + if (freeMemoryForOperators - memoryInBytes <= MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) { return false; } else { freeMemoryForOperators -= memoryInBytes; @@ -220,9 +224,9 @@ public class LocalExecutionPlanner { } public synchronized long tryAllocateFreeMemoryForOperators(long memoryInBytes) { - if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) { - long result = freeMemoryForOperators - MAX_REST_MEMORY_FOR_LOAD; - freeMemoryForOperators = MAX_REST_MEMORY_FOR_LOAD; + if (freeMemoryForOperators - memoryInBytes <= MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) { + long result = freeMemoryForOperators - MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD; + freeMemoryForOperators = MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD; return result; } else { freeMemoryForOperators -= memoryInBytes;
