This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7fdccc916a4 Limit concurrent load tsfile type conversions (#18014) (#18030)
7fdccc916a4 is described below

commit 7fdccc916a463e1e1c945721a1d3e69f96d9bf88
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:21:51 2026 +0800

    Limit concurrent load tsfile type conversions (#18014) (#18030)
    
    * Limit concurrent load tsfile type conversions
    
    * Keep load conversion retryable on memory pressure
    
    * Use i18n message for active load retry log
    
    * Add active load retry coverage for temporary unavailable
    
    * Fix load conversion memory test compile
    
    (cherry picked from commit 4d05a851ef51cddfaa1270c7e1419bfe6f56ce70)
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |  25 +++
 .../it/env/remote/config/RemoteDataNodeConfig.java |  17 ++
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   8 +
 .../iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java  | 226 +++++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../active/ActiveLoadFailedMessageHandler.java     |  16 ++
 .../load/active/ActiveLoadTsFileLoader.java        |   2 +-
 ...ertedInsertTabletStatementExceptionVisitor.java |   9 +-
 .../converter/LoadTsFileDataTypeConverter.java     |  95 +++++++++
 .../active/ActiveLoadFailedMessageHandlerTest.java |  51 +++++
 .../load/active/ActiveLoadTsFileLoaderTest.java    | 146 +++++++++++++
 ...dInsertTabletStatementExceptionVisitorTest.java |  54 +++++
 13 files changed, 659 insertions(+), 5 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 9ac380503cd..c011cd994a2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -89,6 +89,31 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig {
     return this;
   }
 
+  @Override
+  public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad) {
+    properties.setProperty(
+        "max_allocate_memory_ratio_for_load", String.valueOf(maxAllocateMemoryRatioForLoad));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+    properties.setProperty(
+        "load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
+        String.valueOf(loadTsFileTabletConversionBatchMemorySizeInBytes));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    properties.setProperty(
+        "load_active_listening_check_interval_seconds",
+        String.valueOf(loadActiveListeningCheckIntervalSeconds));
+    return this;
+  }
+
   @Override
   public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
     setProperty("last_cache_operation_on_load", strategyName);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index b2550ae689f..46947add596 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -54,6 +54,23 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
     return this;
   }
 
+  @Override
+  public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    return this;
+  }
+
   @Override
   public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
     return this;
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index c27eb369c0d..9e4c62022bd 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -36,6 +36,17 @@ public interface DataNodeConfig {
   DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
       long loadTsFileAnalyzeSchemaMemorySizeInBytes);
 
+  /** Sets {@code max_allocate_memory_ratio_for_load}; may be a no-op. */
+  DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad);
+
+  /** Sets {@code load_tsfile_tablet_conversion_batch_memory_size_in_bytes}; may be a no-op. */
+  DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes);
+
+  /** Sets {@code load_active_listening_check_interval_seconds}; may be a no-op. */
+  DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds);
+
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
 
   DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
