This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new d5feeb39316 [To dev/1.3] Load: Fix excessive GC caused by loading too 
many TsFiles at once (#16853) (#16867)
d5feeb39316 is described below

commit d5feeb3931632d6275133c02e40c596ca32058dd
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Dec 10 16:02:31 2025 +0800

    [To dev/1.3] Load: Fix excessive GC caused by loading too many TsFiles at 
once (#16853) (#16867)
    
    * Fix excessive GC caused by loading too many TsFiles at once
    
    When loading multiple TsFiles, all file resources were loaded into memory
    simultaneously, causing excessive memory consumption and frequent GC pauses.
    
    This commit introduces batch execution for multi-file loading scenarios:
    
    1. Split LoadTsFileStatement/LoadTsFile into sub-statements, each handling
       one TsFile, to avoid loading all file resources at once
    2. Refactor duplicate code in ClientRPCServiceImpl by extracting helper
       methods for both tree model and table model
    3. Add progress logging to track the loading status of each file
    4. Support both synchronous and asynchronous loading modes
    
    Changes:
    - Added getSubStatement() method to LoadTsFileStatement and LoadTsFile
      for splitting multi-file statements
    - Extracted shouldSplitLoadTsFileStatement() and 
shouldSplitTableLoadTsFile()
      to determine if splitting is needed
    - Extracted executeBatchLoadTsFile() and executeBatchTableLoadTsFile()
      to handle batch execution with progress logging
    - Applied the optimization to 4 execution paths (tree/table model,
      sync/async loading)
    
    This fix significantly reduces memory pressure and improves system
    stability when loading large numbers of TsFiles.
    
    # Conflicts:
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
    
    * fix
    
    # Conflicts:
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogOperation.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
    
    * update
    
    * update
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  55 +++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  24 ++++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 159 ++++++++++++++++++---
 .../db/queryengine/plan/statement/Statement.java   |  23 +++
 .../plan/statement/crud/LoadTsFileStatement.java   |  44 ++++++
 5 files changed, 284 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a9d83590e9a..deec50ecb18 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1173,6 +1173,21 @@ public class IoTDBConfig {
 
   private int loadTsFileSpiltPartitionMaxSize = 10;
 
+  /**
+   * The threshold for splitting statement when loading multiple TsFiles. When 
the number of TsFiles
+   * exceeds this threshold, the statement will be split into multiple 
sub-statements for batch
+   * execution to limit resource consumption during statement analysis. 
Default value is 10, which
+   * means splitting will occur when there are more than 10 files.
+   */
+  private int loadTsFileStatementSplitThreshold = 10;
+
+  /**
+   * The number of TsFiles that each sub-statement handles when splitting a 
statement. This
+   * parameter controls how many files are grouped together in each 
sub-statement during batch
+   * execution. Default value is 10, which means each sub-statement handles 10 
files.
+   */
+  private int loadTsFileSubStatementBatchSize = 10;
+
   private String[] loadActiveListeningDirs =
       new String[] {
         IoTDBConstant.EXT_FOLDER_NAME
@@ -4232,6 +4247,46 @@ public class IoTDBConfig {
     this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize;
   }
 
+  public int getLoadTsFileStatementSplitThreshold() {
+    return loadTsFileStatementSplitThreshold;
+  }
+
+  public void setLoadTsFileStatementSplitThreshold(final int 
loadTsFileStatementSplitThreshold) {
+    if (loadTsFileStatementSplitThreshold < 0) {
+      logger.warn(
+          "Invalid loadTsFileStatementSplitThreshold value: {}. Using default 
value: 10",
+          loadTsFileStatementSplitThreshold);
+      return;
+    }
+    if (this.loadTsFileStatementSplitThreshold != 
loadTsFileStatementSplitThreshold) {
+      logger.info(
+          "loadTsFileStatementSplitThreshold changed from {} to {}",
+          this.loadTsFileStatementSplitThreshold,
+          loadTsFileStatementSplitThreshold);
+    }
+    this.loadTsFileStatementSplitThreshold = loadTsFileStatementSplitThreshold;
+  }
+
+  public int getLoadTsFileSubStatementBatchSize() {
+    return loadTsFileSubStatementBatchSize;
+  }
+
+  public void setLoadTsFileSubStatementBatchSize(final int 
loadTsFileSubStatementBatchSize) {
+    if (loadTsFileSubStatementBatchSize <= 0) {
+      logger.warn(
+          "Invalid loadTsFileSubStatementBatchSize value: {}. Using default 
value: 10",
+          loadTsFileSubStatementBatchSize);
+      return;
+    }
+    if (this.loadTsFileSubStatementBatchSize != 
loadTsFileSubStatementBatchSize) {
+      logger.info(
+          "loadTsFileSubStatementBatchSize changed from {} to {}",
+          this.loadTsFileSubStatementBatchSize,
+          loadTsFileSubStatementBatchSize);
+    }
+    this.loadTsFileSubStatementBatchSize = loadTsFileSubStatementBatchSize;
+  }
+
   public String[] getPipeReceiverFileDirs() {
     return (Objects.isNull(this.pipeReceiverFileDirs) || 
this.pipeReceiverFileDirs.length == 0)
         ? new String[] {systemDir + File.separator + "pipe" + File.separator + 
"receiver"}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f6511600745..c755381e88d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2552,6 +2552,18 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "cache_last_values_memory_budget_in_byte",
                 String.valueOf(conf.getCacheLastValuesMemoryBudgetInByte()))));
+
+    conf.setLoadTsFileStatementSplitThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_tsfile_statement_split_threshold",
+                
Integer.toString(conf.getLoadTsFileStatementSplitThreshold()))));
+
+    conf.setLoadTsFileSubStatementBatchSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_tsfile_sub_statement_batch_size",
+                Integer.toString(conf.getLoadTsFileSubStatementBatchSize()))));
   }
 
   private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws 
