This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch load-sit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 53cded69f5081042fb18aec1f91e643a7ffffa88
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:35:48 2026 +0800

    Load: Fixed multiple bugs (#17565)
---
 .../plan/statement/crud/LoadTsFileStatement.java   |   1 +
 .../db/storageengine/load/LoadTsFileManager.java   |  54 +++++++----
 .../load/memory/LoadTsFileMemoryManager.java       |  13 ++-
 .../statement/crud/LoadTsFileStatementTest.java    |  79 +++++++++++++++
 .../load/memory/LoadTsFileMemoryManagerTest.java   | 106 +++++++++++++++++++++
 5 files changed, 234 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 3b10d3733e1..8cc6ac9fb30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -290,6 +290,7 @@ public class LoadTsFileStatement extends Statement {
 
       final LoadTsFileStatement statement = new LoadTsFileStatement();
       statement.databaseLevel = this.databaseLevel;
+      statement.database = this.database;
       statement.verifySchema = this.verifySchema;
       statement.deleteAfterLoad = this.deleteAfterLoad;
       statement.convertOnTypeMismatch = this.convertOnTypeMismatch;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index ce1bb680f21..0b85e3f0e64 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -128,6 +128,34 @@ public class LoadTsFileManager {
     activeLoadAgent.start();
   }
 
+  private long getCleanupTaskDelayInMs() {
+    return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
+  }
+
+  private void createCleanupTaskIfAbsent(final String uuid) {
+    synchronized (uuid2CleanupTask) {
+      if (uuid2CleanupTask.containsKey(uuid)) {
+        return;
+      }
+
+      final CleanupTask cleanupTask = new CleanupTask(uuid, getCleanupTaskDelayInMs());
+      uuid2CleanupTask.put(uuid, cleanupTask);
+      cleanupTaskQueue.add(cleanupTask);
+    }
+  }
+
+  private void rescheduleCleanupTask(final CleanupTask cleanupTask) {
+    synchronized (uuid2CleanupTask) {
+      if (uuid2CleanupTask.get(cleanupTask.uuid) != cleanupTask) {
+        return;
+      }
+
+      cleanupTaskQueue.remove(cleanupTask);
+      cleanupTask.resetScheduledTime();
+      cleanupTaskQueue.add(cleanupTask);
+    }
+  }
+
   private void registerCleanupTaskExecutor() {
     PipeDataNodeAgent.runtime()
         .registerPeriodicalJob(
@@ -199,26 +227,13 @@ public class LoadTsFileManager {
               uuid2WriterManager.put(uuid, writerManager);
               writerManager.close();
 
-              synchronized (uuid2CleanupTask) {
-                final CleanupTask cleanupTask =
-                    new CleanupTask(
-                        uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
-                uuid2CleanupTask.put(uuid, cleanupTask);
-                cleanupTaskQueue.add(cleanupTask);
-              }
+              createCleanupTaskIfAbsent(uuid);
             });
   }
 
   public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
       throws IOException, PageException {
-    if (!uuid2WriterManager.containsKey(uuid)) {
-      synchronized (uuid2CleanupTask) {
-        final CleanupTask cleanupTask =
-            new CleanupTask(uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
-        uuid2CleanupTask.put(uuid, cleanupTask);
-        cleanupTaskQueue.add(cleanupTask);
-      }
-    }
+    createCleanupTaskIfAbsent(uuid);
 
     final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
@@ -293,6 +308,8 @@ public class LoadTsFileManager {
       return false;
     }
 
+    createCleanupTaskIfAbsent(uuid);
+
     final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
     try {
@@ -315,9 +332,10 @@ public class LoadTsFileManager {
 
   private void clean(String uuid) {
     synchronized (uuid2CleanupTask) {
-      final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
+      final CleanupTask cleanupTask = uuid2CleanupTask.remove(uuid);
       if (cleanupTask != null) {
         cleanupTask.cancel();
+        cleanupTaskQueue.remove(cleanupTask);
       }
     }
 
@@ -744,12 +762,12 @@ public class LoadTsFileManager {
 
     public void markLoadTaskRunning() {
       isLoadTaskRunning = true;
-      resetScheduledTime();
+      rescheduleCleanupTask(this);
     }
 
     public void markLoadTaskNotRunning() {
       isLoadTaskRunning = false;
-      resetScheduledTime();
+      rescheduleCleanupTask(this);
     }
 
     public void resetScheduledTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index 6c85d2280fd..c3f88815300 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -126,7 +126,18 @@ public class LoadTsFileMemoryManager {
     if (dataCacheMemoryBlock == null) {
       final long actuallyAllocateMemoryInBytes =
           tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
-      dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+      try {
+        dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+      } catch (RuntimeException e) {
+        if (actuallyAllocateMemoryInBytes > 0) {
+          try {
+            releaseToQuery(actuallyAllocateMemoryInBytes);
+          } catch (RuntimeException releaseException) {
+            e.addSuppressed(releaseException);
+          }
+        }
+        throw e;
+      }
       LOGGER.info(
           "Create Data Cache Memory Block {}, allocate memory {}",
           dataCacheMemoryBlock,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java
new file mode 100644
index 00000000000..941794bb074
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class LoadTsFileStatementTest {
+
+  @Test
+  public void testSubStatementsKeepDatabase() throws Exception {
+    final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    final int originalBatchSize = config.getLoadTsFileSubStatementBatchSize();
+    final Path tempDir = Files.createTempDirectory("load-tsfile-sub-statements");
+
+    try {
+      config.setLoadTsFileSubStatementBatchSize(1);
+      Files.createFile(tempDir.resolve("a.tsfile"));
+      Files.createFile(tempDir.resolve("b.tsfile"));
+
+      final LoadTsFileStatement statement = new LoadTsFileStatement(tempDir.toString());
+      statement.setDatabase("test_db");
+
+      final List<LoadTsFileStatement> subStatements = statement.getSubStatements();
+      Assert.assertEquals(2, subStatements.size());
+      subStatements.forEach(
+          subStatement -> Assert.assertEquals("test_db", subStatement.getDatabase()));
+    } finally {
+      config.setLoadTsFileSubStatementBatchSize(originalBatchSize);
+      deleteRecursively(tempDir);
+    }
+  }
+
+  private static void deleteRecursively(final Path path) throws IOException {
+    if (path == null || !Files.exists(path)) {
+      return;
+    }
+
+    try (final Stream<Path> pathStream = Files.walk(path)) {
+      pathStream
+          .sorted(Comparator.reverseOrder())
+          .forEach(
+              currentPath -> {
+                try {
+                  Files.deleteIfExists(currentPath);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
new file mode 100644
index 00000000000..f3a1bf7e411
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.memory;
+
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+public class LoadTsFileMemoryManagerTest {
+
+  @Test
+  public void testAllocateDataCacheMemoryBlockDoesNotDoubleCountMemory() throws Exception {
+    final long allocatedMemoryInBytes = 2L * 1024 * 1024;
+    final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+    doAnswer(
+            invocation -> {
+              setUsedMemorySize(manager, allocatedMemoryInBytes);
+              return allocatedMemoryInBytes;
+            })
+        .when(manager)
+        .tryAllocateFromQuery(anyLong());
+
+    manager.allocateDataCacheMemoryBlock();
+
+    Assert.assertEquals(allocatedMemoryInBytes, manager.getUsedMemorySizeInBytes());
+    Assert.assertNotNull(getDataCacheMemoryBlock(manager));
+  }
+
+  @Test
+  public void testAllocateDataCacheMemoryBlockRollsBackPartialAllocationOnFailure()
+      throws Exception {
+    final long allocatedMemoryInBytes = 512L;
+    final LoadTsFileMemoryManager manager = spy(newMemoryManager());
+
+    doAnswer(
+            invocation -> {
+              setUsedMemorySize(manager, allocatedMemoryInBytes);
+              return allocatedMemoryInBytes;
+            })
+        .when(manager)
+        .tryAllocateFromQuery(anyLong());
+    doAnswer(
+            invocation -> {
+              setUsedMemorySize(manager, 0L);
+              return null;
+            })
+        .when(manager)
+        .releaseToQuery(anyLong());
+
+    try {
+      manager.allocateDataCacheMemoryBlock();
+      Assert.fail("Expected LoadRuntimeOutOfMemoryException");
+    } catch (LoadRuntimeOutOfMemoryException e) {
+      Assert.assertEquals(0L, manager.getUsedMemorySizeInBytes());
+      Assert.assertNull(getDataCacheMemoryBlock(manager));
+    }
+  }
+
+  private static LoadTsFileMemoryManager newMemoryManager() throws Exception {
+    final Constructor<LoadTsFileMemoryManager> constructor =
+        LoadTsFileMemoryManager.class.getDeclaredConstructor();
+    constructor.setAccessible(true);
+    return constructor.newInstance();
+  }
+
+  private static void setUsedMemorySize(
+      final LoadTsFileMemoryManager manager, final long usedMemorySizeInBytes) throws Exception {
+    final Field field = LoadTsFileMemoryManager.class.getDeclaredField("usedMemorySizeInBytes");
+    field.setAccessible(true);
+    ((AtomicLong) field.get(manager)).set(usedMemorySizeInBytes);
+  }
+
+  private static LoadTsFileDataCacheMemoryBlock getDataCacheMemoryBlock(
+      final LoadTsFileMemoryManager manager) throws Exception {
+    final Field field = LoadTsFileMemoryManager.class.getDeclaredField("dataCacheMemoryBlock");
+    field.setAccessible(true);
+    return (LoadTsFileDataCacheMemoryBlock) field.get(manager);
+  }
+}

Reply via email to