This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/iotdb-3541
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fbecf5cffa11f87b26581a33686ec34539d37b93
Author: ericpai <[email protected]>
AuthorDate: Mon Jun 20 14:20:57 2022 +0800

    [IOTDB-3541][IOTDB-3542] Make Cluster IT stable and usable
---
 .../org/apache/iotdb/it/env/ClusterEnvBase.java    | 65 ++++++++--------------
 .../itbase/runtime/ParallelRequestDelegate.java    |  8 ++-
 .../iotdb/itbase/runtime/RequestDelegate.java      | 49 +++++++++++++++-
 .../itbase/runtime/SerialRequestDelegate.java      |  4 +-
 .../src/assembly/resources/sbin/stop-datanode.bat  | 19 +++++--
 5 files changed, 96 insertions(+), 49 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java 
b/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java
index 1a97596600..4d51982849 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java
@@ -40,7 +40,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.jdbc.Config.VERSION;
@@ -48,7 +47,8 @@ import static org.junit.Assert.fail;
 
 public abstract class ClusterEnvBase implements BaseEnv {
   private static final Logger logger = 
LoggerFactory.getLogger(ClusterEnvBase.class);
-  private final int NODE_START_TIMEOUT = 10;
+  private final int NODE_START_TIMEOUT = 100;
+  private final int PROBE_TIMEOUT = 2;
   private List<ConfigNode> configNodes;
   private List<DataNode> dataNodes;
   private final Random rand = new Random();
@@ -155,50 +155,33 @@ public abstract class ClusterEnvBase implements BaseEnv {
     return "IT";
   }
 
-  public void testWorking() throws InterruptedException {
+  public void testWorking() {
     List<String> endpoints =
         
dataNodes.stream().map(ClusterNodeBase::getIpAndPortString).collect(Collectors.toList());
-    boolean[] success = new boolean[dataNodes.size()];
-    Exception[] exceptions = new Exception[dataNodes.size()];
-    final int probeTimeout = 5;
-    AtomicInteger successCount = new AtomicInteger(0);
-    for (int counter = 0; counter < 30; counter++) {
-      RequestDelegate<Void> testDelegate = new 
ParallelRequestDelegate<>(endpoints, probeTimeout);
-      for (int i = 0; i < dataNodes.size(); i++) {
-        final int idx = i;
-        final String dataNodeEndpoint = dataNodes.get(i).getIpAndPortString();
-        testDelegate.addRequest(
-            () -> {
-              if (!success[idx]) {
-                try (Connection ignored = getConnection(dataNodeEndpoint, 
probeTimeout)) {
-                  success[idx] = true;
-                  successCount.incrementAndGet();
-                } catch (Exception e) {
-                  exceptions[idx] = e;
-                  logger.debug("Open connection of {} failed", 
dataNodeEndpoint, e);
-                }
+    RequestDelegate<Void> testDelegate =
+        new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+    for (DataNode dataNode : dataNodes) {
+      final String dataNodeEndpoint = dataNode.getIpAndPortString();
+      testDelegate.addRequest(
+          () -> {
+            Exception lastException = null;
+            for (int i = 0; i < 30; i++) {
+              try (Connection ignored = getConnection(dataNodeEndpoint, 
PROBE_TIMEOUT)) {
+                return null;
+              } catch (Exception e) {
+                lastException = e;
+                TimeUnit.SECONDS.sleep(1L);
               }
-              return null;
-            });
-      }
-      try {
-        testDelegate.requestAll();
-      } catch (SQLException e) {
-        // It will never be thrown as it has already caught in the request.
-      }
-      if (successCount.get() == dataNodes.size()) {
-        logger.info("The whole cluster is ready.");
-        return;
-      }
-      TimeUnit.SECONDS.sleep(1);
+            }
+            logger.error("Try to connect {} failed.", dataNodeEndpoint, 
lastException);
+            throw lastException;
+          });
     }
-    // The cluster is not ready after 30 times to try
-    for (int i = 0; i < dataNodes.size(); i++) {
-      if (!success[i] && exceptions[i] != null) {
-        logger.error("Connect to {} failed", 
dataNodes.get(i).getIpAndPortString(), exceptions[i]);
-      }
+    try {
+      testDelegate.requestAll();
+    } catch (Exception e) {
+      fail("After 30 times retry, the cluster can't work!");
     }
-    fail("After 30 times retry, the cluster can't work!");
   }
 
   @Override
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
index fbec912adb..92b5b478bc 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
@@ -22,8 +22,10 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * ParallelRequestDelegate will handle requests in parallel. It's more 
efficient when requests
@@ -44,10 +46,13 @@ public class ParallelRequestDelegate<T> extends 
RequestDelegate<T> {
       resultFutures.add(f);
     }
     List<T> results = new ArrayList<>(getRequests().size());
+    Exception[] exceptions = new Exception[getEndpoints().size()];
     for (int i = 0; i < getEndpoints().size(); i++) {
       try {
         results.add(resultFutures.get(i).get(taskTimeoutSeconds, 
TimeUnit.SECONDS));
-      } catch (Exception e) {
+      } catch (ExecutionException e) {
+        exceptions[i] = e;
+      } catch (InterruptedException | TimeoutException e) {
         for (int j = i; j < getEndpoints().size(); j++) {
           resultFutures.get(j).cancel(true);
         }
@@ -55,6 +60,7 @@ public class ParallelRequestDelegate<T> extends 
RequestDelegate<T> {
             String.format("Waiting for query results of %s failed", 
getEndpoints().get(i)), e);
       }
     }
+    handleExceptions(exceptions);
     return results;
   }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java
index af71fe1577..472ee81f91 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iotdb.itbase.runtime;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 /** This class is used to handle multi requests and gather their returned 
values. */
 public abstract class RequestDelegate<T> {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(RequestDelegate.class);
   private final List<String> endpoints;
   private final List<Callable<T>> requests = new ArrayList<>();
 
@@ -49,7 +54,9 @@ public abstract class RequestDelegate<T> {
   }
 
   /**
-   * Do the requests which have been added, and return a list of their return 
values.
+   * Do the requests which have been added, and return a list of their return 
values. If any
+   * request throws an exception, the exceptions thrown by the requests are 
compared: if they are
+   * all identical, that exception is rethrown; otherwise an 
InconsistentDataException is thrown.
    *
    * @return the return values of all the request added in order.
    * @throws SQLException if any error happens during requesting.
@@ -76,6 +83,46 @@ public abstract class RequestDelegate<T> {
     return data;
   }
 
+  protected void handleExceptions(Exception[] exceptions) throws SQLException {
+    if (exceptions.length == 0) {
+      return;
+    }
+    String[] exceptionMsg = new String[exceptions.length];
+    Throwable[] businessExceptions = new Throwable[exceptions.length];
+    boolean exceptionInconsistent = false;
+    for (int i = 0; i < exceptions.length; i++) {
+      if (exceptions[i] != null) {
+        businessExceptions[i] =
+            exceptions[i] instanceof ExecutionException ? 
exceptions[i].getCause() : exceptions[i];
+        exceptionMsg[i] = businessExceptions[i].getMessage();
+      }
+    }
+    for (int i = 1; i < exceptionMsg.length; i++) {
+      if (!Objects.equals(exceptionMsg[i], exceptionMsg[0])) {
+        exceptionInconsistent = true;
+        break;
+      }
+    }
+    if (!exceptionInconsistent && exceptionMsg[0] != null) {
+      throw new SQLException(exceptionMsg[0]);
+    }
+    if (exceptionInconsistent) {
+      for (int i = 0; i < exceptions.length; i++) {
+        if (exceptions[i] != null) {
+          // As each exception has its own stacktrace, in order to display 
them clearly, we can only
+          // print them through logger.
+          logger.error(
+              "Exception happens during request to {}",
+              getEndpoints().get(i),
+              businessExceptions[i]);
+        } else {
+          logger.error("No exception happens during request to {}", 
getEndpoints().get(i));
+        }
+      }
+      throw new InconsistentDataException(exceptionMsg, getEndpoints());
+    }
+  }
+
   protected List<String> getEndpoints() {
     return endpoints;
   }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java
index 10e53b62e6..096983af23 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java
@@ -35,13 +35,15 @@ public class SerialRequestDelegate<T> extends 
RequestDelegate<T> {
   @Override
   public List<T> requestAll() throws SQLException {
     List<T> results = new ArrayList<>(getEndpoints().size());
+    Exception[] exceptions = new Exception[getEndpoints().size()];
     for (int i = 0; i < getEndpoints().size(); i++) {
       try {
         results.add(getRequests().get(i).call());
       } catch (Exception e) {
-        throw new SQLException(String.format("Request %s error.", 
getEndpoints().get(i)), e);
+        exceptions[i] = e;
       }
     }
+    handleExceptions(exceptions);
     return results;
   }
 }
diff --git a/server/src/assembly/resources/sbin/stop-datanode.bat 
b/server/src/assembly/resources/sbin/stop-datanode.bat
index e33c1bd81f..b7de06477f 100644
--- a/server/src/assembly/resources/sbin/stop-datanode.bat
+++ b/server/src/assembly/resources/sbin/stop-datanode.bat
@@ -19,11 +19,20 @@
 
 @echo off
 
-pushd..
-set exec_dir=%cd%
-popd
-set exec_dir=%exec_dir:\=\\%
-wmic process where (commandline like "%%iotdb.DataNode%%" and not 
name="wmic.exe" and  commandline  like "%%%exec_dir%%%") delete
+set current_dir=%~dp0
+set superior_dir=%current_dir%\..\
 
+for /f  "eol=; tokens=2,2 delims==" %%i in ('findstr /i "^rpc_port"
+%superior_dir%\conf\iotdb-engine.properties') do (
+  set rpc_port=%%i
+)
 
+for /f  "eol=; tokens=2,2 delims==" %%i in ('findstr /i "rpc_address"
+%superior_dir%\conf\iotdb-engine.properties') do (
+  set rpc_address=%%i
+)
 
+for /f "tokens=5" %%a in ('netstat /ano ^| findstr %rpc_address%:%rpc_port%') 
do (
+  taskkill /f /pid %%a
+)
+rem ps ax | grep -i 'iotdb.DataNode' | grep -v grep | awk '{print $1}' | xargs 
kill -SIGTERM

Reply via email to