This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 20c4cd25a3e Subscription: skip on setup and cluster failure when running subscription restart IT & fix some bugs in SubscriptionExecutorServiceManager (#12710)
20c4cd25a3e is described below

commit 20c4cd25a3ef397315d96bb614154b2039614343
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jun 12 15:04:18 2024 +0800

    Subscription: skip on setup and cluster failure when running subscription restart IT & fix some bugs in SubscriptionExecutorServiceManager (#12710)
---
 .../it/IoTDBSubscriptionITConstant.java            |  2 +-
 .../iotdb/subscription/it/SkipOnSetUpFailure.java  | 82 ++++++++++++++++++++++
 .../it/dual/AbstractSubscriptionDualIT.java        | 13 ++++
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  |  3 +-
 .../it/local/IoTDBSubscriptionRestartIT.java       | 41 ++++++++---
 .../SubscriptionExecutorServiceManager.java        |  8 +--
 6 files changed, 132 insertions(+), 17 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index d2dc8ee49a5..7d16110cd1f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -23,7 +23,7 @@ public class IoTDBSubscriptionITConstant {
 
   public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
   public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
-  public static final long AWAITILITY_AT_MOST_SECOND = 240L;
+  public static final long AWAITILITY_AT_MOST_SECOND = 600L;
 
   public static final long SLEEP_NS = 1_000_000_000L;
   public static final long POLL_TIMEOUT_MS = 10_000L;
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java
new file mode 100644
index 00000000000..add7b7c1e2b
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/SkipOnSetUpFailure.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.junit.AssumptionViolatedException;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.reflect.Method;
+
+public class SkipOnSetUpFailure implements TestRule {
+
+  private final String setUpMethodName;
+
+  /**
+   * @param setUpMethodName Should be exactly the same as the method name 
decorated with @Before.
+   */
+  public SkipOnSetUpFailure(@NonNull final String setUpMethodName) {
+    this.setUpMethodName = setUpMethodName;
+  }
+
+  @Override
+  public Statement apply(final Statement base, final Description description) {
+    return new Statement() {
+      @Override
+      public void evaluate() throws Throwable {
+        try {
+          base.evaluate();
+        } catch (final Throwable e) {
+          // Trace back the exception stack to determine whether the exception 
was thrown during the
+          // setUp phase.
+          for (final StackTraceElement stackTraceElement : e.getStackTrace()) {
+            if (setUpMethodName.equals(stackTraceElement.getMethodName())
+                && 
description.getClassName().equals(stackTraceElement.getClassName())
+                && 
isMethodAnnotationWithBefore(stackTraceElement.getMethodName())) {
+              e.printStackTrace();
+              // Skip this test.
+              throw new AssumptionViolatedException(
+                  String.format(
+                      "Skipping test due to setup failure for %s#%s",
+                      description.getClassName(), 
description.getMethodName()));
+            }
+          }
+
+          // Re-throw the exception (which means the test has failed).
+          throw e;
+
+          // Regardless of the circumstances, the method decorated with @After 
will always be
+          // executed.
+        }
+      }
+
+      private boolean isMethodAnnotationWithBefore(final String methodName) {
+        try {
+          final Method method = 
description.getTestClass().getDeclaredMethod(methodName);
+          return method.isAnnotationPresent(org.junit.Before.class);
+        } catch (final Throwable ignored) {
+          return false;
+        }
+      }
+    };
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 9aa01ad4acf..3d8eb45a7fd 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -21,17 +21,30 @@ package org.apache.iotdb.subscription.it.dual;
 
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.itbase.env.BaseEnv;
+import 
org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
 
 abstract class AbstractSubscriptionDualIT {
 
   protected BaseEnv senderEnv;
   protected BaseEnv receiverEnv;
 
+  @Rule public TestName testName = new TestName();
+
   @Before
   public void setUp() {
+    // set thread name
+    Thread.currentThread().setName(String.format("%s - main", 
testName.getMethodName()));
+
+    // set thread pools core size
+    SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
+    
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
+    
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
+
     MultiEnvFactory.createEnv(2);
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 992d151520f..db7b9e7ca3a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -1005,7 +1005,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                   LOGGER.info("consumer {} exiting...", consumers.get(index));
                 }
               },
-              consumers.get(index).toString());
+              String.format("%s - %s", testName.getMethodName(), 
consumers.get(index).toString()));
       t.start();
       threads.add(t);
     }
@@ -1016,6 +1016,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
         Awaitility.await()
+            .pollInSameThread()
             
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
             .pollInterval(
                 IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 2764fad56b8..d3680219578 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -37,13 +37,16 @@ import 
org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.iotdb.subscription.it.SkipOnSetUpFailure;
 
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +66,8 @@ public class IoTDBSubscriptionRestartIT {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);
 
+  @Rule public final TestRule skipOnSetUpFailure = new 
SkipOnSetUpFailure("setUp");
+
   @Before
   public void setUp() throws Exception {
     EnvFactory.getEnv()
@@ -123,6 +128,7 @@ public class IoTDBSubscriptionRestartIT {
       TestUtils.restartCluster(EnvFactory.getEnv());
     } catch (final Throwable e) {
       e.printStackTrace();
+      // Avoid failure
       return;
     }
 
@@ -148,9 +154,10 @@ public class IoTDBSubscriptionRestartIT {
             String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
       }
       session.executeNonQueryStatement("flush");
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       e.printStackTrace();
-      fail(e.getMessage());
+      // Avoid failure
+      return;
     }
 
     // Subscription again
@@ -253,9 +260,10 @@ public class IoTDBSubscriptionRestartIT {
             String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
       }
       session.executeNonQueryStatement("flush");
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       e.printStackTrace();
-      fail(e.getMessage());
+      // Avoid failure
+      return;
     }
 
     // Shutdown DN 1 & DN 2
@@ -265,6 +273,7 @@ public class IoTDBSubscriptionRestartIT {
       EnvFactory.getEnv().shutdownDataNode(2);
     } catch (final Throwable e) {
       e.printStackTrace();
+      // Avoid failure
       return;
     }
 
@@ -314,6 +323,7 @@ public class IoTDBSubscriptionRestartIT {
       ((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown();
     } catch (final Throwable e) {
       e.printStackTrace();
+      // Avoid failure
       return;
     }
 
@@ -324,9 +334,10 @@ public class IoTDBSubscriptionRestartIT {
             String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
       }
       session.executeNonQueryStatement("flush");
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       e.printStackTrace();
-      fail(e.getMessage());
+      // Avoid failure
+      return;
     }
 
     // Check timestamps size
@@ -391,9 +402,10 @@ public class IoTDBSubscriptionRestartIT {
             String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
       }
       session.executeNonQueryStatement("flush");
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       e.printStackTrace();
-      fail(e.getMessage());
+      // Avoid failure
+      return;
     }
 
     // Subscription again
@@ -435,7 +447,13 @@ public class IoTDBSubscriptionRestartIT {
     thread.start();
 
     // Shutdown leader CN
-    
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
+    try {
+      
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
+    } catch (final Throwable e) {
+      e.printStackTrace();
+      // Avoid failure
+      return;
+    }
 
     // Insert some realtime data
     try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
@@ -444,9 +462,10 @@ public class IoTDBSubscriptionRestartIT {
             String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
       }
       session.executeNonQueryStatement("flush");
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       e.printStackTrace();
-      fail(e.getMessage());
+      // Avoid failure
+      return;
     }
 
     // Show topics and subscriptions
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
index 5a587ff96fb..6ce5946d4fc 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
@@ -29,12 +29,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-final class SubscriptionExecutorServiceManager {
+public final class SubscriptionExecutorServiceManager {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);
 
-  private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
+  private static final long AWAIT_TERMINATION_TIMEOUT_MS = 15_000L;
 
   private static final String CONTROL_FLOW_EXECUTOR_NAME = 
"SubscriptionControlFlowExecutor";
   private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
@@ -172,9 +172,9 @@ final class SubscriptionExecutorServiceManager {
     }
 
     void setCorePoolSize(final int corePoolSize) {
-      if (!isShutdown()) {
+      if (isShutdown()) {
         synchronized (this) {
-          if (!isShutdown()) {
+          if (isShutdown()) {
             this.corePoolSize = corePoolSize;
             return;
           }

Reply via email to