This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d05a851ef5 Limit concurrent load tsfile type conversions (#18014)
4d05a851ef5 is described below

commit 4d05a851ef51cddfaa1270c7e1419bfe6f56ce70
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 11:00:03 2026 +0800

    Limit concurrent load tsfile type conversions (#18014)
    
    * Limit concurrent load tsfile type conversions
    
    * Keep load conversion retryable on memory pressure
    
    * Use i18n message for active load retry log
    
    * Add active load retry coverage for temporary unavailable
    
    * Fix load conversion memory test compile
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |  25 +++
 .../it/env/remote/config/RemoteDataNodeConfig.java |  17 ++
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   8 +
 .../iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java  | 226 +++++++++++++++++++++
 .../iotdb/db/i18n/StorageEngineMessages.java       |   4 +
 .../iotdb/db/i18n/StorageEngineMessages.java       |   4 +
 .../active/ActiveLoadFailedMessageHandler.java     |  17 ++
 .../load/active/ActiveLoadTsFileLoader.java        |   2 +-
 ...ertedInsertTabletStatementExceptionVisitor.java |   9 +-
 ...ertedInsertTabletStatementExceptionVisitor.java |   9 +-
 .../converter/LoadTsFileDataTypeConverter.java     | 112 ++++++++++
 .../active/ActiveLoadFailedMessageHandlerTest.java |  51 +++++
 .../load/active/ActiveLoadTsFileLoaderTest.java    | 152 ++++++++++++++
 ...atementDataTypeConvertExecutionVisitorTest.java |  19 ++
 14 files changed, 646 insertions(+), 9 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 4caef4cc1f9..4d86b481cbc 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -89,6 +89,31 @@ public class MppDataNodeConfig extends MppBaseConfig 
implements DataNodeConfig {
     return this;
   }
 
+  @Override
+  public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double 
maxAllocateMemoryRatioForLoad) {
+    properties.setProperty(
+        "max_allocate_memory_ratio_for_load", 
String.valueOf(maxAllocateMemoryRatioForLoad));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+    properties.setProperty(
+        "load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
+        String.valueOf(loadTsFileTabletConversionBatchMemorySizeInBytes));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    properties.setProperty(
+        "load_active_listening_check_interval_seconds",
+        String.valueOf(loadActiveListeningCheckIntervalSeconds));
+    return this;
+  }
+
   @Override
   public DataNodeConfig setCompactionScheduleInterval(long 
compactionScheduleInterval) {
     properties.setProperty(
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index aa73d962dba..1142054d132 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -54,6 +54,23 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
     return this;
   }
 
+  @Override
+  public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double 
maxAllocateMemoryRatioForLoad) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    return this;
+  }
+
   @Override
   public DataNodeConfig setCompactionScheduleInterval(long 
compactionScheduleInterval) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 01a6114c206..1df0d54b211 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -36,6 +36,14 @@ public interface DataNodeConfig {
   DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
       long loadTsFileAnalyzeSchemaMemorySizeInBytes);
 
+  DataNodeConfig setMaxAllocateMemoryRatioForLoad(double 
maxAllocateMemoryRatioForLoad);
+
+  DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes);
+
+  DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds);
+
   DataNodeConfig setCompactionScheduleInterval(long 
compactionScheduleInterval);
 
   DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
