This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d1a234fc51e Load: Fixed multiple bugs (#17565)
d1a234fc51e is described below
commit d1a234fc51eb15360295394b1dbe531188a33602
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:35:48 2026 +0800
Load: Fixed multiple bugs (#17565)
---
.../plan/statement/crud/LoadTsFileStatement.java | 1 +
.../db/storageengine/load/LoadTsFileManager.java | 54 +++++++----
.../load/memory/LoadTsFileMemoryManager.java | 13 ++-
.../statement/crud/LoadTsFileStatementTest.java | 79 +++++++++++++++
.../load/memory/LoadTsFileMemoryManagerTest.java | 106 +++++++++++++++++++++
5 files changed, 234 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index a9b7ca100d0..512a62fdae7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -341,6 +341,7 @@ public class LoadTsFileStatement extends Statement {
final LoadTsFileStatement statement = new LoadTsFileStatement();
statement.databaseLevel = this.databaseLevel;
+ statement.database = this.database;
statement.verifySchema = this.verifySchema;
statement.deleteAfterLoad = this.deleteAfterLoad;
statement.convertOnTypeMismatch = this.convertOnTypeMismatch;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 16c78811d08..a39e212892d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -132,6 +132,34 @@ public class LoadTsFileManager {
activeLoadAgent.start();
}
+ private long getCleanupTaskDelayInMs() {
+ return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
+ }
+
+ private void createCleanupTaskIfAbsent(final String uuid) {
+ synchronized (uuid2CleanupTask) {
+ if (uuid2CleanupTask.containsKey(uuid)) {
+ return;
+ }
+
+ final CleanupTask cleanupTask = new CleanupTask(uuid,
getCleanupTaskDelayInMs());
+ uuid2CleanupTask.put(uuid, cleanupTask);
+ cleanupTaskQueue.add(cleanupTask);
+ }
+ }
+
+ private void rescheduleCleanupTask(final CleanupTask cleanupTask) {
+ synchronized (uuid2CleanupTask) {
+ if (uuid2CleanupTask.get(cleanupTask.uuid) != cleanupTask) {
+ return;
+ }
+
+ cleanupTaskQueue.remove(cleanupTask);
+ cleanupTask.resetScheduledTime();
+ cleanupTaskQueue.add(cleanupTask);
+ }
+ }
+
private void registerCleanupTaskExecutor() {
PipeDataNodeAgent.runtime()
.registerPeriodicalJob(
@@ -203,26 +231,13 @@ public class LoadTsFileManager {
uuid2WriterManager.put(uuid, writerManager);
writerManager.close();
- synchronized (uuid2CleanupTask) {
- final CleanupTask cleanupTask =
- new CleanupTask(
- uuid,
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
- uuid2CleanupTask.put(uuid, cleanupTask);
- cleanupTaskQueue.add(cleanupTask);
- }
+ createCleanupTaskIfAbsent(uuid);
});
}
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode
pieceNode, String uuid)
throws IOException, PageException {
- if (!uuid2WriterManager.containsKey(uuid)) {
- synchronized (uuid2CleanupTask) {
- final CleanupTask cleanupTask =
- new CleanupTask(uuid,
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
- uuid2CleanupTask.put(uuid, cleanupTask);
- cleanupTaskQueue.add(cleanupTask);
- }
- }
+ createCleanupTaskIfAbsent(uuid);
final Optional<CleanupTask> cleanupTask =
Optional.ofNullable(uuid2CleanupTask.get(uuid));
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
@@ -304,6 +319,8 @@ public class LoadTsFileManager {
return false;
}
+ createCleanupTaskIfAbsent(uuid);
+
final Optional<CleanupTask> cleanupTask =
Optional.ofNullable(uuid2CleanupTask.get(uuid));
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
try {
@@ -326,9 +343,10 @@ public class LoadTsFileManager {
private void clean(String uuid) {
synchronized (uuid2CleanupTask) {
- final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
+ final CleanupTask cleanupTask = uuid2CleanupTask.remove(uuid);
if (cleanupTask != null) {
cleanupTask.cancel();
+ cleanupTaskQueue.remove(cleanupTask);
}
}
@@ -776,12 +794,12 @@ public class LoadTsFileManager {
public void markLoadTaskRunning() {
isLoadTaskRunning = true;
- resetScheduledTime();
+ rescheduleCleanupTask(this);
}
public void markLoadTaskNotRunning() {
isLoadTaskRunning = false;
- resetScheduledTime();
+ rescheduleCleanupTask(this);
}
public void resetScheduledTime() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index e08a135d312..12dffa48e82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -179,7 +179,18 @@ public class LoadTsFileMemoryManager {
if (dataCacheMemoryBlock == null) {
final long actuallyAllocateMemoryInBytes =
tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
- dataCacheMemoryBlock = new
LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+ try {
+ dataCacheMemoryBlock = new
LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+ } catch (RuntimeException e) {
+ if (actuallyAllocateMemoryInBytes > 0) {
+ try {
+ releaseToQuery(actuallyAllocateMemoryInBytes);
+ } catch (RuntimeException releaseException) {
+ e.addSuppressed(releaseException);
+ }
+ }
+ throw e;
+ }
LOGGER.info(
"Create Data Cache Memory Block {}, allocate memory {}",
dataCacheMemoryBlock,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java
new file mode 100644
index 00000000000..941794bb074
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class LoadTsFileStatementTest {
+
+ @Test
+ public void testSubStatementsKeepDatabase() throws Exception {
+ final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ final int originalBatchSize = config.getLoadTsFileSubStatementBatchSize();
+ final Path tempDir =
Files.createTempDirectory("load-tsfile-sub-statements");
+
+ try {
+ config.setLoadTsFileSubStatementBatchSize(1);
+ Files.createFile(tempDir.resolve("a.tsfile"));
+ Files.createFile(tempDir.resolve("b.tsfile"));
+
+ final LoadTsFileStatement statement = new
LoadTsFileStatement(tempDir.toString());
+ statement.setDatabase("test_db");
+
+ final List<LoadTsFileStatement> subStatements =
statement.getSubStatements();
+ Assert.assertEquals(2, subStatements.size());
+ subStatements.forEach(
+ subStatement -> Assert.assertEquals("test_db",
subStatement.getDatabase()));
+ } finally {
+ config.setLoadTsFileSubStatementBatchSize(originalBatchSize);
+ deleteRecursively(tempDir);
+ }
+ }
+
+ private static void deleteRecursively(final Path path) throws IOException {
+ if (path == null || !Files.exists(path)) {
+ return;
+ }
+
+ try (final Stream<Path> pathStream = Files.walk(path)) {
+ pathStream
+ .sorted(Comparator.reverseOrder())
+ .forEach(
+ currentPath -> {
+ try {
+ Files.deleteIfExists(currentPath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
new file mode 100644
index 00000000000..f3a1bf7e411
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.memory;
+
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+public class LoadTsFileMemoryManagerTest {
+
+ @Test
+ public void testAllocateDataCacheMemoryBlockDoesNotDoubleCountMemory()
throws Exception {
+ final long allocatedMemoryInBytes = 2L * 1024 * 1024;
+ final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+ doAnswer(
+ invocation -> {
+ setUsedMemorySize(manager, allocatedMemoryInBytes);
+ return allocatedMemoryInBytes;
+ })
+ .when(manager)
+ .tryAllocateFromQuery(anyLong());
+
+ manager.allocateDataCacheMemoryBlock();
+
+ Assert.assertEquals(allocatedMemoryInBytes,
manager.getUsedMemorySizeInBytes());
+ Assert.assertNotNull(getDataCacheMemoryBlock(manager));
+ }
+
+ @Test
+ public void
testAllocateDataCacheMemoryBlockRollsBackPartialAllocationOnFailure()
+ throws Exception {
+ final long allocatedMemoryInBytes = 512L;
+ final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+ doAnswer(
+ invocation -> {
+ setUsedMemorySize(manager, allocatedMemoryInBytes);
+ return allocatedMemoryInBytes;
+ })
+ .when(manager)
+ .tryAllocateFromQuery(anyLong());
+ doAnswer(
+ invocation -> {
+ setUsedMemorySize(manager, 0L);
+ return null;
+ })
+ .when(manager)
+ .releaseToQuery(anyLong());
+
+ try {
+ manager.allocateDataCacheMemoryBlock();
+ Assert.fail("Expected LoadRuntimeOutOfMemoryException");
+ } catch (LoadRuntimeOutOfMemoryException e) {
+ Assert.assertEquals(0L, manager.getUsedMemorySizeInBytes());
+ Assert.assertNull(getDataCacheMemoryBlock(manager));
+ }
+ }
+
+ private static LoadTsFileMemoryManager newMemoryManager() throws Exception {
+ final Constructor<LoadTsFileMemoryManager> constructor =
+ LoadTsFileMemoryManager.class.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ return constructor.newInstance();
+ }
+
+ private static void setUsedMemorySize(
+ final LoadTsFileMemoryManager manager, final long usedMemorySizeInBytes)
throws Exception {
+ final Field field =
LoadTsFileMemoryManager.class.getDeclaredField("usedMemorySizeInBytes");
+ field.setAccessible(true);
+ ((AtomicLong) field.get(manager)).set(usedMemorySizeInBytes);
+ }
+
+ private static LoadTsFileDataCacheMemoryBlock getDataCacheMemoryBlock(
+ final LoadTsFileMemoryManager manager) throws Exception {
+ final Field field =
LoadTsFileMemoryManager.class.getDeclaredField("dataCacheMemoryBlock");
+ field.setAccessible(true);
+ return (LoadTsFileDataCacheMemoryBlock) field.get(manager);
+ }
+}