This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7dd570d991a Load: Introduce LoadTsFileManager.CleanupTask to force
close writer manager after exception occurring (#11924)
7dd570d991a is described below
commit 7dd570d991abfaef8d2cc85028000f363684b97a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jan 18 20:43:16 2024 +0800
Load: Introduce LoadTsFileManager.CleanupTask to force close writer manager
after exception occurring (#11924)
---
.../execution/load/LoadTsFileManager.java | 143 ++++++++++++++-------
.../iotdb/db/storageengine/StorageEngine.java | 19 +--
2 files changed, 102 insertions(+), 60 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 131e8f1884b..1d5cbd0205f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.queryengine.execution.load;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -31,6 +30,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
@@ -54,9 +54,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.PriorityBlockingQueue;
/**
* {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link
@@ -76,21 +74,52 @@ public class LoadTsFileManager {
private final File loadDir;
- private final Map<String, TsFileWriterManager> uuid2WriterManager;
+ private final Map<String, TsFileWriterManager> uuid2WriterManager = new ConcurrentHashMap<>();
- private final ScheduledExecutorService cleanupExecutors;
- private final Map<String, ScheduledFuture<?>> uuid2Future;
+ private final Map<String, CleanupTask> uuid2CleanupTask = new ConcurrentHashMap<>();
+ private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new PriorityBlockingQueue<>();
public LoadTsFileManager() {
this.loadDir = SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir());
- this.uuid2WriterManager = new ConcurrentHashMap<>();
- this.cleanupExecutors =
- IoTDBThreadPoolFactory.newScheduledThreadPool(1, LoadTsFileManager.class.getName());
- this.uuid2Future = new ConcurrentHashMap<>();
+ registerCleanupTaskExecutor();
recover();
}
+ private void registerCleanupTaskExecutor() {
+ PipeAgent.runtime()
+ .registerPeriodicalJob(
+ "LoadTsFileManager#cleanupTasks",
+ this::cleanupTasks,
+ LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND);
+ }
+
+ private void cleanupTasks() {
+ while (!cleanupTaskQueue.isEmpty()) {
+ synchronized (uuid2CleanupTask) {
+ if (cleanupTaskQueue.isEmpty()) {
+ continue;
+ }
+
+ final CleanupTask cleanupTask = cleanupTaskQueue.peek();
+ if (cleanupTask.scheduledTime <= System.currentTimeMillis()) {
+ cleanupTask.run();
+
+ uuid2CleanupTask.remove(cleanupTask.uuid);
+ cleanupTaskQueue.poll();
+ } else {
+ final long waitTimeInMs = cleanupTask.scheduledTime - System.currentTimeMillis();
+ LOGGER.info(
+ "Next load cleanup task {} is not ready to run, wait for at least {} ms ({}s).",
+ cleanupTask.uuid,
+ waitTimeInMs,
+ waitTimeInMs / 1000.0);
+ return;
+ }
+ }
+ }
+ }
+
private void recover() {
if (!loadDir.exists()) {
return;
@@ -106,26 +135,28 @@ public class LoadTsFileManager {
uuid2WriterManager.put(uuid, writerManager);
writerManager.close();
- uuid2Future.put(
- uuid,
- cleanupExecutors.schedule(
- () -> forceCloseWriterManager(uuid),
- LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
- TimeUnit.SECONDS));
+
+ synchronized (uuid2CleanupTask) {
+ final CleanupTask cleanupTask =
+ new CleanupTask(uuid, LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+ uuid2CleanupTask.put(uuid, cleanupTask);
+ cleanupTaskQueue.add(cleanupTask);
+ }
}
}
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
throws IOException {
if (!uuid2WriterManager.containsKey(uuid)) {
- uuid2Future.put(
- uuid,
- cleanupExecutors.schedule(
- () -> forceCloseWriterManager(uuid),
- LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
- TimeUnit.SECONDS));
+ synchronized (uuid2CleanupTask) {
+ final CleanupTask cleanupTask =
+ new CleanupTask(uuid, LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+ uuid2CleanupTask.put(uuid, cleanupTask);
+ cleanupTaskQueue.add(cleanupTask);
+ }
}
- TsFileWriterManager writerManager =
+
+ final TsFileWriterManager writerManager =
uuid2WriterManager.computeIfAbsent(
uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
@@ -158,29 +189,19 @@ public class LoadTsFileManager {
}
private void clean(String uuid) {
- uuid2WriterManager.get(uuid).close();
- uuid2WriterManager.remove(uuid);
- uuid2Future.get(uuid).cancel(true);
- uuid2Future.remove(uuid);
-
- final Path loadDirPath = loadDir.toPath();
- if (!Files.exists(loadDirPath)) {
- return;
- }
- try {
- Files.deleteIfExists(loadDirPath);
- LOGGER.info("Load dir {} was deleted.", loadDirPath);
- } catch (DirectoryNotEmptyException e) {
- LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath);
- } catch (IOException e) {
- LOGGER.info(MESSAGE_DELETE_FAIL, loadDirPath);
+ synchronized (uuid2CleanupTask) {
+ final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
+ if (cleanupTask != null) {
+ cleanupTask.cancel();
+ }
}
+
+ forceCloseWriterManager(uuid);
}
private void forceCloseWriterManager(String uuid) {
uuid2WriterManager.get(uuid).close();
uuid2WriterManager.remove(uuid);
- uuid2Future.remove(uuid);
final Path loadDirPath = loadDir.toPath();
if (!Files.exists(loadDirPath)) {
@@ -375,6 +396,42 @@ public class LoadTsFileManager {
}
}
+ private class CleanupTask implements Runnable, Comparable<CleanupTask> {
+
+ private final String uuid;
+ private final long scheduledTime;
+
+ private volatile boolean isCanceled = false;
+
+ private CleanupTask(String uuid, long delayInMs) {
+ this.uuid = uuid;
+ scheduledTime = System.currentTimeMillis() + delayInMs;
+ }
+
+ public void cancel() {
+ isCanceled = true;
+ }
+
+ @Override
+ public void run() {
+ if (isCanceled) {
+ LOGGER.info("Load cleanup task {} is canceled.", uuid);
+ } else {
+ LOGGER.info("Load cleanup task {} starts.", uuid);
+ try {
+ forceCloseWriterManager(uuid);
+ } catch (Exception e) {
+ LOGGER.warn("Load cleanup task {} error.", uuid, e);
+ }
+ }
+ }
+
+ @Override
+ public int compareTo(CleanupTask that) {
+ return Long.compare(this.scheduledTime, that.scheduledTime);
+ }
+ }
+
private static class DataPartitionInfo {
private final DataRegion dataRegion;
@@ -389,10 +446,6 @@ public class LoadTsFileManager {
return dataRegion;
}
- public TTimePartitionSlot getTimePartitionSlot() {
- return timePartitionSlot;
- }
-
@Override
public String toString() {
return String.join(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 3fe138d2dc1..71250a0c54e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -143,7 +143,7 @@ public class StorageEngine implements IService {
private List<FlushListener> customFlushListeners = new ArrayList<>();
private int recoverDataRegionNum = 0;
- private LoadTsFileManager loadTsFileManager;
+ private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
private StorageEngine() {}
@@ -786,7 +786,7 @@ public class StorageEngine implements IService {
}
try {
- getLoadTsFileManager().writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid);
+ loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid);
} catch (IOException e) {
LOGGER.error(
"IO error when writing piece node of TsFile {} to DataRegion {}.",
@@ -811,7 +811,7 @@ public class StorageEngine implements IService {
try {
switch (loadCommand) {
case EXECUTE:
- if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe, progressIndex)) {
+ if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, progressIndex)) {
status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
@@ -822,7 +822,7 @@ public class StorageEngine implements IService {
}
break;
case ROLLBACK:
- if (getLoadTsFileManager().deleteAll(uuid)) {
+ if (loadTsFileManager.deleteAll(uuid)) {
status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
@@ -884,17 +884,6 @@ public class StorageEngine implements IService {
});
}
- private LoadTsFileManager getLoadTsFileManager() {
- if (loadTsFileManager == null) {
- synchronized (LoadTsFileManager.class) {
- if (loadTsFileManager == null) {
- loadTsFileManager = new LoadTsFileManager();
- }
- }
- }
- return loadTsFileManager;
- }
-
static class InstanceHolder {
private static final StorageEngine INSTANCE = new StorageEngine();