new file mode 100644
index 00000000000..3827615fe24
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.it.utils.TsFileGenerator;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLoadTsFileActiveRetryIT {
+
+  private static final String DATABASE = "root.sg.test_0";
+  private static final String DEVICE = DATABASE + ".d_0";
+  private static final String MEASUREMENT = "sensor_00";
+  private static final long 
UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES =
+      Long.MAX_VALUE / 4;
+  private static final MeasurementSchema TSFILE_SCHEMA =
+      new MeasurementSchema(MEASUREMENT, TSDataType.INT32, TSEncoding.RLE);
+
+  private File tmpDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(Files.createTempDirectory("load-active-retry").toUri());
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeConfig()
+        .setMaxAllocateMemoryRatioForLoad(1.0)
+        .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(1)
+        .setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+            UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES)
+        .setLoadActiveListeningCheckIntervalSeconds(1);
+
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("delete database " + DATABASE);
+    } catch (final Exception ignored) {
+      // ignore cleanup failure
+    } finally {
+      EnvFactory.getEnv().cleanClusterEnvironment();
+      deleteRecursively(tmpDir);
+    }
+  }
+
+  @Test
+  public void testActiveLoadTemporaryUnavailableShouldKeepFileForRetry() 
throws Exception {
+    final DataNodeWrapper dataNodeWrapper = 
EnvFactory.getEnv().getDataNodeWrapper(0);
+    final File retryTsFile = new File(tmpDir, "1-0-0-0.tsfile");
+    final File permanentFailureTsFile = new File(tmpDir, "2-0-0-0.tsfile");
+    generateTsFile(retryTsFile);
+    generateTsFile(permanentFailureTsFile);
+
+    try (final Connection connection =
+            
EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper);
+        final Statement statement = connection.createStatement()) {
+      statement.execute("create database " + DATABASE);
+      statement.execute(
+          String.format(
+              "create timeseries %s.%s %s", DEVICE, MEASUREMENT, 
TSDataType.INT64.name()));
+
+      statement.execute(
+          String.format(
+              "load \"%s\" with ('database-level'='3', 'async'='true', 
'on-success'='none', "
+                  + "'convert-on-type-mismatch'='true')",
+              retryTsFile.getAbsolutePath()));
+      statement.execute(
+          String.format(
+              "load \"%s\" with ('database-level'='3', 'async'='true', 
'on-success'='none', "
+                  + "'convert-on-type-mismatch'='false')",
+              permanentFailureTsFile.getAbsolutePath()));
+
+      final File activeDir = getActiveLoadDir(dataNodeWrapper);
+      final File failDir = getActiveLoadFailDir(dataNodeWrapper);
+      final File activeTsFile = waitForFile(activeDir, retryTsFile.getName(), 
30_000L);
+
+      Assert.assertNotNull(
+          "Async load should copy tsfile into active load directory", 
activeTsFile);
+
+      Assert.assertNotNull(
+          "Permanent active load failure should be moved to fail dir",
+          waitForFile(failDir, permanentFailureTsFile.getName(), 
TimeUnit.SECONDS.toMillis(60)));
+
+      assertFileKeptForRetry(
+          activeDir, failDir, retryTsFile.getName(), 
TimeUnit.SECONDS.toMillis(12));
+    }
+  }
+
+  private void generateTsFile(final File tsFile) throws Exception {
+    try (final TsFileGenerator generator = new TsFileGenerator(tsFile)) {
+      generator.registerTimeseries(DEVICE, 
Collections.singletonList(TSFILE_SCHEMA));
+      generator.generateData(DEVICE, 10, 1, false);
+    }
+  }
+
+  private File getActiveLoadDir(final DataNodeWrapper dataNodeWrapper) {
+    return new File(
+        dataNodeWrapper.getNodePath()
+            + File.separator
+            + "ext"
+            + File.separator
+            + "load"
+            + File.separator
+            + "pending");
+  }
+
+  private File getActiveLoadFailDir(final DataNodeWrapper dataNodeWrapper) {
+    return new File(
+        dataNodeWrapper.getNodePath()
+            + File.separator
+            + "ext"
+            + File.separator
+            + "load"
+            + File.separator
+            + "failed");
+  }
+
+  private File waitForFile(final File root, final String fileName, final long 
timeoutMs)
+      throws InterruptedException {
+    final long deadline = System.currentTimeMillis() + timeoutMs;
+    while (System.currentTimeMillis() < deadline) {
+      final File file = findFile(root, fileName);
+      if (file != null) {
+        return file;
+      }
+      Thread.sleep(500L);
+    }
+    return null;
+  }
+
+  private boolean containsFile(final File root, final String fileName) {
+    return findFile(root, fileName) != null;
+  }
+
+  private void assertFileKeptForRetry(
+      final File activeDir, final File failDir, final String fileName, final 
long observationMs)
+      throws InterruptedException {
+    final long deadline = System.currentTimeMillis() + observationMs;
+    while (System.currentTimeMillis() < deadline) {
+      Assert.assertTrue(
+          "Temporary unavailable active load should keep tsfile for retry",
+          containsFile(activeDir, fileName));
+      Assert.assertFalse(
+          "Temporary unavailable active load must not move tsfile to fail dir",
+          containsFile(failDir, fileName));
+      Thread.sleep(500L);
+    }
+  }
+
+  private File findFile(final File root, final String fileName) {
+    if (root == null || !root.exists()) {
+      return null;
+    }
+    if (root.isFile()) {
+      return root.getName().equals(fileName) ? root : null;
+    }
+
+    final File[] children = root.listFiles();
+    if (children == null) {
+      return null;
+    }
+    for (final File child : children) {
+      final File file = findFile(child, fileName);
+      if (file != null) {
+        return file;
+      }
+    }
+    return null;
+  }
+
+  private void deleteRecursively(final File file) {
+    if (file == null || !file.exists()) {
+      return;
+    }
+    final File[] children = file.listFiles();
+    if (children != null) {
+      for (final File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    Assert.assertTrue(file.delete());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
index cb90fac0b06..cc708e7e48c 100644
--- 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@ public final class StorageEngineMessages {
   public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "Release Data 
Cache Memory Block {}";
   public static final String START_DATA_TYPE_CONVERSION_DOT = "Start data type 
conversion for LoadTsFileStatement: {}.";
   public static final String START_DATA_TYPE_CONVERSION = "Start data type 
conversion for LoadTsFileStatement: {}";
+  public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT =
+      "Interrupted while waiting for tablet conversion slot: ";
   public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "Fail to load 
tsfile to Active dir";
   public static final String FAIL_TO_LOAD_DISK_SPACE = "Fail to load disk 
space of file {}";
   public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "Load active 
listening dir is not set.";
@@ -521,6 +523,8 @@ public final class StorageEngineMessages {
   public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "Error occurred 
when executing active load periodical job.";
   public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active load 
periodical jobs executor is started successfully.";
   public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active load 
periodical jobs executor is stopped successfully.";
+  public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE =
+      "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to temporary 
unavailability, will retry later. Status: {}";
   public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "Error occurred 
during moving file {} to fail directory.";
   public static final String FAILED_COUNT_FILES_IN_FAIL_DIR = "Failed to count 
failed files in fail directory.";
 
diff --git 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
index e488c40067e..792909a97e0 100644
--- 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@ public final class StorageEngineMessages {
   public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "释放数据缓存内存块 {}";
   public static final String START_DATA_TYPE_CONVERSION_DOT = "开始对 
LoadTsFileStatement: {} 进行数据类型转换。";
   public static final String START_DATA_TYPE_CONVERSION = "开始对 
LoadTsFileStatement: {} 进行数据类型转换";
+  public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT =
+      "等待 tablet 转换槽位时被中断: ";
   public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "加载 TsFile 到 
Active 目录失败";
   public static final String FAIL_TO_LOAD_DISK_SPACE = "获取文件 {} 的磁盘空间失败";
   public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "未设置加载 Active 
监听目录。";
@@ -521,6 +523,8 @@ public final class StorageEngineMessages {
   public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "执行 Active 
加载定期任务时发生错误。";
   public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active 
加载定期任务执行器已成功启动。";
   public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active 
加载定期任务执行器已成功停止。";
+  public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE =
+      "拒绝自动加载 TsFile {} (isGeneratedByPipe = {}),原因是系统暂时不可用,将稍后重试。状态: {}";
   public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "将文件 {} 
移动到失败目录时发生错误。";
   public static final String FAILED_COUNT_FILES_IN_FAIL_DIR = 
"统计失败目录中的失败文件数量失败。";
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
index 92a1d45702b..4879e4f634e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.db.storageengine.load.active;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.i18n.StorageEngineMessages;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +99,20 @@ public class ActiveLoadFailedMessageHandler {
     void handle(final ActiveLoadPendingQueue.ActiveLoadEntry entry);
   }
 
+  public static boolean isStatusShouldRetry(
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus 
status) {
+    if (status != null
+        && status.getCode() == 
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+      LOGGER.info(
+          StorageEngineMessages.ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE,
+          entry.getFile(),
+          entry.isGeneratedByPipe(),
+          status);
+      return true;
+    }
+    return isExceptionMessageShouldRetry(entry, status == null ? null : 
status.getMessage());
+  }
+
   public static boolean isExceptionMessageShouldRetry(
       final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String 
message) {
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 61e297a9e86..3f0696499eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -294,7 +294,7 @@ public class ActiveLoadTsFileLoader {
 
   private void handleLoadFailure(
       final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus 
status) {
-    if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, 
status.getMessage())) {
+    if (!ActiveLoadFailedMessageHandler.isStatusShouldRetry(entry, status)) {
       LOGGER.warn(
           "Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. 
File will be moved to fail directory.",
           entry.getFile(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
index 28dfafcbd41..ff538cb41da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Node;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,6 +31,9 @@ public class 
LoadTableConvertedInsertTabletStatementExceptionVisitor
     implements AstVisitor<TSStatus, Exception> {
   @Override
   public TSStatus visitNode(final Node node, final Exception context) {
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+    }
     if (context instanceof AccessDeniedException) {
       return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
           .setMessage(context.getMessage());
@@ -42,9 +44,8 @@ public class 
LoadTableConvertedInsertTabletStatementExceptionVisitor
 
   @Override
   public TSStatus visitLoadTsFile(final LoadTsFile loadTsFile, final Exception 
context) {
-    if (context instanceof LoadRuntimeOutOfMemoryException) {
-      return new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
     } else if (context instanceof SemanticException) {
       return new 
TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
index 92445518f08..ce465f8843e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.storageengine.load.converter;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
@@ -33,6 +32,9 @@ public class 
LoadTreeConvertedInsertTabletStatementExceptionVisitor
 
   @Override
   public TSStatus visitNode(final StatementNode node, final Exception context) 
{
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+    }
     if (context instanceof AccessDeniedException) {
       return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
           .setMessage(context.getMessage());
@@ -44,9 +46,8 @@ public class 
LoadTreeConvertedInsertTabletStatementExceptionVisitor
   @Override
   public TSStatus visitLoadFile(
       final LoadTsFileStatement loadTsFileStatement, final Exception context) {
-    if (context instanceof LoadRuntimeOutOfMemoryException) {
-      return new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
     } else if (context instanceof SemanticException) {
       return new 
TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 3aab4aac290..b8dc0e5d110 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -21,9 +21,14 @@ package org.apache.iotdb.db.storageengine.load.converter;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.queryengine.common.SqlDialect;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.i18n.StorageEngineMessages;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -46,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 
 public class LoadTsFileDataTypeConverter {
 
@@ -53,6 +59,89 @@ public class LoadTsFileDataTypeConverter {
 
   private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
 
+  private static Semaphore getTabletConversionSemaphore() {
+    return TabletConversionSemaphoreHolder.INSTANCE;
+  }
+
+  private static int getTabletConversionPermitCount() {
+    final int configuredThreadCount =
+        Math.max(
+            1,
+            
IoTDBDescriptor.getInstance().getConfig().getLoadTsFileTabletConversionThreadCount());
+    if (!PipeConfig.getInstance().getPipeMemoryManagementEnabled()) {
+      return configuredThreadCount;
+    }
+    final long memorySafePermitCount =
+        getAllowedPipeTabletMemorySizeInBytes() / 
estimatePipeTabletMemorySizePerConversion();
+    return (int) Math.max(1, Math.min((long) configuredThreadCount, 
memorySafePermitCount));
+  }
+
+  private static long estimatePipeTabletMemorySizePerConversion() {
+    final PipeConfig pipeConfig = PipeConfig.getInstance();
+    final long tabletSize =
+        Math.max(
+            1L, 
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
+    final long maxReaderChunkSize = Math.max(0L, 
pipeConfig.getPipeMaxReaderChunkSize());
+    final long tableSize =
+        Math.max(
+            1L,
+            Math.min(tabletSize, 
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()));
+
+    // Both conversion paths keep source/converted tablet data and the current 
reader chunk in
+    // memory. Table-model conversion may additionally keep source/converted 
table-sized batches, so
+    // use the larger estimate as the per-conversion working set.
+    final long treeParserMemorySize = 2L * tabletSize + maxReaderChunkSize;
+    final long tableParserMemorySize = 2L * tabletSize + 2L * tableSize + 
maxReaderChunkSize;
+    return Math.max(1L, Math.max(treeParserMemorySize, tableParserMemorySize));
+  }
+
+  private static long getAllowedPipeTabletMemorySizeInBytes() {
+    final PipeConfig pipeConfig = PipeConfig.getInstance();
+    return (long)
+        
((pipeConfig.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()
+                + 
pipeConfig.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold() / 2)
+            * 
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+  }
+
+  private static Optional<TSStatus> getInterruptedConversionStatus(final 
InterruptedException e) {
+    Thread.currentThread().interrupt();
+    return Optional.of(
+        new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage(
+                
StorageEngineMessages.INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT + 
e.getMessage()));
+  }
+
+  public static boolean isMemoryPressureException(final Throwable throwable) {
+    Throwable current = throwable;
+    while (current != null) {
+      if (current instanceof LoadRuntimeOutOfMemoryException
+          || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        return true;
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
+  public static TSStatus getMemoryPressureStatus(final Throwable throwable) {
+    Throwable current = throwable;
+    while (current != null) {
+      if (current instanceof LoadRuntimeOutOfMemoryException
+          || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        return new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage(current.getMessage());
+      }
+      current = current.getCause();
+    }
+
+    return new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+        .setMessage(throwable == null ? null : throwable.getMessage());
+  }
+
+  private static class TabletConversionSemaphoreHolder {
+    private static final Semaphore INSTANCE = new 
Semaphore(getTabletConversionPermitCount());
+  }
+
   public static final LoadConvertedInsertTabletStatementTSStatusVisitor 
STATEMENT_STATUS_VISITOR =
       new LoadConvertedInsertTabletStatementTSStatusVisitor();
   public static final LoadTreeConvertedInsertTabletStatementExceptionVisitor
@@ -83,16 +172,28 @@ public class LoadTsFileDataTypeConverter {
   }
 
   public Optional<TSStatus> convertForTableModel(final LoadTsFile 
loadTsFileTableStatement) {
+    boolean isPermitAcquired = false;
     try {
+      getTabletConversionSemaphore().acquire();
+      isPermitAcquired = true;
       return loadTsFileTableStatement.accept(
           tableStatementDataTypeConvertExecutionVisitor, 
loadTsFileTableStatement.getDatabase());
+    } catch (final InterruptedException e) {
+      return getInterruptedConversionStatus(e);
     } catch (Exception e) {
+      if (isMemoryPressureException(e)) {
+        return Optional.of(getMemoryPressureStatus(e));
+      }
       LOGGER.warn(
           "Failed to convert data types for table model statement {}.",
           loadTsFileTableStatement,
           e);
       return Optional.of(
           new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+    } finally {
+      if (isPermitAcquired) {
+        getTabletConversionSemaphore().release();
+      }
     }
   }
 
@@ -136,14 +237,25 @@ public class LoadTsFileDataTypeConverter {
 
   public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement 
loadTsFileTreeStatement) {
     DataNodeSchemaLockManager.getInstance().releaseReadLock(context);
+    boolean isPermitAcquired = false;
     try {
+      getTabletConversionSemaphore().acquire();
+      isPermitAcquired = true;
       return 
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, 
null);
+    } catch (final InterruptedException e) {
+      return getInterruptedConversionStatus(e);
     } catch (Exception e) {
+      if (isMemoryPressureException(e)) {
+        return Optional.of(getMemoryPressureStatus(e));
+      }
       LOGGER.warn(
           "Failed to convert data types for tree model statement {}.", 
loadTsFileTreeStatement, e);
       return Optional.of(
           new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
     } finally {
+      if (isPermitAcquired) {
+        getTabletConversionSemaphore().release();
+      }
       DataNodeSchemaLockManager.getInstance()
           .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TREE);
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
new file mode 100644
index 00000000000..67c909c1c02
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ActiveLoadFailedMessageHandlerTest {
+
+  @Test
+  public void testTemporaryUnavailableStatusShouldRetryWithoutMessageMatch() {
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", 
true, false);
+
+    Assert.assertTrue(
+        ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+            entry,
+            new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                .setMessage("temporarily unavailable")));
+  }
+
+  @Test
+  public void testPermanentStatusShouldNotRetryWithoutMessageMatch() {
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", 
true, false);
+
+    Assert.assertFalse(
+        ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+            entry, new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("bad")));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
new file mode 100644
index 00000000000..3208dc16ef5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+public class ActiveLoadTsFileLoaderTest {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private File tempDir;
+  private String originalFailDir;
+  private NodeStatus originalNodeStatus;
+
+  @Before
+  public void setUp() throws Exception {
+    tempDir = Files.createTempDirectory("active-load-retry").toFile();
+    originalFailDir = config.getLoadActiveListeningFailDir();
+    originalNodeStatus = 
CommonDescriptor.getInstance().getConfig().getNodeStatus();
+    
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
+    config.setLoadActiveListeningFailDir(new File(tempDir, 
"failed").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    config.setLoadActiveListeningFailDir(originalFailDir);
+    
CommonDescriptor.getInstance().getConfig().setNodeStatus(originalNodeStatus);
+    deleteRecursively(tempDir);
+  }
+
+  @Test
+  public void testTemporaryUnavailableStatusDoesNotMoveFileToFailDir() throws 
Exception {
+    final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+    final File tsFile = createTsFileWithCompanionFiles("retry.tsfile");
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry(
+            tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, false);
+
+    invokeInitFailDirIfNecessary(loader);
+    invokeHandleLoadFailure(
+        loader,
+        entry,
+        new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage("load conversion is temporarily unavailable"));
+
+    Assert.assertTrue(tsFile.exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + 
ModificationFileV1.FILE_SUFFIX).exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX).exists());
+    Assert.assertFalse(new File(config.getLoadActiveListeningFailDir(), 
tsFile.getName()).exists());
+  }
+
+  @Test
+  public void testPermanentFailureStatusMovesFileToFailDir() throws Exception {
+    final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+    final File tsFile = createTsFileWithCompanionFiles("failed.tsfile");
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry(
+            tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, false);
+
+    invokeInitFailDirIfNecessary(loader);
+    invokeHandleLoadFailure(
+        loader,
+        entry,
+        new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("permanent 
error"));
+
+    final File failDir = new File(config.getLoadActiveListeningFailDir());
+    Assert.assertFalse(tsFile.exists());
+    Assert.assertTrue(new File(failDir, tsFile.getName()).exists());
+    Assert.assertTrue(
+        new File(failDir, tsFile.getName() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+    Assert.assertTrue(
+        new File(failDir, tsFile.getName() + 
ModificationFileV1.FILE_SUFFIX).exists());
+    Assert.assertTrue(new File(failDir, tsFile.getName() + 
ModificationFile.FILE_SUFFIX).exists());
+  }
+
+  private File createTsFileWithCompanionFiles(final String fileName) throws 
Exception {
+    final File tsFile = new File(tempDir, fileName);
+    Assert.assertTrue(tsFile.createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + 
ModificationFileV1.FILE_SUFFIX).createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX).createNewFile());
+    return tsFile;
+  }
+
+  private void invokeInitFailDirIfNecessary(final ActiveLoadTsFileLoader 
loader) throws Exception {
+    final Method method = 
ActiveLoadTsFileLoader.class.getDeclaredMethod("initFailDirIfNecessary");
+    method.setAccessible(true);
+    method.invoke(loader);
+  }
+
+  private void invokeHandleLoadFailure(
+      final ActiveLoadTsFileLoader loader,
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry,
+      final TSStatus status)
+      throws Exception {
+    final Method method =
+        ActiveLoadTsFileLoader.class.getDeclaredMethod(
+            "handleLoadFailure", ActiveLoadPendingQueue.ActiveLoadEntry.class, 
TSStatus.class);
+    method.setAccessible(true);
+    method.invoke(loader, entry, status);
+  }
+
+  private static void deleteRecursively(final File file) {
+    if (file == null || !file.exists()) {
+      return;
+    }
+    final File[] children = file.listFiles();
+    if (children != null) {
+      for (final File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    Assert.assertTrue(file.delete());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
index 32b15a1859d..29766f5e5eb 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.load.converter;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
@@ -209,6 +210,24 @@ public class 
LoadTreeStatementDataTypeConvertExecutionVisitorTest {
         pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) < 
ROW_COUNT_PER_DEVICE);
   }
 
+  @Test
+  public void testPipeOutOfMemoryIsTemporaryUnavailable() throws Exception {
+    tsFile = File.createTempFile("oom", ".tsfile");
+
+    final LoadTreeConvertedInsertTabletStatementExceptionVisitor visitor =
+        new LoadTreeConvertedInsertTabletStatementExceptionVisitor();
+    final TSStatus status =
+        visitor.visitLoadFile(
+            LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+            new IllegalStateException(
+                "wrapped memory pressure",
+                new PipeRuntimeOutOfMemoryCriticalException("pipe tablet 
memory is not enough")));
+
+    Assert.assertEquals(
+        TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), 
status.getCode());
+    Assert.assertNotEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), 
status.getCode());
+  }
+
   private void writeTsFile(final File file) throws Exception {
     if (file.exists()) {
       Assert.assertTrue(file.delete());


Reply via email to