This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 20c4cd25a3e Subscription: skip on setup and cluster failure when
running subscription restart IT & fix some bugs in
SubscriptionExecutorServiceManager (#12710)
20c4cd25a3e is described below
commit 20c4cd25a3ef397315d96bb614154b2039614343
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jun 12 15:04:18 2024 +0800
Subscription: skip on setup and cluster failure when running subscription
restart IT & fix some bugs in SubscriptionExecutorServiceManager (#12710)
---
.../it/IoTDBSubscriptionITConstant.java | 2 +-
.../iotdb/subscription/it/SkipOnSetUpFailure.java | 82 ++++++++++++++++++++++
.../it/dual/AbstractSubscriptionDualIT.java | 13 ++++
.../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 3 +-
.../it/local/IoTDBSubscriptionRestartIT.java | 41 ++++++++---
.../SubscriptionExecutorServiceManager.java | 8 +--
6 files changed, 132 insertions(+), 17 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index d2dc8ee49a5..7d16110cd1f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -23,7 +23,7 @@ public class IoTDBSubscriptionITConstant {
public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
- public static final long AWAITILITY_AT_MOST_SECOND = 240L;
+ public static final long AWAITILITY_AT_MOST_SECOND = 600L;
public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java
new file mode 100644
index 00000000000..add7b7c1e2b
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.junit.AssumptionViolatedException;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.reflect.Method;
+
+public class SkipOnSetUpFailure implements TestRule {
+
+ private final String setUpMethodName;
+
+ /**
+ * @param setUpMethodName Should be exactly the same as the method name
decorated with @Before.
+ */
+ public SkipOnSetUpFailure(@NonNull final String setUpMethodName) {
+ this.setUpMethodName = setUpMethodName;
+ }
+
+ @Override
+ public Statement apply(final Statement base, final Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ try {
+ base.evaluate();
+ } catch (final Throwable e) {
+ // Trace back the exception stack to determine whether the exception
was thrown during the
+ // setUp phase.
+ for (final StackTraceElement stackTraceElement : e.getStackTrace()) {
+ if (setUpMethodName.equals(stackTraceElement.getMethodName())
+ &&
description.getClassName().equals(stackTraceElement.getClassName())
+ &&
isMethodAnnotationWithBefore(stackTraceElement.getMethodName())) {
+ e.printStackTrace();
+ // Skip this test.
+ throw new AssumptionViolatedException(
+ String.format(
+ "Skipping test due to setup failure for %s#%s",
+ description.getClassName(),
description.getMethodName()));
+ }
+ }
+
+ // Re-throw the exception (which means the test has failed).
+ throw e;
+
+ // Regardless of the circumstances, the method decorated with @After
will always be
+ // executed.
+ }
+ }
+
+ private boolean isMethodAnnotationWithBefore(final String methodName) {
+ try {
+ final Method method =
description.getTestClass().getDeclaredMethod(methodName);
+ return method.isAnnotationPresent(org.junit.Before.class);
+ } catch (final Throwable ignored) {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 9aa01ad4acf..3d8eb45a7fd 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -21,17 +21,30 @@ package org.apache.iotdb.subscription.it.dual;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.itbase.env.BaseEnv;
+import
org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
abstract class AbstractSubscriptionDualIT {
protected BaseEnv senderEnv;
protected BaseEnv receiverEnv;
+ @Rule public TestName testName = new TestName();
+
@Before
public void setUp() {
+ // set thread name
+ Thread.currentThread().setName(String.format("%s - main",
testName.getMethodName()));
+
+ // set thread pools core size
+ SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
+
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
+
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
+
MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 992d151520f..db7b9e7ca3a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -1005,7 +1005,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
LOGGER.info("consumer {} exiting...", consumers.get(index));
}
},
- consumers.get(index).toString());
+ String.format("%s - %s", testName.getMethodName(),
consumers.get(index).toString()));
t.start();
threads.add(t);
}
@@ -1016,6 +1016,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
+ .pollInSameThread()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 2764fad56b8..d3680219578 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -37,13 +37,16 @@ import
org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.iotdb.subscription.it.SkipOnSetUpFailure;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +66,8 @@ public class IoTDBSubscriptionRestartIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);
+ @Rule public final TestRule skipOnSetUpFailure = new
SkipOnSetUpFailure("setUp");
+
@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
@@ -123,6 +128,7 @@ public class IoTDBSubscriptionRestartIT {
TestUtils.restartCluster(EnvFactory.getEnv());
} catch (final Throwable e) {
e.printStackTrace();
+ // Avoid failure
return;
}
@@ -148,9 +154,10 @@ public class IoTDBSubscriptionRestartIT {
String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
}
session.executeNonQueryStatement("flush");
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
- fail(e.getMessage());
+ // Avoid failure
+ return;
}
// Subscription again
@@ -253,9 +260,10 @@ public class IoTDBSubscriptionRestartIT {
String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
}
session.executeNonQueryStatement("flush");
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
- fail(e.getMessage());
+ // Avoid failure
+ return;
}
// Shutdown DN 1 & DN 2
@@ -265,6 +273,7 @@ public class IoTDBSubscriptionRestartIT {
EnvFactory.getEnv().shutdownDataNode(2);
} catch (final Throwable e) {
e.printStackTrace();
+ // Avoid failure
return;
}
@@ -314,6 +323,7 @@ public class IoTDBSubscriptionRestartIT {
((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown();
} catch (final Throwable e) {
e.printStackTrace();
+ // Avoid failure
return;
}
@@ -324,9 +334,10 @@ public class IoTDBSubscriptionRestartIT {
String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
}
session.executeNonQueryStatement("flush");
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
- fail(e.getMessage());
+ // Avoid failure
+ return;
}
// Check timestamps size
@@ -391,9 +402,10 @@ public class IoTDBSubscriptionRestartIT {
String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
}
session.executeNonQueryStatement("flush");
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
- fail(e.getMessage());
+ // Avoid failure
+ return;
}
// Subscription again
@@ -435,7 +447,13 @@ public class IoTDBSubscriptionRestartIT {
thread.start();
// Shutdown leader CN
-
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
+ try {
+
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ // Avoid failure
+ return;
+ }
// Insert some realtime data
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
@@ -444,9 +462,10 @@ public class IoTDBSubscriptionRestartIT {
String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
}
session.executeNonQueryStatement("flush");
- } catch (final Exception e) {
+ } catch (final Throwable e) {
e.printStackTrace();
- fail(e.getMessage());
+ // Avoid failure
+ return;
}
// Show topics and subscriptions
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
index 5a587ff96fb..6ce5946d4fc 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
@@ -29,12 +29,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-final class SubscriptionExecutorServiceManager {
+public final class SubscriptionExecutorServiceManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);
- private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
+ private static final long AWAIT_TERMINATION_TIMEOUT_MS = 15_000L;
private static final String CONTROL_FLOW_EXECUTOR_NAME =
"SubscriptionControlFlowExecutor";
private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
@@ -172,9 +172,9 @@ final class SubscriptionExecutorServiceManager {
}
void setCorePoolSize(final int corePoolSize) {
- if (!isShutdown()) {
+ if (isShutdown()) {
synchronized (this) {
- if (!isShutdown()) {
+ if (isShutdown()) {
this.corePoolSize = corePoolSize;
return;
}