IOException {
@@ -2599,6 +2611,18 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_tsfile_split_partition_max_size",
                 Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
+
+    conf.setLoadTsFileStatementSplitThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_tsfile_statement_split_threshold",
+                
Integer.toString(conf.getLoadTsFileStatementSplitThreshold()))));
+
+    conf.setLoadTsFileSubStatementBatchSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_tsfile_sub_statement_batch_size",
+                Integer.toString(conf.getLoadTsFileSubStatementBatchSize()))));
   }
 
   private void loadPipeHotModifiedProp(TrimProperties properties) throws 
IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 93c15f80390..09c2b1d6ac8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -313,17 +313,32 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       }
 
       queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
-      // create and cache dataset
-      ExecutionResult result =
-          COORDINATOR.executeForTreeModel(
-              s,
-              queryId,
-              SESSION_MANAGER.getSessionInfo(clientSession),
-              statement,
-              partitionFetcher,
-              schemaFetcher,
-              req.getTimeout(),
-              true);
+
+      // Split statement if needed to limit resource consumption during 
statement analysis
+      ExecutionResult result;
+      if (s.shouldSplit()) {
+        result =
+            executeBatchStatement(
+                s,
+                queryId,
+                SESSION_MANAGER.getSessionInfo(clientSession),
+                statement,
+                partitionFetcher,
+                schemaFetcher,
+                config.getQueryTimeoutThreshold(),
+                true);
+      } else {
+        result =
+            COORDINATOR.executeForTreeModel(
+                s,
+                queryId,
+                SESSION_MANAGER.getSessionInfo(clientSession),
+                statement,
+                partitionFetcher,
+                schemaFetcher,
+                req.getTimeout(),
+                true);
+      }
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -1669,16 +1684,32 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           long queryId = SESSION_MANAGER.requestQueryId();
           type = s.getType() == null ? null : s.getType().name();
           // create and cache dataset
-          ExecutionResult result =
-              COORDINATOR.executeForTreeModel(
-                  s,
-                  queryId,
-                  SESSION_MANAGER.getSessionInfo(clientSession),
-                  statement,
-                  partitionFetcher,
-                  schemaFetcher,
-                  config.getQueryTimeoutThreshold(),
-                  false);
+
+          // Split statement if needed to limit resource consumption during 
statement analysis
+          ExecutionResult result;
+          if (s.shouldSplit()) {
+            result =
+                executeBatchStatement(
+                    s,
+                    queryId,
+                    SESSION_MANAGER.getSessionInfo(clientSession),
+                    statement,
+                    partitionFetcher,
+                    schemaFetcher,
+                    config.getQueryTimeoutThreshold(),
+                    false);
+          } else {
+            result =
+                COORDINATOR.executeForTreeModel(
+                    s,
+                    queryId,
+                    SESSION_MANAGER.getSessionInfo(clientSession),
+                    statement,
+                    partitionFetcher,
+                    schemaFetcher,
+                    config.getQueryTimeoutThreshold(),
+                    false);
+          }
           results.add(result.status);
         } catch (Exception e) {
           LOGGER.warn("Error occurred when executing executeBatchStatement: ", 
e);
@@ -2902,4 +2933,90 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     PipeDataNodeAgent.receiver().legacy().handleClientExit();
     SubscriptionAgent.receiver().handleClientExit();
   }
