This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f6995bbb80d [To dev/1.3] Load: Fixed multiple bugs (#17565) (#17623)
f6995bbb80d is described below
commit f6995bbb80d4b8529476420eb660fd32c82988fe
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 9 15:12:44 2026 +0800
[To dev/1.3] Load: Fixed multiple bugs (#17565) (#17623)
* Load: Fixed multiple bugs (#17565)
* fix
* test-del
---
.../db/storageengine/load/LoadTsFileManager.java | 54 +++++++----
.../load/memory/LoadTsFileMemoryManager.java | 13 ++-
.../load/memory/LoadTsFileMemoryManagerTest.java | 106 +++++++++++++++++++++
3 files changed, 154 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index ce1bb680f21..0b85e3f0e64 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -128,6 +128,34 @@ public class LoadTsFileManager {
activeLoadAgent.start();
}
+ private long getCleanupTaskDelayInMs() {
+ return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
+ }
+
+ private void createCleanupTaskIfAbsent(final String uuid) {
+ synchronized (uuid2CleanupTask) {
+ if (uuid2CleanupTask.containsKey(uuid)) {
+ return;
+ }
+
+ final CleanupTask cleanupTask = new CleanupTask(uuid, getCleanupTaskDelayInMs());
+ uuid2CleanupTask.put(uuid, cleanupTask);
+ cleanupTaskQueue.add(cleanupTask);
+ }
+ }
+
+ private void rescheduleCleanupTask(final CleanupTask cleanupTask) {
+ synchronized (uuid2CleanupTask) {
+ if (uuid2CleanupTask.get(cleanupTask.uuid) != cleanupTask) {
+ return;
+ }
+
+ cleanupTaskQueue.remove(cleanupTask);
+ cleanupTask.resetScheduledTime();
+ cleanupTaskQueue.add(cleanupTask);
+ }
+ }
+
private void registerCleanupTaskExecutor() {
PipeDataNodeAgent.runtime()
.registerPeriodicalJob(
@@ -199,26 +227,13 @@ public class LoadTsFileManager {
uuid2WriterManager.put(uuid, writerManager);
writerManager.close();
- synchronized (uuid2CleanupTask) {
- final CleanupTask cleanupTask =
- new CleanupTask(
- uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
- uuid2CleanupTask.put(uuid, cleanupTask);
- cleanupTaskQueue.add(cleanupTask);
- }
+ createCleanupTaskIfAbsent(uuid);
});
}
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
throws IOException, PageException {
- if (!uuid2WriterManager.containsKey(uuid)) {
- synchronized (uuid2CleanupTask) {
- final CleanupTask cleanupTask =
- new CleanupTask(uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
- uuid2CleanupTask.put(uuid, cleanupTask);
- cleanupTaskQueue.add(cleanupTask);
- }
- }
+ createCleanupTaskIfAbsent(uuid);
final Optional<CleanupTask> cleanupTask =
Optional.ofNullable(uuid2CleanupTask.get(uuid));
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
@@ -293,6 +308,8 @@ public class LoadTsFileManager {
return false;
}
+ createCleanupTaskIfAbsent(uuid);
+
final Optional<CleanupTask> cleanupTask =
Optional.ofNullable(uuid2CleanupTask.get(uuid));
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
try {
@@ -315,9 +332,10 @@ public class LoadTsFileManager {
private void clean(String uuid) {
synchronized (uuid2CleanupTask) {
- final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
+ final CleanupTask cleanupTask = uuid2CleanupTask.remove(uuid);
if (cleanupTask != null) {
cleanupTask.cancel();
+ cleanupTaskQueue.remove(cleanupTask);
}
}
@@ -744,12 +762,12 @@ public class LoadTsFileManager {
public void markLoadTaskRunning() {
isLoadTaskRunning = true;
- resetScheduledTime();
+ rescheduleCleanupTask(this);
}
public void markLoadTaskNotRunning() {
isLoadTaskRunning = false;
- resetScheduledTime();
+ rescheduleCleanupTask(this);
}
public void resetScheduledTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index 6c85d2280fd..c3f88815300 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -126,7 +126,18 @@ public class LoadTsFileMemoryManager {
if (dataCacheMemoryBlock == null) {
final long actuallyAllocateMemoryInBytes =
tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
- dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+ try {
+ dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+ } catch (RuntimeException e) {
+ if (actuallyAllocateMemoryInBytes > 0) {
+ try {
+ releaseToQuery(actuallyAllocateMemoryInBytes);
+ } catch (RuntimeException releaseException) {
+ e.addSuppressed(releaseException);
+ }
+ }
+ throw e;
+ }
LOGGER.info(
"Create Data Cache Memory Block {}, allocate memory {}",
dataCacheMemoryBlock,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
new file mode 100644
index 00000000000..f3a1bf7e411
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.memory;
+
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+public class LoadTsFileMemoryManagerTest {
+
+ @Test
+ public void testAllocateDataCacheMemoryBlockDoesNotDoubleCountMemory() throws Exception {
+ final long allocatedMemoryInBytes = 2L * 1024 * 1024;
+ final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+ doAnswer(
+ invocation -> {
+ setUsedMemorySize(manager, allocatedMemoryInBytes);
+ return allocatedMemoryInBytes;
+ })
+ .when(manager)
+ .tryAllocateFromQuery(anyLong());
+
+ manager.allocateDataCacheMemoryBlock();
+
+ Assert.assertEquals(allocatedMemoryInBytes, manager.getUsedMemorySizeInBytes());
+ Assert.assertNotNull(getDataCacheMemoryBlock(manager));
+ }
+
+ @Test
+ public void testAllocateDataCacheMemoryBlockRollsBackPartialAllocationOnFailure()
+ throws Exception {
+ final long allocatedMemoryInBytes = 512L;
+ final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+ doAnswer(
+ invocation -> {
+ setUsedMemorySize(manager, allocatedMemoryInBytes);
+ return allocatedMemoryInBytes;
+ })
+ .when(manager)
+ .tryAllocateFromQuery(anyLong());
+ doAnswer(
+ invocation -> {
+ setUsedMemorySize(manager, 0L);
+ return null;
+ })
+ .when(manager)
+ .releaseToQuery(anyLong());
+
+ try {
+ manager.allocateDataCacheMemoryBlock();
+ Assert.fail("Expected LoadRuntimeOutOfMemoryException");
+ } catch (LoadRuntimeOutOfMemoryException e) {
+ Assert.assertEquals(0L, manager.getUsedMemorySizeInBytes());
+ Assert.assertNull(getDataCacheMemoryBlock(manager));
+ }
+ }
+
+ private static LoadTsFileMemoryManager newMemoryManager() throws Exception {
+ final Constructor<LoadTsFileMemoryManager> constructor =
+ LoadTsFileMemoryManager.class.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ return constructor.newInstance();
+ }
+
+ private static void setUsedMemorySize(
+ final LoadTsFileMemoryManager manager, final long usedMemorySizeInBytes) throws Exception {
+ final Field field = LoadTsFileMemoryManager.class.getDeclaredField("usedMemorySizeInBytes");
+ field.setAccessible(true);
+ ((AtomicLong) field.get(manager)).set(usedMemorySizeInBytes);
+ }
+
+ private static LoadTsFileDataCacheMemoryBlock getDataCacheMemoryBlock(
+ final LoadTsFileMemoryManager manager) throws Exception {
+ final Field field = LoadTsFileMemoryManager.class.getDeclaredField("dataCacheMemoryBlock");
+ field.setAccessible(true);
+ return (LoadTsFileDataCacheMemoryBlock) field.get(manager);
+ }
+}