This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8e1d0e36b4917a8825d81574386d2aac0ce7ed2f Author: Zhenyu Luo <[email protected]> AuthorDate: Tue Dec 9 14:16:40 2025 +0800 Load: Fix excessive GC caused by loading too many TsFiles at once (#16853) * Fix excessive GC caused by loading too many TsFiles at once When loading multiple TsFiles, all file resources were loaded into memory simultaneously, causing excessive memory consumption and frequent GC pauses. This commit introduces batch execution for multi-file loading scenarios: 1. Split LoadTsFileStatement/LoadTsFile into sub-statements, each handling a batch of TsFiles, to avoid loading all file resources at once 2. Refactor duplicate code in ClientRPCServiceImpl by extracting helper methods for both tree model and table model 3. Add progress logging to track the loading status of each sub-statement 4. Support both synchronous and asynchronous loading modes Changes: - Added getSubStatements() method to LoadTsFileStatement and LoadTsFile for splitting multi-file statements - Added shouldSplit() to both Statement base classes, overridden in LoadTsFileStatement and LoadTsFile, to determine if splitting is needed - Extracted executeBatchStatement() and executeBatchTableStatement() to handle batch execution with progress logging - Applied the optimization to 4 execution paths (tree/table model, sync/async loading) This fix significantly reduces memory pressure and improves system stability when loading large numbers of TsFiles. 
* fix * update (cherry picked from commit bc4f8e9bd8160e2a10745c7dd2fc5cdb8732de7a) --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 55 ++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 24 ++ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 319 ++++++++++++++++++--- .../plan/relational/sql/ast/LoadTsFile.java | 61 +++- .../plan/relational/sql/ast/Statement.java | 25 ++ .../db/queryengine/plan/statement/Statement.java | 23 ++ .../plan/statement/crud/LoadTsFileStatement.java | 48 ++++ 7 files changed, 511 insertions(+), 44 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 375285dfa2f..7591e9d3cab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1121,6 +1121,21 @@ public class IoTDBConfig { private int loadTsFileSpiltPartitionMaxSize = 10; + /** + * The threshold for splitting statement when loading multiple TsFiles. When the number of TsFiles + * exceeds this threshold, the statement will be split into multiple sub-statements for batch + * execution to limit resource consumption during statement analysis. Default value is 10, which + * means splitting will occur when there are more than 10 files. + */ + private int loadTsFileStatementSplitThreshold = 10; + + /** + * The number of TsFiles that each sub-statement handles when splitting a statement. This + * parameter controls how many files are grouped together in each sub-statement during batch + * execution. Default value is 10, which means each sub-statement handles 10 files. 
+ */ + private int loadTsFileSubStatementBatchSize = 10; + private String[] loadActiveListeningDirs = new String[] { IoTDBConstant.EXT_FOLDER_NAME @@ -4057,6 +4072,46 @@ public class IoTDBConfig { this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize; } + public int getLoadTsFileStatementSplitThreshold() { + return loadTsFileStatementSplitThreshold; + } + + public void setLoadTsFileStatementSplitThreshold(final int loadTsFileStatementSplitThreshold) { + if (loadTsFileStatementSplitThreshold < 0) { + logger.warn( + "Invalid loadTsFileStatementSplitThreshold value: {}. Using default value: 10", + loadTsFileStatementSplitThreshold); + return; + } + if (this.loadTsFileStatementSplitThreshold != loadTsFileStatementSplitThreshold) { + logger.info( + "loadTsFileStatementSplitThreshold changed from {} to {}", + this.loadTsFileStatementSplitThreshold, + loadTsFileStatementSplitThreshold); + } + this.loadTsFileStatementSplitThreshold = loadTsFileStatementSplitThreshold; + } + + public int getLoadTsFileSubStatementBatchSize() { + return loadTsFileSubStatementBatchSize; + } + + public void setLoadTsFileSubStatementBatchSize(final int loadTsFileSubStatementBatchSize) { + if (loadTsFileSubStatementBatchSize <= 0) { + logger.warn( + "Invalid loadTsFileSubStatementBatchSize value: {}. Using default value: 10", + loadTsFileSubStatementBatchSize); + return; + } + if (this.loadTsFileSubStatementBatchSize != loadTsFileSubStatementBatchSize) { + logger.info( + "loadTsFileSubStatementBatchSize changed from {} to {}", + this.loadTsFileSubStatementBatchSize, + loadTsFileSubStatementBatchSize); + } + this.loadTsFileSubStatementBatchSize = loadTsFileSubStatementBatchSize; + } + public String[] getPipeReceiverFileDirs() { return (Objects.isNull(this.pipeReceiverFileDirs) || this.pipeReceiverFileDirs.length == 0) ? 
new String[] {systemDir + File.separator + "pipe" + File.separator + "receiver"} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index c194f304e7f..bd579b24c3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2406,6 +2406,18 @@ public class IoTDBDescriptor { properties.getProperty( "skip_failed_table_schema_check", String.valueOf(conf.isSkipFailedTableSchemaCheck())))); + + conf.setLoadTsFileStatementSplitThreshold( + Integer.parseInt( + properties.getProperty( + "load_tsfile_statement_split_threshold", + Integer.toString(conf.getLoadTsFileStatementSplitThreshold())))); + + conf.setLoadTsFileSubStatementBatchSize( + Integer.parseInt( + properties.getProperty( + "load_tsfile_sub_statement_batch_size", + Integer.toString(conf.getLoadTsFileSubStatementBatchSize())))); } private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException { @@ -2454,6 +2466,18 @@ public class IoTDBDescriptor { "load_tsfile_split_partition_max_size", Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize())))); + conf.setLoadTsFileStatementSplitThreshold( + Integer.parseInt( + properties.getProperty( + "load_tsfile_statement_split_threshold", + Integer.toString(conf.getLoadTsFileStatementSplitThreshold())))); + + conf.setLoadTsFileSubStatementBatchSize( + Integer.parseInt( + properties.getProperty( + "load_tsfile_sub_statement_batch_size", + Integer.toString(conf.getLoadTsFileSubStatementBatchSize())))); + conf.setSkipFailedTableSchemaCheck( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 
ad16535bf57..07c2800799c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -365,16 +365,30 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + true); + } else { + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); + } } } else { org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s = @@ -396,17 +410,32 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - result = - COORDINATOR.executeForTableModel( - s, - relationSqlParser, - clientSession, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - metadata, - req.getTimeout(), - true); + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchTableStatement( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold(), + true); + } else { + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + 
clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + req.getTimeout(), + true); + } } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -1846,16 +1875,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(); type = s.getType() == null ? null : s.getType().name(); // create and cache dataset - result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); + + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); + } else { + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); + } } } else { @@ -1876,17 +1920,32 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(); - result = - COORDINATOR.executeForTableModel( - s, - relationSqlParser, - clientSession, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - metadata, - config.getQueryTimeoutThreshold(), - false); + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchTableStatement( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold(), + false); + } else { + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + clientSession, + queryId, + 
SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold(), + false); + } } results.add(result.status); @@ -3191,4 +3250,180 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { PipeDataNodeAgent.receiver().legacy().handleClientExit(); SubscriptionAgent.receiver().handleClientExit(); } + + /** + * Executes tree-model Statement sub-statements in batch. + * + * @param statement the Statement to be executed + * @param queryId the query ID + * @param sessionInfo the session information + * @param statementStr the SQL statement string + * @param partitionFetcher the partition fetcher + * @param schemaFetcher the schema fetcher + * @param timeoutMs the timeout in milliseconds + * @return the execution result + */ + private ExecutionResult executeBatchStatement( + final Statement statement, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final IPartitionFetcher partitionFetcher, + final ISchemaFetcher schemaFetcher, + final long timeoutMs, + final boolean userQuery) { + + ExecutionResult result = null; + final List<? 
extends Statement> subStatements = statement.getSubStatements(); + final int totalSubStatements = subStatements.size(); + + LOGGER.info( + "Start batch executing {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); + + for (int i = 0; i < totalSubStatements; i++) { + final Statement subStatement = subStatements.get(i); + + LOGGER.info( + "Executing sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + + result = + COORDINATOR.executeForTreeModel( + subStatement, + queryId, + sessionInfo, + statementStr, + partitionFetcher, + schemaFetcher, + timeoutMs, + userQuery); + + // Exit early if any sub-statement execution fails + if (result != null + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + final int completed = i + 1; + final int remaining = totalSubStatements - completed; + final double percentage = (completed * 100.0) / totalSubStatements; + LOGGER.warn( + "Failed to execute sub-statement {}/{} in tree model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", + i + 1, + totalSubStatements, + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); + break; + } + + LOGGER.info( + "Successfully executed sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } + + if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "Completed batch executing all {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); + } + + return result; + } + + /** + * Executes table-model Statement sub-statements in batch. 
+ * + * @param statement the Statement to be executed + * @param relationSqlParser the relational SQL parser + * @param clientSession the client session + * @param queryId the query ID + * @param sessionInfo the session information + * @param statementStr the SQL statement string + * @param metadata the metadata + * @param timeoutMs the timeout in milliseconds + * @return the execution result + */ + private ExecutionResult executeBatchTableStatement( + final org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, + final SqlParser relationSqlParser, + final IClientSession clientSession, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final Metadata metadata, + final long timeoutMs, + final boolean userQuery) { + + ExecutionResult result = null; + List<? extends org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement> + subStatements = statement.getSubStatements(); + int totalSubStatements = subStatements.size(); + LOGGER.info( + "Start batch executing {} sub-statement(s) in table model, queryId: {}", + totalSubStatements, + queryId); + + for (int i = 0; i < totalSubStatements; i++) { + final org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement subStatement = + subStatements.get(i); + + LOGGER.info( + "Executing sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + + result = + COORDINATOR.executeForTableModel( + subStatement, + relationSqlParser, + clientSession, + queryId, + sessionInfo, + statementStr, + metadata, + timeoutMs, + userQuery); + + // Exit early if any sub-statement execution fails + if (result != null + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + final int completed = i + 1; + final int remaining = totalSubStatements - completed; + final double percentage = (completed * 100.0) / totalSubStatements; + LOGGER.warn( + "Failed to execute sub-statement {}/{} in table model, queryId: {}, completed: 
{}, remaining: {}, progress: {}%, error: {}", + i + 1, + totalSubStatements, + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); + break; + } + + LOGGER.info( + "Successfully executed sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } + + if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "Completed batch executing all {} sub-statement(s) in table model, queryId: {}", + totalSubStatements, + queryId); + } + + return result; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 93fc8c7b583..166f06b85e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -37,7 +37,7 @@ import static java.util.Objects.requireNonNull; public class LoadTsFile extends Statement { - private final String filePath; + private String filePath; private int databaseLevel; // For loading to tree-model only private String database; // For loading to table-model only @@ -50,7 +50,7 @@ public class LoadTsFile extends Statement { private boolean isGeneratedByPipe = false; - private final Map<String, String> loadAttributes; + private Map<String, String> loadAttributes; private List<File> tsFiles; private List<TsFileResource> resources; @@ -232,6 +232,63 @@ public class LoadTsFile extends Statement { return tsFiles == null || tsFiles.isEmpty(); } + @Override + public boolean shouldSplit() { + final int splitThreshold = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); + return tsFiles.size() > splitThreshold && !isAsyncLoad; + } + + /** + * Splits 
the current LoadTsFile statement into multiple sub-statements, each handling a batch of + * TsFiles. Used to limit resource consumption during statement analysis, etc. + * + * @return the list of sub-statements + */ + @Override + public List<LoadTsFile> getSubStatements() { + final int batchSize = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileSubStatementBatchSize(); + final int totalBatches = (tsFiles.size() + batchSize - 1) / batchSize; // Ceiling division + final List<LoadTsFile> subStatements = new ArrayList<>(totalBatches); + + for (int i = 0; i < tsFiles.size(); i += batchSize) { + final int endIndex = Math.min(i + batchSize, tsFiles.size()); + final List<File> batchFiles = tsFiles.subList(i, endIndex); + + // Use the first file's path for the sub-statement + final String filePath = batchFiles.get(0).getAbsolutePath(); + final Map<String, String> properties = this.loadAttributes; + + final LoadTsFile subStatement = + new LoadTsFile(getLocation().orElse(null), filePath, properties); + + // Copy all configuration properties + subStatement.databaseLevel = this.databaseLevel; + subStatement.database = this.database; + subStatement.verify = this.verify; + subStatement.deleteAfterLoad = this.deleteAfterLoad; + subStatement.convertOnTypeMismatch = this.convertOnTypeMismatch; + subStatement.tabletConversionThresholdBytes = this.tabletConversionThresholdBytes; + subStatement.autoCreateDatabase = this.autoCreateDatabase; + subStatement.isAsyncLoad = this.isAsyncLoad; + subStatement.isGeneratedByPipe = this.isGeneratedByPipe; + + // Set all files in the batch + subStatement.tsFiles = new ArrayList<>(batchFiles); + subStatement.resources = new ArrayList<>(batchFiles.size()); + subStatement.writePointCountList = new ArrayList<>(batchFiles.size()); + subStatement.isTableModel = new ArrayList<>(batchFiles.size()); + for (int j = 0; j < batchFiles.size(); j++) { + subStatement.isTableModel.add(true); + } + + subStatements.add(subStatement); + } + + return 
subStatements; + } + @Override public <R, C> R accept(AstVisitor<R, C> visitor, C context) { return visitor.visitLoadTsFile(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java index 0352c85f1eb..7ba19b972a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java @@ -21,6 +21,9 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; + public abstract class Statement extends Node { protected Statement(final @Nullable NodeLocation location) { @@ -31,4 +34,26 @@ public abstract class Statement extends Node { public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) { return visitor.visitStatement(this, context); } + + /** + * Checks whether this statement should be split into multiple sub-statements for batch + * execution. Used to limit resource consumption during statement analysis, etc. Takes no + * arguments; overriding statements decide from their own state and configuration. + * + * @return true if the statement should be split, false otherwise. Default implementation returns + * false. + */ + public boolean shouldSplit() { + return false; + } + + /** + * Splits the current statement into multiple sub-statements. Used to limit resource consumption + * during statement analysis, etc. + * + * @return the list of sub-statements. Default implementation returns empty list. + */ + public List<? 
extends Statement> getSubStatements() { + return Collections.emptyList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java index 5b31f08ca67..0b2ecff6b5b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor; +import java.util.Collections; import java.util.List; /** @@ -68,4 +69,26 @@ public abstract class Statement extends StatementNode { public String getPipeLoggingString() { return toString(); } + + /** + * Checks whether this statement should be split into multiple sub-statements for batch + * execution. Used to limit resource consumption during statement analysis, etc. Takes no + * arguments; overriding statements decide from their own state and configuration. + * + * @return true if the statement should be split, false otherwise. Default implementation returns + * false. + */ + public boolean shouldSplit() { + return false; + } + + /** + * Splits the current statement into multiple sub-statements. Used to limit resource consumption + * during statement analysis, etc. + * + * @return the list of sub-statements. Default implementation returns empty list. + */ + public List<? 
extends Statement> getSubStatements() { + return Collections.emptyList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index d1dff1bb9cf..a51dcaf09d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -306,6 +306,54 @@ public class LoadTsFileStatement extends Statement { return tsFiles == null || tsFiles.isEmpty(); } + @Override + public boolean shouldSplit() { + final int splitThreshold = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); + return tsFiles.size() > splitThreshold && !isAsyncLoad; + } + + /** + * Splits the current LoadTsFileStatement into multiple sub-statements, each handling a batch of + * TsFiles. Used to limit resource consumption during statement analysis, etc. 
+ * + * @return the list of sub-statements + */ + @Override + public List<LoadTsFileStatement> getSubStatements() { + final int batchSize = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileSubStatementBatchSize(); + final int totalBatches = (tsFiles.size() + batchSize - 1) / batchSize; // Ceiling division + final List<LoadTsFileStatement> subStatements = new ArrayList<>(totalBatches); + + for (int i = 0; i < tsFiles.size(); i += batchSize) { + final int endIndex = Math.min(i + batchSize, tsFiles.size()); + final List<File> batchFiles = tsFiles.subList(i, endIndex); + + final LoadTsFileStatement statement = new LoadTsFileStatement(); + statement.databaseLevel = this.databaseLevel; + statement.verifySchema = this.verifySchema; + statement.deleteAfterLoad = this.deleteAfterLoad; + statement.convertOnTypeMismatch = this.convertOnTypeMismatch; + statement.tabletConversionThresholdBytes = this.tabletConversionThresholdBytes; + statement.autoCreateDatabase = this.autoCreateDatabase; + statement.isAsyncLoad = this.isAsyncLoad; + statement.isGeneratedByPipe = this.isGeneratedByPipe; + + statement.tsFiles = new ArrayList<>(batchFiles); + statement.resources = new ArrayList<>(batchFiles.size()); + statement.writePointCountList = new ArrayList<>(batchFiles.size()); + statement.isTableModel = new ArrayList<>(batchFiles.size()); + for (int j = 0; j < batchFiles.size(); j++) { + statement.isTableModel.add(false); + } + + subStatements.add(statement); + } + + return subStatements; + } + @Override public List<PartialPath> getPaths() { return Collections.emptyList();