new file mode 100644
index 00000000000..3827615fe24
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.it.utils.TsFileGenerator;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies that an async (active) load whose type conversion cannot obtain its tablet-conversion
+ * memory is kept in the pending directory for retry, while a load that fails permanently (type
+ * mismatch with conversion disabled) is moved to the failed directory.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLoadTsFileActiveRetryIT {
+
+  private static final String DATABASE = "root.sg.test_0";
+  private static final String DEVICE = DATABASE + ".d_0";
+  private static final String MEASUREMENT = "sensor_00";
+  // Far larger than any DataNode memory pool, so tablet conversion can never get its batch memory.
+  private static final long UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES =
+      Long.MAX_VALUE / 4;
+  // Written as INT32 while the test creates the series as INT64, so loading requires conversion.
+  private static final MeasurementSchema TSFILE_SCHEMA =
+      new MeasurementSchema(MEASUREMENT, TSDataType.INT32, TSEncoding.RLE);
+
+  private File tmpDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = Files.createTempDirectory("load-active-retry").toFile();
+    EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeConfig()
+        .setMaxAllocateMemoryRatioForLoad(1.0)
+        .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(1)
+        .setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+            UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES)
+        .setLoadActiveListeningCheckIntervalSeconds(1);
+
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("delete database " + DATABASE);
+    } catch (final Exception ignored) {
+      // ignore cleanup failure
+    } finally {
+      EnvFactory.getEnv().cleanClusterEnvironment();
+      deleteRecursively(tmpDir);
+    }
+  }
+
+  @Test
+  public void testActiveLoadTemporaryUnavailableShouldKeepFileForRetry() throws Exception {
+    final DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+    final File retryTsFile = new File(tmpDir, "1-0-0-0.tsfile");
+    final File permanentFailureTsFile = new File(tmpDir, "2-0-0-0.tsfile");
+    generateTsFile(retryTsFile);
+    generateTsFile(permanentFailureTsFile);
+
+    try (final Connection connection =
+            EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper);
+        final Statement statement = connection.createStatement()) {
+      statement.execute("create database " + DATABASE);
+      statement.execute(
+          String.format(
+              "create timeseries %s.%s %s", DEVICE, MEASUREMENT, TSDataType.INT64.name()));
+
+      statement.execute(
+          String.format(
+              "load \"%s\" with ('database-level'='3', 'async'='true', 'on-success'='none', "
+                  + "'convert-on-type-mismatch'='true')",
+              retryTsFile.getAbsolutePath()));
+      statement.execute(
+          String.format(
+              "load \"%s\" with ('database-level'='3', 'async'='true', 'on-success'='none', "
+                  + "'convert-on-type-mismatch'='false')",
+              permanentFailureTsFile.getAbsolutePath()));
+
+      final File activeDir = getActiveLoadDir(dataNodeWrapper);
+      final File failDir = getActiveLoadFailDir(dataNodeWrapper);
+      final File activeTsFile = waitForFile(activeDir, retryTsFile.getName(), 30_000L);
+
+      Assert.assertNotNull(
+          "Async load should copy tsfile into active load directory", activeTsFile);
+
+      Assert.assertNotNull(
+          "Permanent active load failure should be moved to fail dir",
+          waitForFile(failDir, permanentFailureTsFile.getName(), TimeUnit.SECONDS.toMillis(60)));
+
+      assertFileKeptForRetry(
+          activeDir, failDir, retryTsFile.getName(), TimeUnit.SECONDS.toMillis(12));
+    }
+  }
+
+  private void generateTsFile(final File tsFile) throws Exception {
+    try (final TsFileGenerator generator = new TsFileGenerator(tsFile)) {
+      generator.registerTimeseries(DEVICE, Collections.singletonList(TSFILE_SCHEMA));
+      generator.generateData(DEVICE, 10, 1, false);
+    }
+  }
+
+  private File getActiveLoadDir(final DataNodeWrapper dataNodeWrapper) {
+    return new File(
+        dataNodeWrapper.getNodePath()
+            + File.separator
+            + "ext"
+            + File.separator
+            + "load"
+            + File.separator
+            + "pending");
+  }
+
+  private File getActiveLoadFailDir(final DataNodeWrapper dataNodeWrapper) {
+    return new File(
+        dataNodeWrapper.getNodePath()
+            + File.separator
+            + "ext"
+            + File.separator
+            + "load"
+            + File.separator
+            + "failed");
+  }
+
+  private File waitForFile(final File root, final String fileName, final long timeoutMs)
+      throws InterruptedException {
+    final long deadline = System.currentTimeMillis() + timeoutMs;
+    while (System.currentTimeMillis() < deadline) {
+      final File file = findFile(root, fileName);
+      if (file != null) {
+        return file;
+      }
+      Thread.sleep(500L);
+    }
+    return null;
+  }
+
+  private boolean containsFile(final File root, final String fileName) {
+    return findFile(root, fileName) != null;
+  }
+
+  private void assertFileKeptForRetry(
+      final File activeDir, final File failDir, final String fileName, final long observationMs)
+      throws InterruptedException {
+    final long deadline = System.currentTimeMillis() + observationMs;
+    while (System.currentTimeMillis() < deadline) {
+      Assert.assertTrue(
+          "Temporary unavailable active load should keep tsfile for retry",
+          containsFile(activeDir, fileName));
+      Assert.assertFalse(
+          "Temporary unavailable active load must not move tsfile to fail dir",
+          containsFile(failDir, fileName));
+      Thread.sleep(500L);
+    }
+  }
+
+  private File findFile(final File root, final String fileName) {
+    if (root == null || !root.exists()) {
+      return null;
+    }
+    if (root.isFile()) {
+      return root.getName().equals(fileName) ? root : null;
+    }
+
+    final File[] children = root.listFiles();
+    if (children == null) {
+      return null;
+    }
+    for (final File child : children) {
+      final File file = findFile(child, fileName);
+      if (file != null) {
+        return file;
+      }
+    }
+    return null;
+  }
+
+  private void deleteRecursively(final File file) {
+    if (file == null || !file.exists()) {
+      return;
+    }
+    final File[] children = file.listFiles();
+    if (children != null) {
+      for (final File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    try {
+      Files.deleteIfExists(file.toPath());
+    } catch (final IOException ignored) {
+      // Best-effort cleanup in tearDown: failing here would mask the test's own outcome.
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c92c9bba28a..a3130f49f34 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1158,6 +1158,9 @@ public class IoTDBConfig {
 
   private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
 
+  /** Upper bound on concurrent load tsfile tablet conversions; values below 1 are treated as 1. */
+  private int loadTsFileTabletConversionThreadCount = 5;
+
   private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
 
   private long loadMemoryAllocateRetryIntervalMs = 1000L;
@@ -4141,6 +4144,14 @@ public class IoTDBConfig {
         loadTsFileTabletConversionBatchMemorySizeInBytes;
   }
 
+  public int getLoadTsFileTabletConversionThreadCount() {
+    return loadTsFileTabletConversionThreadCount;
+  }
+
+  public void setLoadTsFileTabletConversionThreadCount(int loadTsFileTabletConversionThreadCount) {
+    this.loadTsFileTabletConversionThreadCount = loadTsFileTabletConversionThreadCount;
+  }
+
   public long getLoadChunkMetadataMemorySizeInBytes() {
     return loadChunkMetadataMemorySizeInBytes;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f9e87375b78..a838203a943 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2426,6 +2426,14 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
                 String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes()))));
+    // trim(): Properties keeps trailing whitespace in values, which Integer.parseInt rejects.
+    conf.setLoadTsFileTabletConversionThreadCount(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "load_tsfile_tablet_conversion_thread_count",
+                    String.valueOf(conf.getLoadTsFileTabletConversionThreadCount()))
+                .trim()));
     conf.setLoadChunkMetadataMemorySizeInBytes(
         Long.parseLong(
             Optional.ofNullable(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
index c2d9af94725..89039889d08 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.storageengine.load.active;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +98,27 @@ public class ActiveLoadFailedMessageHandler {
     void handle(final ActiveLoadPendingQueue.ActiveLoadEntry activeLoadEntry);
   }
 
+  /**
+   * Decides whether a failed active load should stay pending for a later retry instead of being
+   * moved to the fail directory.
+   *
+   * <p>A {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION} status is always retried, whatever its
+   * message; any other (or {@code null}) status is delegated to {@link
+   * #isExceptionMessageShouldRetry} using the status message.
+   */
+  public static boolean isStatusShouldRetry(
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) {
+    if (status != null
+        && status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+      LOGGER.info(
+          "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to temporary unavailability, will retry later. Status: {}",
+          entry.getFile(),
+          entry.isGeneratedByPipe(),
+          status);
+      return true;
+    }
+    return isExceptionMessageShouldRetry(entry, status == null ? null : status.getMessage());
+  }
+
   public static boolean isExceptionMessageShouldRetry(
       final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String message) {
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index ffb9e23bbfa..a9133efe4f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -280,7 +280,8 @@ public class ActiveLoadTsFileLoader {
 
   private void handleLoadFailure(
       final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) {
-    if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, status.getMessage())) {
+    // Decide by status code as well as message, so temporary-unavailable failures stay pending.
+    if (!ActiveLoadFailedMessageHandler.isStatusShouldRetry(entry, status)) {
       LOGGER.warn(
           "Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. File will be moved to fail directory.",
           entry.getFile(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
index f292ee55930..59ad6ebcd6c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.storageengine.load.converter;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -32,6 +31,10 @@ public class LoadConvertedInsertTabletStatementExceptionVisitor
 
   @Override
   public TSStatus visitNode(final StatementNode node, final Exception context) {
+    // Memory pressure is transient: report it as temporarily unavailable rather than a load error.
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+    }
     return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
         .setMessage(context.getMessage());
   }
@@ -39,9 +42,8 @@ public class LoadConvertedInsertTabletStatementExceptionVisitor
   @Override
   public TSStatus visitLoadFile(
       final LoadTsFileStatement loadTsFileStatement, final Exception context) {
-    if (context instanceof LoadRuntimeOutOfMemoryException) {
-      return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
     } else if (context instanceof SemanticException) {
       return new TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index e46e25c5f7f..be154ac8181 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -21,8 +21,12 @@ package org.apache.iotdb.db.storageengine.load.converter;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -42,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 
 public class LoadTsFileDataTypeConverter {
 
@@ -49,6 +54,118 @@ public class LoadTsFileDataTypeConverter {
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
+  /** Returns the process-wide semaphore that bounds concurrent tablet conversions. */
+  private static Semaphore getTabletConversionSemaphore() {
+    return TabletConversionSemaphoreHolder.INSTANCE;
+  }
+
+  /**
+   * Computes how many tablet conversions may run concurrently: the configured thread count (at
+   * least 1), additionally capped by how many estimated conversions fit into the pipe tablet
+   * memory budget when pipe memory management is enabled. Never returns less than 1.
+   */
+  private static int getTabletConversionPermitCount() {
+    final int configuredThreadCount =
+        Math.max(
+            1,
+            IoTDBDescriptor.getInstance().getConfig().getLoadTsFileTabletConversionThreadCount());
+    if (!PipeConfig.getInstance().getPipeMemoryManagementEnabled()) {
+      return configuredThreadCount;
+    }
+    final long memorySafePermitCount =
+        getAllowedPipeTabletMemorySizeInBytes() / estimatePipeTabletMemorySizePerConversion();
+    return (int) Math.max(1, Math.min((long) configuredThreadCount, memorySafePermitCount));
+  }
+
+  /**
+   * Heuristically estimates the pipe memory a single conversion needs as the larger of the tree-
+   * and table-parser estimates, derived from the tablet size, target chunk size and maximum reader
+   * chunk size. Always at least 1, so it is safe to use as a divisor.
+   */
+  private static long estimatePipeTabletMemorySizePerConversion() {
+    final PipeConfig pipeConfig = PipeConfig.getInstance();
+    final long tabletSize =
+        Math.max(
+            1L, IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
+    final long maxReaderChunkSize = Math.max(0L, pipeConfig.getPipeMaxReaderChunkSize());
+    final long tableSize =
+        Math.max(
+            1L,
+            Math.min(tabletSize, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()));
+
+    final long treeParserMemorySize = 2L * tabletSize + maxReaderChunkSize;
+    final long tableParserMemorySize = 2L * tabletSize + 2L * tableSize + maxReaderChunkSize;
+    return Math.max(1L, Math.max(treeParserMemorySize, tableParserMemorySize));
+  }
+
+  /**
+   * Returns the pipe memory budget, in bytes, against which concurrent conversions are sized: a
+   * fraction of the total non-floating pipe memory derived from the tablet and TsFile memory-block
+   * allocation reject thresholds.
+   */
+  private static long getAllowedPipeTabletMemorySizeInBytes() {
+    final PipeConfig pipeConfig = PipeConfig.getInstance();
+    // NOTE(review): by operator precedence the fraction is tabletThreshold + tsFileThreshold / 2;
+    // confirm this (rather than the average of the two thresholds) is the intended budget.
+    return (long)
+        ((pipeConfig.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()
+                + pipeConfig.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold() / 2)
+            * PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+  }
+
+  /**
+   * Restores the interrupt flag and reports the interrupted wait for a conversion permit as a
+   * retryable {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION} status.
+   */
+  private static Optional<TSStatus> getInterruptedConversionStatus(final InterruptedException e) {
+    Thread.currentThread().interrupt();
+    return Optional.of(
+        new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage("Interrupted while waiting for tablet conversion slot: " + e.getMessage()));
+  }
+
+  /**
+   * Returns whether {@code throwable} or any exception in its cause chain is a {@link
+   * LoadRuntimeOutOfMemoryException} or {@link PipeRuntimeOutOfMemoryCriticalException}.
+   */
+  public static boolean isMemoryPressureException(final Throwable throwable) {
+    return findMemoryPressureCause(throwable) != null;
+  }
+
+  /**
+   * Builds a {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION} status for a memory-pressure failure.
+   * The message comes from the first memory-pressure exception in the cause chain, falling back to
+   * {@code throwable}'s own message ({@code null} for a {@code null} throwable).
+   */
+  public static TSStatus getMemoryPressureStatus(final Throwable throwable) {
+    final Throwable memoryPressureCause = findMemoryPressureCause(throwable);
+    final Throwable messageSource = memoryPressureCause != null ? memoryPressureCause : throwable;
+    return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+        .setMessage(messageSource == null ? null : messageSource.getMessage());
+  }
+
+  /**
+   * Returns the first exception in the cause chain of {@code throwable} (itself included) that
+   * signals memory pressure, or {@code null} if there is none.
+   */
+  private static Throwable findMemoryPressureCause(final Throwable throwable) {
+    for (Throwable current = throwable; current != null; current = current.getCause()) {
+      if (current instanceof LoadRuntimeOutOfMemoryException
+          || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        return current;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Initialization-on-demand holder: the semaphore is created on first use and its permit count is
+   * computed only once, so later configuration changes do not resize it.
+   */
+  private static final class TabletConversionSemaphoreHolder {
+    private static final Semaphore INSTANCE = new Semaphore(getTabletConversionPermitCount());
+  }
+
   public static final LoadConvertedInsertTabletStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
       new LoadConvertedInsertTabletStatementTSStatusVisitor();
   public static final LoadConvertedInsertTabletStatementExceptionVisitor
@@ -69,14 +186,28 @@ public class LoadTsFileDataTypeConverter {
 
   public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement loadTsFileTreeStatement) {
     DataNodeSchemaLockManager.getInstance().releaseReadLock(context);
+    boolean isPermitAcquired = false;
     try {
+      // Blocks until a conversion permit is free; the schema read lock is released meanwhile and
+      // re-taken in the finally block.
+      getTabletConversionSemaphore().acquire();
+      isPermitAcquired = true;
       return loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, null);
+    } catch (final InterruptedException e) {
+      return getInterruptedConversionStatus(e);
     } catch (Exception e) {
+      if (isMemoryPressureException(e)) {
+        // Memory pressure is transient: report a retryable status instead of a load error.
+        return Optional.of(getMemoryPressureStatus(e));
+      }
       LOGGER.warn(
           "Failed to convert data types for load statement {}.", loadTsFileTreeStatement, e);
       return Optional.of(
           new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
     } finally {
+      if (isPermitAcquired) {
+        getTabletConversionSemaphore().release();
+      }
       DataNodeSchemaLockManager.getInstance()
           .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION);
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
new file mode 100644
index 00000000000..caf8a9f8d2b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests how {@link ActiveLoadFailedMessageHandler#isStatusShouldRetry} classifies statuses. */
+public class ActiveLoadFailedMessageHandlerTest {
+
+  @Test
+  public void testTemporaryUnavailableStatusShouldRetryWithoutMessageMatch() {
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", true);
+
+    Assert.assertTrue(
+        ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+            entry,
+            new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                .setMessage("temporarily unavailable")));
+  }
+
+  @Test
+  public void testTemporaryUnavailableStatusWithoutMessageShouldRetry() {
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", true);
+
+    Assert.assertTrue(
+        ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+            entry,
+            new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())));
+  }
+
+  @Test
+  public void testPermanentStatusShouldNotRetryWithoutMessageMatch() {
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", true);
+
+    Assert.assertFalse(
+        ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+            entry, new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("bad")));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
new file mode 100644
index 00000000000..f21748c3d3e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+public class ActiveLoadTsFileLoaderTest {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private File tempDir;
+  private String originalFailDir;
+  private NodeStatus originalNodeStatus;
+
+  @Before
+  public void setUp() throws Exception {
+    tempDir = Files.createTempDirectory("active-load-retry").toFile();
+    originalFailDir = config.getLoadActiveListeningFailDir();
+    originalNodeStatus = 
CommonDescriptor.getInstance().getConfig().getNodeStatus();
+    
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
+    config.setLoadActiveListeningFailDir(new File(tempDir, 
"failed").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    config.setLoadActiveListeningFailDir(originalFailDir);
+    
CommonDescriptor.getInstance().getConfig().setNodeStatus(originalNodeStatus);
+    deleteRecursively(tempDir);
+  }
+
+  @Test
+  public void testTemporaryUnavailableStatusDoesNotMoveFileToFailDir() throws 
Exception {
+    final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+    final File tsFile = createTsFileWithCompanionFiles("retry.tsfile");
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry(
+            tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false);
+
+    invokeInitFailDirIfNecessary(loader);
+    invokeHandleLoadFailure(
+        loader,
+        entry,
+        new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage("load conversion is temporarily unavailable"));
+
+    Assert.assertTrue(tsFile.exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX).exists());
+    Assert.assertFalse(new File(config.getLoadActiveListeningFailDir(), 
tsFile.getName()).exists());
+  }
+
+  @Test
+  public void testPermanentFailureStatusMovesFileToFailDir() throws Exception {
+    final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+    final File tsFile = createTsFileWithCompanionFiles("failed.tsfile");
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry(
+            tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false);
+
+    invokeInitFailDirIfNecessary(loader);
+    invokeHandleLoadFailure(
+        loader,
+        entry,
+        new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("permanent 
error"));
+
+    final File failDir = new File(config.getLoadActiveListeningFailDir());
+    Assert.assertFalse(tsFile.exists());
+    Assert.assertTrue(new File(failDir, tsFile.getName()).exists());
+    Assert.assertTrue(
+        new File(failDir, tsFile.getName() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+    Assert.assertTrue(new File(failDir, tsFile.getName() + 
ModificationFile.FILE_SUFFIX).exists());
+  }
+
+  private File createTsFileWithCompanionFiles(final String fileName) throws 
Exception {
+    final File tsFile = new File(tempDir, fileName);
+    Assert.assertTrue(tsFile.createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX).createNewFile());
+    return tsFile;
+  }
+
+  private void invokeInitFailDirIfNecessary(final ActiveLoadTsFileLoader 
loader) throws Exception {
+    final Method method = 
ActiveLoadTsFileLoader.class.getDeclaredMethod("initFailDirIfNecessary");
+    method.setAccessible(true);
+    method.invoke(loader);
+  }
+
+  private void invokeHandleLoadFailure(
+      final ActiveLoadTsFileLoader loader,
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry,
+      final TSStatus status)
+      throws Exception {
+    final Method method =
+        ActiveLoadTsFileLoader.class.getDeclaredMethod(
+            "handleLoadFailure", ActiveLoadPendingQueue.ActiveLoadEntry.class, 
TSStatus.class);
+    method.setAccessible(true);
+    method.invoke(loader, entry, status);
+  }
+
+  private static void deleteRecursively(final File file) {
+    if (file == null || !file.exists()) {
+      return;
+    }
+    final File[] children = file.listFiles();
+    if (children != null) {
+      for (final File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    Assert.assertTrue(file.delete());
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitorTest.java
new file mode 100644
index 00000000000..d97e30407ab
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class LoadConvertedInsertTabletStatementExceptionVisitorTest {
+
+  @Test
+  public void testPipeOutOfMemoryIsTemporaryUnavailable() throws Exception {
+    final File tsFile = File.createTempFile("oom", ".tsfile");
+    try {
+      final LoadConvertedInsertTabletStatementExceptionVisitor visitor =
+          new LoadConvertedInsertTabletStatementExceptionVisitor();
+      final TSStatus status =
+          visitor.visitLoadFile(
+              LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+              new IllegalStateException(
+                  "wrapped memory pressure",
+                  new PipeRuntimeOutOfMemoryCriticalException("pipe tablet 
memory is not enough")));
+
+      Assert.assertEquals(
+          TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), 
status.getCode());
+      Assert.assertNotEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), 
status.getCode());
+    } finally {
+      Assert.assertTrue(tsFile.delete());
+    }
+  }
+}


Reply via email to