This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4554
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 12f9badd045339c58568bf947ddd2abed675d3cf
Author: LiuXuxin <[email protected]>
AuthorDate: Fri Sep 30 11:07:52 2022 +0800

    wait for system recover in CompactionWorker
---
 .../db/engine/compaction/CompactionWorker.java     | 46 +++++++++++++++++++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |  7 ++++
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  8 +++-
 .../java/org/apache/iotdb/db/service/NewIoTDB.java |  7 +++-
 4 files changed, 65 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java
index c9dc58098c..be3c8e6e44 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java
@@ -18,9 +18,14 @@
  */
 package org.apache.iotdb.db.engine.compaction;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
 import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
 import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.service.DataNode;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.NewIoTDB;
 import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
 import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 
@@ -46,7 +51,10 @@ public class CompactionWorker implements Runnable {
 
   @Override
   public void run() {
-    while (!Thread.currentThread().isInterrupted()) {
+    if (!waitForSystemRecover()) {
+      return;
+    }
+    while (!Thread.interrupted()) {
       try {
         AbstractCompactionTask task = null;
         try {
@@ -72,6 +80,42 @@ public class CompactionWorker implements Runnable {
     }
   }
 
+  /**
+   * Blocks the calling worker thread until the running server instance reports that it has been
+   * activated.
+   *
+   * <p>The instance that is polled depends on the configuration: {@code DataNode} in cluster
+   * mode, {@code NewIoTDB} in MPP mode, and {@code IoTDB} otherwise.
+   *
+   * @return {@code true} once the instance is activated, {@code false} if this thread was
+   *     interrupted while waiting (the interrupt status is restored in that case)
+   */
+  private boolean waitForSystemRecover() {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    if (config.isClusterMode()) {
+      // wait for data node to recover
+      return waitUntilActivated(
+          () -> DataNode.getInstance().isActivated(),
+          "Interrupted while waiting datanode to recover");
+    }
+    if (config.isMppMode()) {
+      // wait for new iotdb to recover
+      return waitUntilActivated(
+          () -> NewIoTDB.getInstance().isActivated(),
+          "Interrupted while waiting NewIoTDB to recover");
+    }
+    // wait for iotdb to recover
+    return waitUntilActivated(
+        () -> IoTDB.getInstance().isActivated(), "Interrupted while waiting IoTDB to recover");
+  }
+
+  /**
+   * Polls {@code activated} every 100 ms until it yields {@code true}.
+   *
+   * @param activated probe that is re-evaluated on every iteration
+   * @param interruptedMessage message logged if the wait is interrupted
+   * @return {@code true} when the probe succeeded, {@code false} if the thread was interrupted
+   */
+  private boolean waitUntilActivated(
+      java.util.function.BooleanSupplier activated, String interruptedMessage) {
+    while (!activated.getAsBoolean()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        log.error(interruptedMessage, e);
+        // sleep() cleared the interrupt status; restore it so the owner of this thread can
+        // still observe the interruption after run() returns.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return true;
+  }
+
   static class CompactionTaskFuture implements Future<CompactionTaskSummary> {
     CompactionTaskSummary summary;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1fd5c68e1a..aadd309107 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -93,6 +93,8 @@ public class DataNode implements DataNodeMBean {
 
   private final TEndPoint thisNode = new TEndPoint();
 
+  private volatile boolean activated = false;
+
   private DataNode() {
     // we do not init anything here, so that we can re-initialize the instance 
in IT.
   }
@@ -250,6 +252,7 @@ public class DataNode implements DataNodeMBean {
       logger.error("Meet error while starting up.", e);
       throw new StartupException("Error in activating IoTDB DataNode.");
     }
+    activated = true;
     logger.info("IoTDB DataNode has started.");
 
     try {
@@ -437,6 +440,10 @@ public class DataNode implements DataNodeMBean {
     Thread.setDefaultUncaughtExceptionHandler(new 
IoTDBDefaultThreadExceptionHandler());
   }
 
+  public boolean isActivated() { // start-up flag polled by CompactionWorker in cluster mode
+    return activated; // volatile; set to true just before "IoTDB DataNode has started." is logged
+  }
+
   private static class DataNodeHolder {
 
     private static final DataNode INSTANCE = new DataNode();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java 
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 8785aad28e..db43027037 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -74,6 +74,8 @@ public class IoTDB implements IoTDBMBean {
     return IoTDBHolder.INSTANCE;
   }
 
+  private volatile boolean activated = false;
+
   public static void main(String[] args) {
     try {
       IoTDBStartCheck.getInstance().checkConfig();
@@ -126,7 +128,7 @@ public class IoTDB implements IoTDBMBean {
     // reset config
     config.setAutoCreateSchemaEnabled(prevIsAutoCreateSchemaEnabled);
     config.setEnablePartialInsert(prevIsEnablePartialInsert);
-
+    activated = true;
     logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME);
   }
 
@@ -255,6 +257,10 @@ public class IoTDB implements IoTDBMBean {
     Thread.setDefaultUncaughtExceptionHandler(new 
IoTDBDefaultThreadExceptionHandler());
   }
 
+  public boolean isActivated() { // start-up flag polled by CompactionWorker outside cluster/MPP mode
+    return activated; // volatile; set to true right after the config reset, before "has started." is logged
+  }
+
   private static class IoTDBHolder {
 
     private static final IoTDB INSTANCE = new IoTDB();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java 
b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index 1aff419e56..ee4bb93449 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -68,6 +68,7 @@ public class NewIoTDB implements NewIoTDBMBean {
   private static final RegisterManager registerManager = new RegisterManager();
   public static LocalSchemaProcessor schemaProcessor = 
LocalSchemaProcessor.getInstance();
   public static LocalConfigNode configManager = LocalConfigNode.getInstance();
+  private volatile boolean activated = false;
 
   public static NewIoTDB getInstance() {
     return IoTDBHolder.INSTANCE;
@@ -111,7 +112,7 @@ public class NewIoTDB implements NewIoTDBMBean {
       logger.error("{} exit", IoTDBConstant.GLOBAL_DB_NAME);
       return;
     }
-
+    activated = true;
     logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME);
   }
 
@@ -232,6 +233,10 @@ public class NewIoTDB implements NewIoTDBMBean {
     Thread.setDefaultUncaughtExceptionHandler(new 
IoTDBDefaultThreadExceptionHandler());
   }
 
+  public boolean isActivated() { // start-up flag polled by CompactionWorker in MPP mode
+    return activated; // volatile; set to true just before "{} has started." is logged
+  }
+
   private static class IoTDBHolder {
 
     private static final NewIoTDB INSTANCE = new NewIoTDB();

Reply via email to