This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-4554 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 12f9badd045339c58568bf947ddd2abed675d3cf Author: LiuXuxin <[email protected]> AuthorDate: Fri Sep 30 11:07:52 2022 +0800 wait for system recover in CompactionWorker --- .../db/engine/compaction/CompactionWorker.java | 46 +++++++++++++++++++++- .../java/org/apache/iotdb/db/service/DataNode.java | 7 ++++ .../java/org/apache/iotdb/db/service/IoTDB.java | 8 +++- .../java/org/apache/iotdb/db/service/NewIoTDB.java | 7 +++- 4 files changed, 65 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java index c9dc58098c..be3c8e6e44 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java @@ -18,9 +18,14 @@ */ package org.apache.iotdb.db.engine.compaction; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus; import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask; import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; +import org.apache.iotdb.db.service.DataNode; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.service.NewIoTDB; import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; @@ -46,7 +51,10 @@ public class CompactionWorker implements Runnable { @Override public void run() { - while (!Thread.currentThread().isInterrupted()) { + if (!waitForSystemRecover()) { + return; + } + while (!Thread.interrupted()) { try { AbstractCompactionTask task = null; try { @@ -72,6 +80,42 @@ public class CompactionWorker implements Runnable { } } + private boolean waitForSystemRecover() { + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + if (config.isClusterMode()) { + // wait for data node to recover + while (!DataNode.getInstance().isActivated()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + log.error("Interrupted while waiting datanode to recover", e); + return false; + } + } + } else if (config.isMppMode()) { + // wait for new iotdb to recover + while (!NewIoTDB.getInstance().isActivated()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + log.error("Interrupted while waiting NewIoTDB to recover", e); + return false; + } + } + } else { + // wait for iotdb to recover + while (!IoTDB.getInstance().isActivated()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + log.error("Interrupted while waiting NewIoTDB to recover", e); + return false; + } + } + } + return true; + } + static class CompactionTaskFuture implements Future<CompactionTaskSummary> { CompactionTaskSummary summary; diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 1fd5c68e1a..aadd309107 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -93,6 +93,8 @@ public class DataNode implements DataNodeMBean { private final TEndPoint thisNode = new TEndPoint(); + private volatile boolean activated = false; + private DataNode() { // we do not init anything here, so that we can re-initialize the instance in IT. } @@ -250,6 +252,7 @@ public class DataNode implements DataNodeMBean { logger.error("Meet error while starting up.", e); throw new StartupException("Error in activating IoTDB DataNode."); } + activated = true; logger.info("IoTDB DataNode has started."); try { @@ -437,6 +440,10 @@ public class DataNode implements DataNodeMBean { Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler()); } + public boolean isActivated() { + return activated; + } + private static class DataNodeHolder { private static final DataNode INSTANCE = new DataNode(); diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index 8785aad28e..db43027037 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -74,6 +74,8 @@ public class IoTDB implements IoTDBMBean { return IoTDBHolder.INSTANCE; } + private volatile boolean activated = false; + public static void main(String[] args) { try { IoTDBStartCheck.getInstance().checkConfig(); @@ -126,7 +128,7 @@ public class IoTDB implements IoTDBMBean { // reset config config.setAutoCreateSchemaEnabled(prevIsAutoCreateSchemaEnabled); config.setEnablePartialInsert(prevIsEnablePartialInsert); - + activated = true; logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME); } @@ -255,6 +257,10 @@ public class IoTDB implements IoTDBMBean { Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler()); } + public boolean isActivated() { + return activated; + } + private static class IoTDBHolder { private static final IoTDB INSTANCE = new IoTDB(); diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java index 1aff419e56..ee4bb93449 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java @@ -68,6 +68,7 @@ public class NewIoTDB implements NewIoTDBMBean { private static final RegisterManager registerManager = new RegisterManager(); public static LocalSchemaProcessor schemaProcessor = LocalSchemaProcessor.getInstance(); public static LocalConfigNode configManager = LocalConfigNode.getInstance(); + private volatile boolean activated = false; public static NewIoTDB getInstance() { return IoTDBHolder.INSTANCE; @@ -111,7 +112,7 @@ public class NewIoTDB implements NewIoTDBMBean { logger.error("{} exit", IoTDBConstant.GLOBAL_DB_NAME); return; } - + activated = true; logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME); } @@ -232,6 +233,10 @@ public class NewIoTDB implements NewIoTDBMBean { Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler()); } + public boolean isActivated() { + return activated; + } + private static class IoTDBHolder { private static final NewIoTDB INSTANCE = new NewIoTDB();
