This is an automated email from the ASF dual-hosted git repository. ericpai pushed a commit to branch bugfix/iotdb-3541 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fbecf5cffa11f87b26581a33686ec34539d37b93 Author: ericpai <[email protected]> AuthorDate: Mon Jun 20 14:20:57 2022 +0800 [IOTDB-3541][IOTDB-3542] Make Cluster IT stable and usable --- .../org/apache/iotdb/it/env/ClusterEnvBase.java | 65 ++++++++-------------- .../itbase/runtime/ParallelRequestDelegate.java | 8 ++- .../iotdb/itbase/runtime/RequestDelegate.java | 49 +++++++++++++++- .../itbase/runtime/SerialRequestDelegate.java | 4 +- .../src/assembly/resources/sbin/stop-datanode.bat | 19 +++++-- 5 files changed, 96 insertions(+), 49 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java b/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java index 1a97596600..4d51982849 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/ClusterEnvBase.java @@ -40,7 +40,6 @@ import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.apache.iotdb.jdbc.Config.VERSION; @@ -48,7 +47,8 @@ import static org.junit.Assert.fail; public abstract class ClusterEnvBase implements BaseEnv { private static final Logger logger = LoggerFactory.getLogger(ClusterEnvBase.class); - private final int NODE_START_TIMEOUT = 10; + private final int NODE_START_TIMEOUT = 100; + private final int PROBE_TIMEOUT = 2; private List<ConfigNode> configNodes; private List<DataNode> dataNodes; private final Random rand = new Random(); @@ -155,50 +155,33 @@ public abstract class ClusterEnvBase implements BaseEnv { return "IT"; } - public void testWorking() throws InterruptedException { + public void testWorking() { List<String> endpoints = dataNodes.stream().map(ClusterNodeBase::getIpAndPortString).collect(Collectors.toList()); - boolean[] success = new boolean[dataNodes.size()]; - Exception[] 
exceptions = new Exception[dataNodes.size()]; - final int probeTimeout = 5; - AtomicInteger successCount = new AtomicInteger(0); - for (int counter = 0; counter < 30; counter++) { - RequestDelegate<Void> testDelegate = new ParallelRequestDelegate<>(endpoints, probeTimeout); - for (int i = 0; i < dataNodes.size(); i++) { - final int idx = i; - final String dataNodeEndpoint = dataNodes.get(i).getIpAndPortString(); - testDelegate.addRequest( - () -> { - if (!success[idx]) { - try (Connection ignored = getConnection(dataNodeEndpoint, probeTimeout)) { - success[idx] = true; - successCount.incrementAndGet(); - } catch (Exception e) { - exceptions[idx] = e; - logger.debug("Open connection of {} failed", dataNodeEndpoint, e); - } + RequestDelegate<Void> testDelegate = + new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT); + for (DataNode dataNode : dataNodes) { + final String dataNodeEndpoint = dataNode.getIpAndPortString(); + testDelegate.addRequest( + () -> { + Exception lastException = null; + for (int i = 0; i < 30; i++) { + try (Connection ignored = getConnection(dataNodeEndpoint, PROBE_TIMEOUT)) { + return null; + } catch (Exception e) { + lastException = e; + TimeUnit.SECONDS.sleep(1L); } - return null; - }); - } - try { - testDelegate.requestAll(); - } catch (SQLException e) { - // It will never be thrown as it has already caught in the request. 
- } - if (successCount.get() == dataNodes.size()) { - logger.info("The whole cluster is ready."); - return; - } - TimeUnit.SECONDS.sleep(1); + } + logger.error("Try to connect {} failed.", dataNodeEndpoint, lastException); + throw lastException; + }); } - // The cluster is not ready after 30 times to try - for (int i = 0; i < dataNodes.size(); i++) { - if (!success[i] && exceptions[i] != null) { - logger.error("Connect to {} failed", dataNodes.get(i).getIpAndPortString(), exceptions[i]); - } + try { + testDelegate.requestAll(); + } catch (Exception e) { + fail("After 30 times retry, the cluster can't work!"); } - fail("After 30 times retry, the cluster can't work!"); } @Override diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java index fbec912adb..92b5b478bc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java @@ -22,8 +22,10 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * ParallelRequestDelegate will handle requests in parallel. 
It's more efficient when requests @@ -44,10 +46,13 @@ public class ParallelRequestDelegate<T> extends RequestDelegate<T> { resultFutures.add(f); } List<T> results = new ArrayList<>(getRequests().size()); + Exception[] exceptions = new Exception[getEndpoints().size()]; for (int i = 0; i < getEndpoints().size(); i++) { try { results.add(resultFutures.get(i).get(taskTimeoutSeconds, TimeUnit.SECONDS)); - } catch (Exception e) { + } catch (ExecutionException e) { + exceptions[i] = e; + } catch (InterruptedException | TimeoutException e) { for (int j = i; j < getEndpoints().size(); j++) { resultFutures.get(j).cancel(true); } @@ -55,6 +60,7 @@ public class ParallelRequestDelegate<T> extends RequestDelegate<T> { String.format("Waiting for query results of %s failed", getEndpoints().get(i)), e); } } + handleExceptions(exceptions); return results; } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java index af71fe1577..472ee81f91 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java @@ -18,15 +18,20 @@ */ package org.apache.iotdb.itbase.runtime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; /** This class is used to handle multi requests and gather their returned values. 
*/ public abstract class RequestDelegate<T> { + private static final Logger logger = LoggerFactory.getLogger(RequestDelegate.class); private final List<String> endpoints; private final List<Callable<T>> requests = new ArrayList<>(); @@ -49,7 +54,9 @@ public abstract class RequestDelegate<T> { } /** - * Do the requests which have been added, and return a list of their return values. + * Do the requests which have been added, and return a list of their return values. If any + * request throws, the messages of all requests' exceptions are compared: an SQLException carrying + * the common message is thrown if they are identical, otherwise an InconsistentDataException. * * @return the return values of all the request added in order. * @throws SQLException if any error happens during requesting. @@ -76,6 +83,46 @@ public abstract class RequestDelegate<T> { return data; } + protected void handleExceptions(Exception[] exceptions) throws SQLException { + if (exceptions.length == 0) { + return; + } + String[] exceptionMsg = new String[exceptions.length]; + Throwable[] businessExceptions = new Throwable[exceptions.length]; + boolean exceptionInconsistent = false; + for (int i = 0; i < exceptions.length; i++) { + if (exceptions[i] != null) { + businessExceptions[i] = + exceptions[i] instanceof ExecutionException ? exceptions[i].getCause() : exceptions[i]; + exceptionMsg[i] = businessExceptions[i].getMessage(); + } + } + for (int i = 1; i < exceptionMsg.length; i++) { + if (!Objects.equals(exceptionMsg[i], exceptionMsg[0])) { + exceptionInconsistent = true; + break; + } + } + if (!exceptionInconsistent && exceptionMsg[0] != null) { + throw new SQLException(exceptionMsg[0]); + } + if (exceptionInconsistent) { + for (int i = 0; i < exceptions.length; i++) { + if (exceptions[i] != null) { + // As each exception has its own stacktrace, in order to display them clearly, we can only + // print them through logger.
+ logger.error( + "Exception happens during request to {}", + getEndpoints().get(i), + businessExceptions[i]); + } else { + logger.error("No exception happens during request to {}", getEndpoints().get(i)); + } + } + throw new InconsistentDataException(exceptionMsg, getEndpoints()); + } + } + protected List<String> getEndpoints() { return endpoints; } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java index 10e53b62e6..096983af23 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/SerialRequestDelegate.java @@ -35,13 +35,15 @@ public class SerialRequestDelegate<T> extends RequestDelegate<T> { @Override public List<T> requestAll() throws SQLException { List<T> results = new ArrayList<>(getEndpoints().size()); + Exception[] exceptions = new Exception[getEndpoints().size()]; for (int i = 0; i < getEndpoints().size(); i++) { try { results.add(getRequests().get(i).call()); } catch (Exception e) { - throw new SQLException(String.format("Request %s error.", getEndpoints().get(i)), e); + exceptions[i] = e; } } + handleExceptions(exceptions); return results; } } diff --git a/server/src/assembly/resources/sbin/stop-datanode.bat b/server/src/assembly/resources/sbin/stop-datanode.bat index e33c1bd81f..b7de06477f 100644 --- a/server/src/assembly/resources/sbin/stop-datanode.bat +++ b/server/src/assembly/resources/sbin/stop-datanode.bat @@ -19,11 +19,20 @@ @echo off -pushd.. 
-set exec_dir=%cd% -popd -set exec_dir=%exec_dir:\=\\% -wmic process where (commandline like "%%iotdb.DataNode%%" and not name="wmic.exe" and commandline like "%%%exec_dir%%%") delete +set current_dir=%~dp0 +set superior_dir=%current_dir%\..\ +for /f "eol=; tokens=2,2 delims==" %%i in ('findstr /i "^rpc_port" +%superior_dir%\conf\iotdb-engine.properties') do ( + set rpc_port=%%i +) +for /f "eol=; tokens=2,2 delims==" %%i in ('findstr /i "rpc_address" +%superior_dir%\conf\iotdb-engine.properties') do ( + set rpc_address=%%i +) +for /f "tokens=5" %%a in ('netstat /ano ^| findstr %rpc_address%:%rpc_port%') do ( + taskkill /f /pid %%a +) +rem ps ax | grep -i 'iotdb.DataNode' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
