This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-2274 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 20479e76b720edaea5df0c968dfd059b604a2c47 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Jan 10 15:52:09 2022 +0800 [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting --- .../org/apache/iotdb/trigger/TriggerExample.java | 28 ++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java index 6e3ae52..d9b9dc0 100644 --- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java +++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java @@ -45,6 +45,14 @@ public class TriggerExample implements Trigger { private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); private final MQTTHandler mqttHandler = new MQTTHandler(); + // This field is required when the target MQTT server is current IoTDB. + // When IoTDB restarts, the registered triggers will be restored before starting the MQTT service. + // For this trigger, if openSinkHandlers() is called in onCreate(), IoTDB server will be stuck + // in openSinkHandlers when recovering, because it can't connect to the MQTT server (not started + // yet). + // See IOTDB-2274 for more detail. + private volatile boolean isSinksOpenedAfterCreation = false; + private SlidingSizeWindowEvaluationHandler windowEvaluationHandler; @Override @@ -54,8 +62,6 @@ public class TriggerExample implements Trigger { double lo = attributes.getDouble("lo"); double hi = attributes.getDouble("hi"); - openSinkHandlers(); - windowEvaluationHandler = new SlidingSizeWindowEvaluationHandler( new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5), @@ -93,19 +99,33 @@ public class TriggerExample implements Trigger { } @Override - public Double fire(long timestamp, Double value) { + public Double fire(long timestamp, Double value) throws Exception { + tryOpenSinksFirstOnFire(); windowEvaluationHandler.collect(timestamp, value); return value; } @Override - public double[] fire(long[] timestamps, double[] values) { + public double[] fire(long[] timestamps, double[] values) throws Exception { + tryOpenSinksFirstOnFire(); for (int i = 0; i < timestamps.length; ++i) { windowEvaluationHandler.collect(timestamps[i], values[i]); } return values; } + // See IOTDB-2274 for more detail. + private void tryOpenSinksFirstOnFire() throws Exception { + if (!isSinksOpenedAfterCreation) { + synchronized (this) { + if (!isSinksOpenedAfterCreation) { + openSinkHandlers(); + isSinksOpenedAfterCreation = true; + } + } + } + } + private void openSinkHandlers() throws Exception { localIoTDBHandler.open( new LocalIoTDBConfiguration(
