This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2274
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 20479e76b720edaea5df0c968dfd059b604a2c47
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jan 10 15:52:09 2022 +0800

    [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting
---
 .../org/apache/iotdb/trigger/TriggerExample.java   | 28 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
index 6e3ae52..d9b9dc0 100644
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
@@ -45,6 +45,14 @@ public class TriggerExample implements Trigger {
   private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
   private final MQTTHandler mqttHandler = new MQTTHandler();
 
+  // This field is required when the target MQTT server is current IoTDB.
+  // When IoTDB restarts, the registered triggers will be restored before starting the MQTT service.
+  // For this trigger, if openSinkHandlers() is called in onCreate(), IoTDB server will be stuck
+  // in openSinkHandlers when recovering, because it can't connect to the MQTT server (not started
+  // yet).
+  // See IOTDB-2274 for more detail.
+  private volatile boolean isSinksOpenedAfterCreation = false;
+
   private SlidingSizeWindowEvaluationHandler windowEvaluationHandler;
 
   @Override
@@ -54,8 +62,6 @@ public class TriggerExample implements Trigger {
     double lo = attributes.getDouble("lo");
     double hi = attributes.getDouble("hi");
 
-    openSinkHandlers();
-
     windowEvaluationHandler =
         new SlidingSizeWindowEvaluationHandler(
             new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5),
@@ -93,19 +99,33 @@ public class TriggerExample implements Trigger {
   }
 
   @Override
-  public Double fire(long timestamp, Double value) {
+  public Double fire(long timestamp, Double value) throws Exception {
+    tryOpenSinksFirstOnFire();
     windowEvaluationHandler.collect(timestamp, value);
     return value;
   }
 
   @Override
-  public double[] fire(long[] timestamps, double[] values) {
+  public double[] fire(long[] timestamps, double[] values) throws Exception {
+    tryOpenSinksFirstOnFire();
     for (int i = 0; i < timestamps.length; ++i) {
       windowEvaluationHandler.collect(timestamps[i], values[i]);
     }
     return values;
   }
 
+  // See IOTDB-2274 for more detail.
+  private void tryOpenSinksFirstOnFire() throws Exception {
+    if (!isSinksOpenedAfterCreation) {
+      synchronized (this) {
+        if (!isSinksOpenedAfterCreation) {
+          openSinkHandlers();
+          isSinksOpenedAfterCreation = true;
+        }
+      }
+    }
+  }
+
   private void openSinkHandlers() throws Exception {
     localIoTDBHandler.open(
         new LocalIoTDBConfiguration(

Reply via email to