This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dd570d991a Load: Introduce LoadTsFileManager.CleanupTask to force close writer manager after exception occurring (#11924)
7dd570d991a is described below

commit 7dd570d991abfaef8d2cc85028000f363684b97a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jan 18 20:43:16 2024 +0800

    Load: Introduce LoadTsFileManager.CleanupTask to force close writer manager after exception occurring (#11924)
---
 .../execution/load/LoadTsFileManager.java          | 143 ++++++++++++++-------
 .../iotdb/db/storageengine/StorageEngine.java      |  19 +--
 2 files changed, 102 insertions(+), 60 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 131e8f1884b..1d5cbd0205f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.queryengine.execution.load;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -31,6 +30,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
@@ -54,9 +54,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.PriorityBlockingQueue;
 
 /**
  * {@link LoadTsFileManager} is used for dealing with {@link 
LoadTsFilePieceNode} and {@link
@@ -76,21 +74,52 @@ public class LoadTsFileManager {
 
   private final File loadDir;
 
-  private final Map<String, TsFileWriterManager> uuid2WriterManager;
+  private final Map<String, TsFileWriterManager> uuid2WriterManager = new 
ConcurrentHashMap<>();
 
-  private final ScheduledExecutorService cleanupExecutors;
-  private final Map<String, ScheduledFuture<?>> uuid2Future;
+  private final Map<String, CleanupTask> uuid2CleanupTask = new 
ConcurrentHashMap<>();
+  private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new 
PriorityBlockingQueue<>();
 
   public LoadTsFileManager() {
     this.loadDir = 
SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir());
-    this.uuid2WriterManager = new ConcurrentHashMap<>();
-    this.cleanupExecutors =
-        IoTDBThreadPoolFactory.newScheduledThreadPool(1, 
LoadTsFileManager.class.getName());
-    this.uuid2Future = new ConcurrentHashMap<>();
 
+    registerCleanupTaskExecutor();
     recover();
   }
 
+  private void registerCleanupTaskExecutor() {
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            "LoadTsFileManager#cleanupTasks",
+            this::cleanupTasks,
+            LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND);
+  }
+
+  private void cleanupTasks() {
+    while (!cleanupTaskQueue.isEmpty()) {
+      synchronized (uuid2CleanupTask) {
+        if (cleanupTaskQueue.isEmpty()) {
+          continue;
+        }
+
+        final CleanupTask cleanupTask = cleanupTaskQueue.peek();
+        if (cleanupTask.scheduledTime <= System.currentTimeMillis()) {
+          cleanupTask.run();
+
+          uuid2CleanupTask.remove(cleanupTask.uuid);
+          cleanupTaskQueue.poll();
+        } else {
+          final long waitTimeInMs = cleanupTask.scheduledTime - 
System.currentTimeMillis();
+          LOGGER.info(
+              "Next load cleanup task {} is not ready to run, wait for at 
least {} ms ({}s).",
+              cleanupTask.uuid,
+              waitTimeInMs,
+              waitTimeInMs / 1000.0);
+          return;
+        }
+      }
+    }
+  }
+
   private void recover() {
     if (!loadDir.exists()) {
       return;
@@ -106,26 +135,28 @@ public class LoadTsFileManager {
 
       uuid2WriterManager.put(uuid, writerManager);
       writerManager.close();
-      uuid2Future.put(
-          uuid,
-          cleanupExecutors.schedule(
-              () -> forceCloseWriterManager(uuid),
-              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
-              TimeUnit.SECONDS));
+
+      synchronized (uuid2CleanupTask) {
+        final CleanupTask cleanupTask =
+            new CleanupTask(uuid, 
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+        uuid2CleanupTask.put(uuid, cleanupTask);
+        cleanupTaskQueue.add(cleanupTask);
+      }
     }
   }
 
   public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode 
pieceNode, String uuid)
       throws IOException {
     if (!uuid2WriterManager.containsKey(uuid)) {
-      uuid2Future.put(
-          uuid,
-          cleanupExecutors.schedule(
-              () -> forceCloseWriterManager(uuid),
-              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
-              TimeUnit.SECONDS));
+      synchronized (uuid2CleanupTask) {
+        final CleanupTask cleanupTask =
+            new CleanupTask(uuid, 
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+        uuid2CleanupTask.put(uuid, cleanupTask);
+        cleanupTaskQueue.add(cleanupTask);
+      }
     }
-    TsFileWriterManager writerManager =
+
+    final TsFileWriterManager writerManager =
         uuid2WriterManager.computeIfAbsent(
             uuid, o -> new 
TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
     for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
@@ -158,29 +189,19 @@ public class LoadTsFileManager {
   }
 
   private void clean(String uuid) {
-    uuid2WriterManager.get(uuid).close();
-    uuid2WriterManager.remove(uuid);
-    uuid2Future.get(uuid).cancel(true);
-    uuid2Future.remove(uuid);
-
-    final Path loadDirPath = loadDir.toPath();
-    if (!Files.exists(loadDirPath)) {
-      return;
-    }
-    try {
-      Files.deleteIfExists(loadDirPath);
-      LOGGER.info("Load dir {} was deleted.", loadDirPath);
-    } catch (DirectoryNotEmptyException e) {
-      LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath);
-    } catch (IOException e) {
-      LOGGER.info(MESSAGE_DELETE_FAIL, loadDirPath);
+    synchronized (uuid2CleanupTask) {
+      final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
+      if (cleanupTask != null) {
+        cleanupTask.cancel();
+      }
     }
+
+    forceCloseWriterManager(uuid);
   }
 
   private void forceCloseWriterManager(String uuid) {
     uuid2WriterManager.get(uuid).close();
     uuid2WriterManager.remove(uuid);
-    uuid2Future.remove(uuid);
 
     final Path loadDirPath = loadDir.toPath();
     if (!Files.exists(loadDirPath)) {
@@ -375,6 +396,42 @@ public class LoadTsFileManager {
     }
   }
 
+  private class CleanupTask implements Runnable, Comparable<CleanupTask> {
+
+    private final String uuid;
+    private final long scheduledTime;
+
+    private volatile boolean isCanceled = false;
+
+    private CleanupTask(String uuid, long delayInMs) {
+      this.uuid = uuid;
+      scheduledTime = System.currentTimeMillis() + delayInMs;
+    }
+
+    public void cancel() {
+      isCanceled = true;
+    }
+
+    @Override
+    public void run() {
+      if (isCanceled) {
+        LOGGER.info("Load cleanup task {} is canceled.", uuid);
+      } else {
+        LOGGER.info("Load cleanup task {} starts.", uuid);
+        try {
+          forceCloseWriterManager(uuid);
+        } catch (Exception e) {
+          LOGGER.warn("Load cleanup task {} error.", uuid, e);
+        }
+      }
+    }
+
+    @Override
+    public int compareTo(CleanupTask that) {
+      return Long.compare(this.scheduledTime, that.scheduledTime);
+    }
+  }
+
   private static class DataPartitionInfo {
 
     private final DataRegion dataRegion;
@@ -389,10 +446,6 @@ public class LoadTsFileManager {
       return dataRegion;
     }
 
-    public TTimePartitionSlot getTimePartitionSlot() {
-      return timePartitionSlot;
-    }
-
     @Override
     public String toString() {
       return String.join(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 3fe138d2dc1..71250a0c54e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -143,7 +143,7 @@ public class StorageEngine implements IService {
   private List<FlushListener> customFlushListeners = new ArrayList<>();
   private int recoverDataRegionNum = 0;
 
-  private LoadTsFileManager loadTsFileManager;
+  private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
 
   private StorageEngine() {}
 
@@ -786,7 +786,7 @@ public class StorageEngine implements IService {
     }
 
     try {
-      getLoadTsFileManager().writeToDataRegion(getDataRegion(dataRegionId), 
pieceNode, uuid);
+      loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId), 
pieceNode, uuid);
     } catch (IOException e) {
       LOGGER.error(
           "IO error when writing piece node of TsFile {} to DataRegion {}.",
@@ -811,7 +811,7 @@ public class StorageEngine implements IService {
     try {
       switch (loadCommand) {
         case EXECUTE:
-          if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe, 
progressIndex)) {
+          if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, 
progressIndex)) {
             status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
@@ -822,7 +822,7 @@ public class StorageEngine implements IService {
           }
           break;
         case ROLLBACK:
-          if (getLoadTsFileManager().deleteAll(uuid)) {
+          if (loadTsFileManager.deleteAll(uuid)) {
             status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
@@ -884,17 +884,6 @@ public class StorageEngine implements IService {
         });
   }
 
-  private LoadTsFileManager getLoadTsFileManager() {
-    if (loadTsFileManager == null) {
-      synchronized (LoadTsFileManager.class) {
-        if (loadTsFileManager == null) {
-          loadTsFileManager = new LoadTsFileManager();
-        }
-      }
-    }
-    return loadTsFileManager;
-  }
-
   static class InstanceHolder {
 
     private static final StorageEngine INSTANCE = new StorageEngine();

Reply via email to