This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f6995bbb80d [To dev/1.3] Load: Fixed multiple bugs (#17565) (#17623)
f6995bbb80d is described below

commit f6995bbb80d4b8529476420eb660fd32c82988fe
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 9 15:12:44 2026 +0800

    [To dev/1.3] Load: Fixed multiple bugs (#17565) (#17623)
    
    * Load: Fixed multiple bugs (#17565)
    
    * fix
    
    * test-del
---
 .../db/storageengine/load/LoadTsFileManager.java   |  54 +++++++----
 .../load/memory/LoadTsFileMemoryManager.java       |  13 ++-
 .../load/memory/LoadTsFileMemoryManagerTest.java   | 106 +++++++++++++++++++++
 3 files changed, 154 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index ce1bb680f21..0b85e3f0e64 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -128,6 +128,34 @@ public class LoadTsFileManager {
     activeLoadAgent.start();
   }
 
+  private long getCleanupTaskDelayInMs() {
+    return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
+  }
+
+  private void createCleanupTaskIfAbsent(final String uuid) {
+    synchronized (uuid2CleanupTask) {
+      if (uuid2CleanupTask.containsKey(uuid)) {
+        return;
+      }
+
+      final CleanupTask cleanupTask = new CleanupTask(uuid, getCleanupTaskDelayInMs());
+      uuid2CleanupTask.put(uuid, cleanupTask);
+      cleanupTaskQueue.add(cleanupTask);
+    }
+  }
+
+  private void rescheduleCleanupTask(final CleanupTask cleanupTask) {
+    synchronized (uuid2CleanupTask) {
+      if (uuid2CleanupTask.get(cleanupTask.uuid) != cleanupTask) {
+        return;
+      }
+
+      cleanupTaskQueue.remove(cleanupTask);
+      cleanupTask.resetScheduledTime();
+      cleanupTaskQueue.add(cleanupTask);
+    }
+  }
+
   private void registerCleanupTaskExecutor() {
     PipeDataNodeAgent.runtime()
         .registerPeriodicalJob(
@@ -199,26 +227,13 @@ public class LoadTsFileManager {
               uuid2WriterManager.put(uuid, writerManager);
               writerManager.close();
 
-              synchronized (uuid2CleanupTask) {
-                final CleanupTask cleanupTask =
-                    new CleanupTask(
-                        uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
-                uuid2CleanupTask.put(uuid, cleanupTask);
-                cleanupTaskQueue.add(cleanupTask);
-              }
+              createCleanupTaskIfAbsent(uuid);
             });
   }
 
   public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
       throws IOException, PageException {
-    if (!uuid2WriterManager.containsKey(uuid)) {
-      synchronized (uuid2CleanupTask) {
-        final CleanupTask cleanupTask =
-            new CleanupTask(uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
-        uuid2CleanupTask.put(uuid, cleanupTask);
-        cleanupTaskQueue.add(cleanupTask);
-      }
-    }
+    createCleanupTaskIfAbsent(uuid);
 
     final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
@@ -293,6 +308,8 @@ public class LoadTsFileManager {
       return false;
     }
 
+    createCleanupTaskIfAbsent(uuid);
+
     final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
     try {
@@ -315,9 +332,10 @@ public class LoadTsFileManager {
 
   private void clean(String uuid) {
     synchronized (uuid2CleanupTask) {
-      final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
+      final CleanupTask cleanupTask = uuid2CleanupTask.remove(uuid);
       if (cleanupTask != null) {
         cleanupTask.cancel();
+        cleanupTaskQueue.remove(cleanupTask);
       }
     }
 
@@ -744,12 +762,12 @@ public class LoadTsFileManager {
 
     public void markLoadTaskRunning() {
       isLoadTaskRunning = true;
-      resetScheduledTime();
+      rescheduleCleanupTask(this);
     }
 
     public void markLoadTaskNotRunning() {
       isLoadTaskRunning = false;
-      resetScheduledTime();
+      rescheduleCleanupTask(this);
     }
 
     public void resetScheduledTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index 6c85d2280fd..c3f88815300 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -126,7 +126,18 @@ public class LoadTsFileMemoryManager {
     if (dataCacheMemoryBlock == null) {
       final long actuallyAllocateMemoryInBytes =
           tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
-      dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+      try {
+        dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+      } catch (RuntimeException e) {
+        if (actuallyAllocateMemoryInBytes > 0) {
+          try {
+            releaseToQuery(actuallyAllocateMemoryInBytes);
+          } catch (RuntimeException releaseException) {
+            e.addSuppressed(releaseException);
+          }
+        }
+        throw e;
+      }
       LOGGER.info(
           "Create Data Cache Memory Block {}, allocate memory {}",
           dataCacheMemoryBlock,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
new file mode 100644
index 00000000000..f3a1bf7e411
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.memory;
+
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+public class LoadTsFileMemoryManagerTest {
+
+  @Test
+  public void testAllocateDataCacheMemoryBlockDoesNotDoubleCountMemory() throws Exception {
+    final long allocatedMemoryInBytes = 2L * 1024 * 1024;
+    final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+    doAnswer(
+            invocation -> {
+              setUsedMemorySize(manager, allocatedMemoryInBytes);
+              return allocatedMemoryInBytes;
+            })
+        .when(manager)
+        .tryAllocateFromQuery(anyLong());
+
+    manager.allocateDataCacheMemoryBlock();
+
+    Assert.assertEquals(allocatedMemoryInBytes, manager.getUsedMemorySizeInBytes());
+    Assert.assertNotNull(getDataCacheMemoryBlock(manager));
+  }
+
+  @Test
+  public void testAllocateDataCacheMemoryBlockRollsBackPartialAllocationOnFailure()
+      throws Exception {
+    final long allocatedMemoryInBytes = 512L;
+    final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+    doAnswer(
+            invocation -> {
+              setUsedMemorySize(manager, allocatedMemoryInBytes);
+              return allocatedMemoryInBytes;
+            })
+        .when(manager)
+        .tryAllocateFromQuery(anyLong());
+    doAnswer(
+            invocation -> {
+              setUsedMemorySize(manager, 0L);
+              return null;
+            })
+        .when(manager)
+        .releaseToQuery(anyLong());
+
+    try {
+      manager.allocateDataCacheMemoryBlock();
+      Assert.fail("Expected LoadRuntimeOutOfMemoryException");
+    } catch (LoadRuntimeOutOfMemoryException e) {
+      Assert.assertEquals(0L, manager.getUsedMemorySizeInBytes());
+      Assert.assertNull(getDataCacheMemoryBlock(manager));
+    }
+  }
+
+  private static LoadTsFileMemoryManager newMemoryManager() throws Exception {
+    final Constructor<LoadTsFileMemoryManager> constructor =
+        LoadTsFileMemoryManager.class.getDeclaredConstructor();
+    constructor.setAccessible(true);
+    return constructor.newInstance();
+  }
+
+  private static void setUsedMemorySize(
+      final LoadTsFileMemoryManager manager, final long usedMemorySizeInBytes) throws Exception {
+    final Field field = LoadTsFileMemoryManager.class.getDeclaredField("usedMemorySizeInBytes");
+    field.setAccessible(true);
+    ((AtomicLong) field.get(manager)).set(usedMemorySizeInBytes);
+  }
+
+  private static LoadTsFileDataCacheMemoryBlock getDataCacheMemoryBlock(
+      final LoadTsFileMemoryManager manager) throws Exception {
+    final Field field = LoadTsFileMemoryManager.class.getDeclaredField("dataCacheMemoryBlock");
+    field.setAccessible(true);
+    return (LoadTsFileDataCacheMemoryBlock) field.get(manager);
+  }
+}

Reply via email to