+
+  /**
+   * Executes tree-model Statement sub-statements in batch.
+   *
+   * @param statement the Statement to be executed
+   * @param queryId the query ID
+   * @param sessionInfo the session information
+   * @param statementStr the SQL statement string
+   * @param partitionFetcher the partition fetcher
+   * @param schemaFetcher the schema fetcher
+   * @param timeoutMs the timeout in milliseconds
+   * @return the execution result
+   */
+  private ExecutionResult executeBatchStatement(
+      final Statement statement,
+      final long queryId,
+      final SessionInfo sessionInfo,
+      final String statementStr,
+      final IPartitionFetcher partitionFetcher,
+      final ISchemaFetcher schemaFetcher,
+      final long timeoutMs,
+      final boolean userQuery) {
+
+    ExecutionResult result = null;
+    final List<? extends Statement> subStatements = 
statement.getSubStatements();
+    final int totalSubStatements = subStatements.size();
+
+    LOGGER.info(
+        "Start batch executing {} sub-statement(s) in tree model, queryId: {}",
+        totalSubStatements,
+        queryId);
+
+    for (int i = 0; i < totalSubStatements; i++) {
+      final Statement subStatement = subStatements.get(i);
+
+      LOGGER.info(
+          "Executing sub-statement {}/{} in tree model, queryId: {}",
+          i + 1,
+          totalSubStatements,
+          queryId);
+
+      result =
+          COORDINATOR.executeForTreeModel(
+              subStatement,
+              queryId,
+              sessionInfo,
+              statementStr,
+              partitionFetcher,
+              schemaFetcher,
+              timeoutMs,
+              userQuery);
+
+      // Exit early if any sub-statement execution fails
+      if (result != null
+          && result.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        final int completed = i + 1;
+        final int remaining = totalSubStatements - completed;
+        final double percentage = (completed * 100.0) / totalSubStatements;
+        LOGGER.warn(
+            "Failed to execute sub-statement {}/{} in tree model, queryId: {}, 
completed: {}, remaining: {}, progress: {}%, error: {}",
+            i + 1,
+            totalSubStatements,
+            queryId,
+            completed,
+            remaining,
+            String.format("%.2f", percentage),
+            result.status.getMessage());
+        break;
+      }
+
+      LOGGER.info(
+          "Successfully executed sub-statement {}/{} in tree model, queryId: 
{}",
+          i + 1,
+          totalSubStatements,
+          queryId);
+    }
+
+    if (result != null && result.status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.info(
+          "Completed batch executing all {} sub-statement(s) in tree model, 
queryId: {}",
+          totalSubStatements,
+          queryId);
+    }
+
+    return result;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
index 57cd67229fa..fa29bc1b564 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -70,4 +71,26 @@ public abstract class Statement extends StatementNode {
   public String getPipeLoggingString() {
     return toString();
   }
+
+  /**
+   * Checks whether this statement should be split into multiple 
sub-statements based on the given
+   * async requirement. Used to limit resource consumption during statement 
analysis, etc.
+   *
+   * @param requireAsync whether async execution is required
+   * @return true if the statement should be split, false otherwise. Default 
implementation returns
+   *     false.
+   */
+  public boolean shouldSplit() {
+    return false;
+  }
+
+  /**
+   * Splits the current statement into multiple sub-statements. Used to limit 
resource consumption
+   * during statement analysis, etc.
+   *
+   * @return the list of sub-statements. Default implementation returns empty 
list.
+   */
+  public List<? extends Statement> getSubStatements() {
+    return Collections.emptyList();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 0b837968cde..3b10d3733e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -264,6 +264,50 @@ public class LoadTsFileStatement extends Statement {
     return tsFiles == null || tsFiles.isEmpty();
   }
 
+  @Override
+  public boolean shouldSplit() {
+    final int splitThreshold =
+        
IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold();
+    return tsFiles.size() > splitThreshold && !isAsyncLoad;
+  }
+
+  /**
+   * Splits the current LoadTsFileStatement into multiple sub-statements, each 
handling a batch of
+   * TsFiles. Used to limit resource consumption during statement analysis, 
etc.
+   *
+   * @return the list of sub-statements
+   */
+  @Override
+  public List<LoadTsFileStatement> getSubStatements() {
+    final int batchSize =
+        
IoTDBDescriptor.getInstance().getConfig().getLoadTsFileSubStatementBatchSize();
+    final int totalBatches = (tsFiles.size() + batchSize - 1) / batchSize; // 
Ceiling division
+    final List<LoadTsFileStatement> subStatements = new 
ArrayList<>(totalBatches);
+
+    for (int i = 0; i < tsFiles.size(); i += batchSize) {
+      final int endIndex = Math.min(i + batchSize, tsFiles.size());
+      final List<File> batchFiles = tsFiles.subList(i, endIndex);
+
+      final LoadTsFileStatement statement = new LoadTsFileStatement();
+      statement.databaseLevel = this.databaseLevel;
+      statement.verifySchema = this.verifySchema;
+      statement.deleteAfterLoad = this.deleteAfterLoad;
+      statement.convertOnTypeMismatch = this.convertOnTypeMismatch;
+      statement.tabletConversionThresholdBytes = 
this.tabletConversionThresholdBytes;
+      statement.autoCreateDatabase = this.autoCreateDatabase;
+      statement.isAsyncLoad = this.isAsyncLoad;
+      statement.isGeneratedByPipe = this.isGeneratedByPipe;
+
+      statement.tsFiles = new ArrayList<>(batchFiles);
+      statement.resources = new ArrayList<>(batchFiles.size());
+      statement.writePointCountList = new ArrayList<>(batchFiles.size());
+
+      subStatements.add(statement);
+    }
+
+    return subStatements;
+  }
+
   @Override
   public List<PartialPath> getPaths() {
     return Collections.emptyList();

Reply via email to