This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4d05a851ef5 Limit concurrent load tsfile type conversions (#18014)
4d05a851ef5 is described below
commit 4d05a851ef51cddfaa1270c7e1419bfe6f56ce70
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 11:00:03 2026 +0800
Limit concurrent load tsfile type conversions (#18014)
* Limit concurrent load tsfile type conversions
* Keep load conversion retryable on memory pressure
* Use i18n message for active load retry log
* Add active load retry coverage for temporary unavailable
* Fix load conversion memory test compile
---
.../it/env/cluster/config/MppDataNodeConfig.java | 25 +++
.../it/env/remote/config/RemoteDataNodeConfig.java | 17 ++
.../apache/iotdb/itbase/env/DataNodeConfig.java | 8 +
.../iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java | 226 +++++++++++++++++++++
.../iotdb/db/i18n/StorageEngineMessages.java | 4 +
.../iotdb/db/i18n/StorageEngineMessages.java | 4 +
.../active/ActiveLoadFailedMessageHandler.java | 17 ++
.../load/active/ActiveLoadTsFileLoader.java | 2 +-
...ertedInsertTabletStatementExceptionVisitor.java | 9 +-
...ertedInsertTabletStatementExceptionVisitor.java | 9 +-
.../converter/LoadTsFileDataTypeConverter.java | 112 ++++++++++
.../active/ActiveLoadFailedMessageHandlerTest.java | 51 +++++
.../load/active/ActiveLoadTsFileLoaderTest.java | 152 ++++++++++++++
...atementDataTypeConvertExecutionVisitorTest.java | 19 ++
14 files changed, 646 insertions(+), 9 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 4caef4cc1f9..4d86b481cbc 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -89,6 +89,31 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
return this;
}
+ @Override
+ public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double
maxAllocateMemoryRatioForLoad) {
+ properties.setProperty(
+ "max_allocate_memory_ratio_for_load",
String.valueOf(maxAllocateMemoryRatioForLoad));
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+ properties.setProperty(
+ "load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
+ String.valueOf(loadTsFileTabletConversionBatchMemorySizeInBytes));
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+ long loadActiveListeningCheckIntervalSeconds) {
+ properties.setProperty(
+ "load_active_listening_check_interval_seconds",
+ String.valueOf(loadActiveListeningCheckIntervalSeconds));
+ return this;
+ }
+
@Override
public DataNodeConfig setCompactionScheduleInterval(long
compactionScheduleInterval) {
properties.setProperty(
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index aa73d962dba..1142054d132 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -54,6 +54,23 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
return this;
}
+ @Override
+ public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double
maxAllocateMemoryRatioForLoad) {
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+ long loadActiveListeningCheckIntervalSeconds) {
+ return this;
+ }
+
@Override
public DataNodeConfig setCompactionScheduleInterval(long
compactionScheduleInterval) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 01a6114c206..1df0d54b211 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -36,6 +36,14 @@ public interface DataNodeConfig {
DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
long loadTsFileAnalyzeSchemaMemorySizeInBytes);
+ DataNodeConfig setMaxAllocateMemoryRatioForLoad(double
maxAllocateMemoryRatioForLoad);
+
+ DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ long loadTsFileTabletConversionBatchMemorySizeInBytes);
+
+ DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+ long loadActiveListeningCheckIntervalSeconds);
+
DataNodeConfig setCompactionScheduleInterval(long
compactionScheduleInterval);
DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
new file mode 100644
index 00000000000..3827615fe24
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.it.utils.TsFileGenerator;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLoadTsFileActiveRetryIT {
+
+ private static final String DATABASE = "root.sg.test_0";
+ private static final String DEVICE = DATABASE + ".d_0";
+ private static final String MEASUREMENT = "sensor_00";
+ private static final long
UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES =
+ Long.MAX_VALUE / 4;
+ private static final MeasurementSchema TSFILE_SCHEMA =
+ new MeasurementSchema(MEASUREMENT, TSDataType.INT32, TSEncoding.RLE);
+
+ private File tmpDir;
+
+ @Before
+ public void setUp() throws Exception {
+ tmpDir = new File(Files.createTempDirectory("load-active-retry").toUri());
+
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
+ EnvFactory.getEnv()
+ .getConfig()
+ .getDataNodeConfig()
+ .setMaxAllocateMemoryRatioForLoad(1.0)
+ .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(1)
+ .setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES)
+ .setLoadActiveListeningCheckIntervalSeconds(1);
+
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("delete database " + DATABASE);
+ } catch (final Exception ignored) {
+ // ignore cleanup failure
+ } finally {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ deleteRecursively(tmpDir);
+ }
+ }
+
+ @Test
+ public void testActiveLoadTemporaryUnavailableShouldKeepFileForRetry()
throws Exception {
+ final DataNodeWrapper dataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(0);
+ final File retryTsFile = new File(tmpDir, "1-0-0-0.tsfile");
+ final File permanentFailureTsFile = new File(tmpDir, "2-0-0-0.tsfile");
+ generateTsFile(retryTsFile);
+ generateTsFile(permanentFailureTsFile);
+
+ try (final Connection connection =
+
EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper);
+ final Statement statement = connection.createStatement()) {
+ statement.execute("create database " + DATABASE);
+ statement.execute(
+ String.format(
+ "create timeseries %s.%s %s", DEVICE, MEASUREMENT,
TSDataType.INT64.name()));
+
+ statement.execute(
+ String.format(
+ "load \"%s\" with ('database-level'='3', 'async'='true',
'on-success'='none', "
+ + "'convert-on-type-mismatch'='true')",
+ retryTsFile.getAbsolutePath()));
+ statement.execute(
+ String.format(
+ "load \"%s\" with ('database-level'='3', 'async'='true',
'on-success'='none', "
+ + "'convert-on-type-mismatch'='false')",
+ permanentFailureTsFile.getAbsolutePath()));
+
+ final File activeDir = getActiveLoadDir(dataNodeWrapper);
+ final File failDir = getActiveLoadFailDir(dataNodeWrapper);
+ final File activeTsFile = waitForFile(activeDir, retryTsFile.getName(),
30_000L);
+
+ Assert.assertNotNull(
+ "Async load should copy tsfile into active load directory",
activeTsFile);
+
+ Assert.assertNotNull(
+ "Permanent active load failure should be moved to fail dir",
+ waitForFile(failDir, permanentFailureTsFile.getName(),
TimeUnit.SECONDS.toMillis(60)));
+
+ assertFileKeptForRetry(
+ activeDir, failDir, retryTsFile.getName(),
TimeUnit.SECONDS.toMillis(12));
+ }
+ }
+
+ private void generateTsFile(final File tsFile) throws Exception {
+ try (final TsFileGenerator generator = new TsFileGenerator(tsFile)) {
+ generator.registerTimeseries(DEVICE,
Collections.singletonList(TSFILE_SCHEMA));
+ generator.generateData(DEVICE, 10, 1, false);
+ }
+ }
+
+ private File getActiveLoadDir(final DataNodeWrapper dataNodeWrapper) {
+ return new File(
+ dataNodeWrapper.getNodePath()
+ + File.separator
+ + "ext"
+ + File.separator
+ + "load"
+ + File.separator
+ + "pending");
+ }
+
+ private File getActiveLoadFailDir(final DataNodeWrapper dataNodeWrapper) {
+ return new File(
+ dataNodeWrapper.getNodePath()
+ + File.separator
+ + "ext"
+ + File.separator
+ + "load"
+ + File.separator
+ + "failed");
+ }
+
+ private File waitForFile(final File root, final String fileName, final long
timeoutMs)
+ throws InterruptedException {
+ final long deadline = System.currentTimeMillis() + timeoutMs;
+ while (System.currentTimeMillis() < deadline) {
+ final File file = findFile(root, fileName);
+ if (file != null) {
+ return file;
+ }
+ Thread.sleep(500L);
+ }
+ return null;
+ }
+
+ private boolean containsFile(final File root, final String fileName) {
+ return findFile(root, fileName) != null;
+ }
+
+ private void assertFileKeptForRetry(
+ final File activeDir, final File failDir, final String fileName, final
long observationMs)
+ throws InterruptedException {
+ final long deadline = System.currentTimeMillis() + observationMs;
+ while (System.currentTimeMillis() < deadline) {
+ Assert.assertTrue(
+ "Temporary unavailable active load should keep tsfile for retry",
+ containsFile(activeDir, fileName));
+ Assert.assertFalse(
+ "Temporary unavailable active load must not move tsfile to fail dir",
+ containsFile(failDir, fileName));
+ Thread.sleep(500L);
+ }
+ }
+
+ private File findFile(final File root, final String fileName) {
+ if (root == null || !root.exists()) {
+ return null;
+ }
+ if (root.isFile()) {
+ return root.getName().equals(fileName) ? root : null;
+ }
+
+ final File[] children = root.listFiles();
+ if (children == null) {
+ return null;
+ }
+ for (final File child : children) {
+ final File file = findFile(child, fileName);
+ if (file != null) {
+ return file;
+ }
+ }
+ return null;
+ }
+
+ private void deleteRecursively(final File file) {
+ if (file == null || !file.exists()) {
+ return;
+ }
+ final File[] children = file.listFiles();
+ if (children != null) {
+ for (final File child : children) {
+ deleteRecursively(child);
+ }
+ }
+ Assert.assertTrue(file.delete());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
index cb90fac0b06..cc708e7e48c 100644
---
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@ public final class StorageEngineMessages {
public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "Release Data
Cache Memory Block {}";
public static final String START_DATA_TYPE_CONVERSION_DOT = "Start data type
conversion for LoadTsFileStatement: {}.";
public static final String START_DATA_TYPE_CONVERSION = "Start data type
conversion for LoadTsFileStatement: {}";
+ public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT =
+ "Interrupted while waiting for tablet conversion slot: ";
public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "Fail to load
tsfile to Active dir";
public static final String FAIL_TO_LOAD_DISK_SPACE = "Fail to load disk
space of file {}";
public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "Load active
listening dir is not set.";
@@ -521,6 +523,8 @@ public final class StorageEngineMessages {
public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "Error occurred
when executing active load periodical job.";
public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active load
periodical jobs executor is started successfully.";
public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active load
periodical jobs executor is stopped successfully.";
+ public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE =
+ "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to temporary
unavailability, will retry later. Status: {}";
public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "Error occurred
during moving file {} to fail directory.";
public static final String FAILED_COUNT_FILES_IN_FAIL_DIR = "Failed to count
failed files in fail directory.";
diff --git
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
index e488c40067e..792909a97e0 100644
---
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@ public final class StorageEngineMessages {
public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "释放数据缓存内存块 {}";
public static final String START_DATA_TYPE_CONVERSION_DOT = "开始对
LoadTsFileStatement: {} 进行数据类型转换。";
public static final String START_DATA_TYPE_CONVERSION = "开始对
LoadTsFileStatement: {} 进行数据类型转换";
+ public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT =
+ "等待 tablet 转换槽位时被中断: ";
public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "加载 TsFile 到
Active 目录失败";
public static final String FAIL_TO_LOAD_DISK_SPACE = "获取文件 {} 的磁盘空间失败";
public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "未设置加载 Active
监听目录。";
@@ -521,6 +523,8 @@ public final class StorageEngineMessages {
public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "执行 Active
加载定期任务时发生错误。";
public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active
加载定期任务执行器已成功启动。";
public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active
加载定期任务执行器已成功停止。";
+ public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE =
+ "拒绝自动加载 TsFile {} (isGeneratedByPipe = {}),原因是系统暂时不可用,将稍后重试。状态: {}";
public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "将文件 {}
移动到失败目录时发生错误。";
public static final String FAILED_COUNT_FILES_IN_FAIL_DIR =
"统计失败目录中的失败文件数量失败。";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
index 92a1d45702b..4879e4f634e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.db.storageengine.load.active;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.i18n.StorageEngineMessages;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +99,20 @@ public class ActiveLoadFailedMessageHandler {
void handle(final ActiveLoadPendingQueue.ActiveLoadEntry entry);
}
+ public static boolean isStatusShouldRetry(
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus
status) {
+ if (status != null
+ && status.getCode() ==
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+ LOGGER.info(
+ StorageEngineMessages.ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE,
+ entry.getFile(),
+ entry.isGeneratedByPipe(),
+ status);
+ return true;
+ }
+ return isExceptionMessageShouldRetry(entry, status == null ? null :
status.getMessage());
+ }
+
public static boolean isExceptionMessageShouldRetry(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String
message) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 61e297a9e86..3f0696499eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -294,7 +294,7 @@ public class ActiveLoadTsFileLoader {
private void handleLoadFailure(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus
status) {
- if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry,
status.getMessage())) {
+ if (!ActiveLoadFailedMessageHandler.isStatusShouldRetry(entry, status)) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}.
File will be moved to fail directory.",
entry.getFile(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
index 28dfafcbd41..ff538cb41da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.SemanticException;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Node;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,6 +31,9 @@ public class
LoadTableConvertedInsertTabletStatementExceptionVisitor
implements AstVisitor<TSStatus, Exception> {
@Override
public TSStatus visitNode(final Node node, final Exception context) {
+ if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+ return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+ }
if (context instanceof AccessDeniedException) {
return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
.setMessage(context.getMessage());
@@ -42,9 +44,8 @@ public class
LoadTableConvertedInsertTabletStatementExceptionVisitor
@Override
public TSStatus visitLoadTsFile(final LoadTsFile loadTsFile, final Exception
context) {
- if (context instanceof LoadRuntimeOutOfMemoryException) {
- return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+ return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
} else if (context instanceof SemanticException) {
return new
TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
index 92445518f08..ce465f8843e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.SemanticException;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
@@ -33,6 +32,9 @@ public class
LoadTreeConvertedInsertTabletStatementExceptionVisitor
@Override
public TSStatus visitNode(final StatementNode node, final Exception context)
{
+ if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+ return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+ }
if (context instanceof AccessDeniedException) {
return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
.setMessage(context.getMessage());
@@ -44,9 +46,8 @@ public class
LoadTreeConvertedInsertTabletStatementExceptionVisitor
@Override
public TSStatus visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final Exception context) {
- if (context instanceof LoadRuntimeOutOfMemoryException) {
- return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+ return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
} else if (context instanceof SemanticException) {
return new
TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 3aab4aac290..b8dc0e5d110 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -21,9 +21,14 @@ package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.queryengine.common.SqlDialect;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.i18n.StorageEngineMessages;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -46,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.Optional;
+import java.util.concurrent.Semaphore;
public class LoadTsFileDataTypeConverter {
@@ -53,6 +59,89 @@ public class LoadTsFileDataTypeConverter {
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+ private static Semaphore getTabletConversionSemaphore() {
+ return TabletConversionSemaphoreHolder.INSTANCE;
+ }
+
+ private static int getTabletConversionPermitCount() {
+ final int configuredThreadCount =
+ Math.max(
+ 1,
+
IoTDBDescriptor.getInstance().getConfig().getLoadTsFileTabletConversionThreadCount());
+ if (!PipeConfig.getInstance().getPipeMemoryManagementEnabled()) {
+ return configuredThreadCount;
+ }
+ final long memorySafePermitCount =
+ getAllowedPipeTabletMemorySizeInBytes() /
estimatePipeTabletMemorySizePerConversion();
+ return (int) Math.max(1, Math.min((long) configuredThreadCount,
memorySafePermitCount));
+ }
+
+ private static long estimatePipeTabletMemorySizePerConversion() {
+ final PipeConfig pipeConfig = PipeConfig.getInstance();
+ final long tabletSize =
+ Math.max(
+ 1L,
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
+ final long maxReaderChunkSize = Math.max(0L,
pipeConfig.getPipeMaxReaderChunkSize());
+ final long tableSize =
+ Math.max(
+ 1L,
+ Math.min(tabletSize,
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()));
+
+ // Both conversion paths keep source/converted tablet data and the current
reader chunk in
+ // memory. Table-model conversion may additionally keep source/converted
table-sized batches, so
+ // use the larger estimate as the per-conversion working set.
+ final long treeParserMemorySize = 2L * tabletSize + maxReaderChunkSize;
+ final long tableParserMemorySize = 2L * tabletSize + 2L * tableSize +
maxReaderChunkSize;
+ return Math.max(1L, Math.max(treeParserMemorySize, tableParserMemorySize));
+ }
+
+ private static long getAllowedPipeTabletMemorySizeInBytes() {
+ final PipeConfig pipeConfig = PipeConfig.getInstance();
+ return (long)
+
((pipeConfig.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()
+ +
pipeConfig.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold() / 2)
+ *
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+ }
+
+ private static Optional<TSStatus> getInterruptedConversionStatus(final
InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Optional.of(
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(
+
StorageEngineMessages.INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT +
e.getMessage()));
+ }
+
+ public static boolean isMemoryPressureException(final Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof LoadRuntimeOutOfMemoryException
+ || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+ return true;
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
+ public static TSStatus getMemoryPressureStatus(final Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof LoadRuntimeOutOfMemoryException
+ || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+ return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(current.getMessage());
+ }
+ current = current.getCause();
+ }
+
+ return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(throwable == null ? null : throwable.getMessage());
+ }
+
+ private static class TabletConversionSemaphoreHolder {
+ private static final Semaphore INSTANCE = new
Semaphore(getTabletConversionPermitCount());
+ }
+
public static final LoadConvertedInsertTabletStatementTSStatusVisitor
STATEMENT_STATUS_VISITOR =
new LoadConvertedInsertTabletStatementTSStatusVisitor();
public static final LoadTreeConvertedInsertTabletStatementExceptionVisitor
@@ -83,16 +172,28 @@ public class LoadTsFileDataTypeConverter {
}
public Optional<TSStatus> convertForTableModel(final LoadTsFile
loadTsFileTableStatement) {
+ boolean isPermitAcquired = false;
try {
+ getTabletConversionSemaphore().acquire();
+ isPermitAcquired = true;
return loadTsFileTableStatement.accept(
tableStatementDataTypeConvertExecutionVisitor,
loadTsFileTableStatement.getDatabase());
+ } catch (final InterruptedException e) {
+ return getInterruptedConversionStatus(e);
} catch (Exception e) {
+ if (isMemoryPressureException(e)) {
+ return Optional.of(getMemoryPressureStatus(e));
+ }
LOGGER.warn(
"Failed to convert data types for table model statement {}.",
loadTsFileTableStatement,
e);
return Optional.of(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ } finally {
+ if (isPermitAcquired) {
+ getTabletConversionSemaphore().release();
+ }
}
}
@@ -136,14 +237,25 @@ public class LoadTsFileDataTypeConverter {
public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement
loadTsFileTreeStatement) {
DataNodeSchemaLockManager.getInstance().releaseReadLock(context);
+ boolean isPermitAcquired = false;
try {
+ getTabletConversionSemaphore().acquire();
+ isPermitAcquired = true;
return
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor,
null);
+ } catch (final InterruptedException e) {
+ return getInterruptedConversionStatus(e);
} catch (Exception e) {
+ if (isMemoryPressureException(e)) {
+ return Optional.of(getMemoryPressureStatus(e));
+ }
LOGGER.warn(
"Failed to convert data types for tree model statement {}.",
loadTsFileTreeStatement, e);
return Optional.of(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
} finally {
+ if (isPermitAcquired) {
+ getTabletConversionSemaphore().release();
+ }
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TREE);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
new file mode 100644
index 00000000000..67c909c1c02
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ActiveLoadFailedMessageHandlerTest {
+
+ @Test
+ public void testTemporaryUnavailableStatusShouldRetryWithoutMessageMatch() {
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending",
true, false);
+
+ Assert.assertTrue(
+ ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+ entry,
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage("temporarily unavailable")));
+ }
+
+ @Test
+ public void testPermanentStatusShouldNotRetryWithoutMessageMatch() {
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending",
true, false);
+
+ Assert.assertFalse(
+ ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+ entry, new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("bad")));
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
new file mode 100644
index 00000000000..3208dc16ef5
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+public class ActiveLoadTsFileLoaderTest {
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private File tempDir;
+ private String originalFailDir;
+ private NodeStatus originalNodeStatus;
+
+ @Before
+ public void setUp() throws Exception {
+ tempDir = Files.createTempDirectory("active-load-retry").toFile();
+ originalFailDir = config.getLoadActiveListeningFailDir();
+ originalNodeStatus =
CommonDescriptor.getInstance().getConfig().getNodeStatus();
+
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
+ config.setLoadActiveListeningFailDir(new File(tempDir,
"failed").getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() {
+ config.setLoadActiveListeningFailDir(originalFailDir);
+
CommonDescriptor.getInstance().getConfig().setNodeStatus(originalNodeStatus);
+ deleteRecursively(tempDir);
+ }
+
+ @Test
+ public void testTemporaryUnavailableStatusDoesNotMoveFileToFailDir() throws
Exception {
+ final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+ final File tsFile = createTsFileWithCompanionFiles("retry.tsfile");
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry(
+ tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, false);
+
+ invokeInitFailDirIfNecessary(loader);
+ invokeHandleLoadFailure(
+ loader,
+ entry,
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage("load conversion is temporarily unavailable"));
+
+ Assert.assertTrue(tsFile.exists());
+ Assert.assertTrue(new File(tsFile.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(new File(tsFile.getAbsolutePath() +
ModificationFileV1.FILE_SUFFIX).exists());
+ Assert.assertTrue(new File(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX).exists());
+ Assert.assertFalse(new File(config.getLoadActiveListeningFailDir(),
tsFile.getName()).exists());
+ }
+
+ @Test
+ public void testPermanentFailureStatusMovesFileToFailDir() throws Exception {
+ final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+ final File tsFile = createTsFileWithCompanionFiles("failed.tsfile");
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry(
+ tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, false);
+
+ invokeInitFailDirIfNecessary(loader);
+ invokeHandleLoadFailure(
+ loader,
+ entry,
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("permanent
error"));
+
+ final File failDir = new File(config.getLoadActiveListeningFailDir());
+ Assert.assertFalse(tsFile.exists());
+ Assert.assertTrue(new File(failDir, tsFile.getName()).exists());
+ Assert.assertTrue(
+ new File(failDir, tsFile.getName() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(
+ new File(failDir, tsFile.getName() +
ModificationFileV1.FILE_SUFFIX).exists());
+ Assert.assertTrue(new File(failDir, tsFile.getName() +
ModificationFile.FILE_SUFFIX).exists());
+ }
+
+ private File createTsFileWithCompanionFiles(final String fileName) throws
Exception {
+ final File tsFile = new File(tempDir, fileName);
+ Assert.assertTrue(tsFile.createNewFile());
+ Assert.assertTrue(
+ new File(tsFile.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX).createNewFile());
+ Assert.assertTrue(
+ new File(tsFile.getAbsolutePath() +
ModificationFileV1.FILE_SUFFIX).createNewFile());
+ Assert.assertTrue(
+ new File(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX).createNewFile());
+ return tsFile;
+ }
+
+ private void invokeInitFailDirIfNecessary(final ActiveLoadTsFileLoader
loader) throws Exception {
+ final Method method =
ActiveLoadTsFileLoader.class.getDeclaredMethod("initFailDirIfNecessary");
+ method.setAccessible(true);
+ method.invoke(loader);
+ }
+
+ private void invokeHandleLoadFailure(
+ final ActiveLoadTsFileLoader loader,
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry,
+ final TSStatus status)
+ throws Exception {
+ final Method method =
+ ActiveLoadTsFileLoader.class.getDeclaredMethod(
+ "handleLoadFailure", ActiveLoadPendingQueue.ActiveLoadEntry.class,
TSStatus.class);
+ method.setAccessible(true);
+ method.invoke(loader, entry, status);
+ }
+
+ private static void deleteRecursively(final File file) {
+ if (file == null || !file.exists()) {
+ return;
+ }
+ final File[] children = file.listFiles();
+ if (children != null) {
+ for (final File child : children) {
+ deleteRecursively(child);
+ }
+ }
+ Assert.assertTrue(file.delete());
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
index 32b15a1859d..29766f5e5eb 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
@@ -209,6 +210,24 @@ public class
LoadTreeStatementDataTypeConvertExecutionVisitorTest {
pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) <
ROW_COUNT_PER_DEVICE);
}
+ @Test
+ public void testPipeOutOfMemoryIsTemporaryUnavailable() throws Exception {
+ tsFile = File.createTempFile("oom", ".tsfile");
+
+ final LoadTreeConvertedInsertTabletStatementExceptionVisitor visitor =
+ new LoadTreeConvertedInsertTabletStatementExceptionVisitor();
+ final TSStatus status =
+ visitor.visitLoadFile(
+ LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+ new IllegalStateException(
+ "wrapped memory pressure",
+ new PipeRuntimeOutOfMemoryCriticalException("pipe tablet
memory is not enough")));
+
+ Assert.assertEquals(
+ TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(),
status.getCode());
+ Assert.assertNotEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(),
status.getCode());
+ }
+
private void writeTsFile(final File file) throws Exception {
if (file.exists()) {
Assert.assertTrue(file.delete());