This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch env-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/env-13 by this push:
new d36fcdf0625 Pipe: Fixed the crashed VM for ITs & Added watcher thread for dataNode start (#17281)
d36fcdf0625 is described below
commit d36fcdf06252f2a313fe58e108743f23ee5b846f
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 11 18:05:30 2026 +0800
Pipe: Fixed the crashed VM for ITs & Added watcher thread for dataNode start (#17281)
* fix
* 200
* interrupt
* name
---
.../iotdb/it/env/cluster/ClusterConstant.java | 2 +-
.../iotdb/it/env/cluster/env/AbstractEnv.java | 25 +++++++++----------
.../itbase/runtime/ClusterTestConnection.java | 10 ++++++--
.../iotdb/itbase/runtime/ClusterTestResultSet.java | 14 ++++++++---
.../iotdb/itbase/runtime/ClusterTestStatement.java | 11 ++++++---
.../itbase/runtime/ParallelRequestDelegate.java | 9 ++++---
.../java/org/apache/iotdb/db/service/DataNode.java | 28 +++++++++++++++++++++-
7 files changed, 74 insertions(+), 25 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 147b57f2f67..4f650f8ccc4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -195,7 +195,7 @@ public class ClusterConstant {
+ "*";
// Env Constant
- public static final int NODE_START_TIMEOUT = 100;
+ public static final int NODE_START_TIMEOUT = 200;
public static final int NODE_NETWORK_TIMEOUT_MS = 0;
public static final String ZERO_TIME_ZONE = "GMT+0";
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 7358de70621..ed9a15accd1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -234,7 +234,7 @@ public abstract class AbstractEnv implements BaseEnv {
final List<String> dataNodeEndpoints = new ArrayList<>();
final RequestDelegate<Void> dataNodesDelegate =
- new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT,
this);
for (int i = 0; i < dataNodesNum; i++) {
final DataNodeWrapper dataNodeWrapper =
new DataNodeWrapper(
@@ -292,7 +292,7 @@ public abstract class AbstractEnv implements BaseEnv {
aiNodeWrapper.createLogDir();
final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
- Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT);
+ Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT,
this);
aiNodesDelegate.addRequest(
() -> {
@@ -435,7 +435,7 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public Connection getConnection(String username, String password) throws
SQLException {
return new ClusterTestConnection(
- getWriteConnection(null, username, password), getReadConnections(null,
username, password));
+ getWriteConnection(null, username, password), getReadConnections(null,
username, password), this);
}
@Override
@@ -444,7 +444,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(dataNodeWrapper, null,
username, password),
- getReadConnections(null, dataNodeWrapper, username, password));
+ getReadConnections(null, dataNodeWrapper, username, password), this);
}
@Override
@@ -453,7 +453,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(dataNode, null, username,
password),
- Collections.emptyList());
+ Collections.emptyList(), this);
}
@Override
@@ -462,7 +462,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(dataNode, null, username,
password),
- getReadConnections(null, username, password));
+ getReadConnections(null, username, password), this);
}
@Override
@@ -471,7 +471,7 @@ public abstract class AbstractEnv implements BaseEnv {
if (System.getProperty("ReadAndVerifyWithMultiNode",
"true").equalsIgnoreCase("true")) {
return new ClusterTestConnection(
getWriteConnection(version, username, password),
- getReadConnections(version, username, password));
+ getReadConnections(version, username, password), this);
} else {
return getWriteConnection(version, username,
password).getUnderlyingConnecton();
}
@@ -597,7 +597,7 @@ public abstract class AbstractEnv implements BaseEnv {
Constant.Version version, String username, String password) throws
SQLException {
List<String> endpoints = new ArrayList<>();
ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
for (DataNodeWrapper dataNodeWrapper : this.dataNodeWrapperList) {
final String endpoint = dataNodeWrapper.getIpAndPortString();
endpoints.add(endpoint);
@@ -628,7 +628,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
final List<String> endpoints = new ArrayList<>();
final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
endpoints.add(dataNode.getIpAndPortString());
readConnRequestDelegate.addRequest(
@@ -663,7 +663,7 @@ public abstract class AbstractEnv implements BaseEnv {
.map(DataNodeWrapper::getIpAndPortString)
.collect(Collectors.toList());
final RequestDelegate<Void> testDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
final String dataNodeEndpoint = dataNode.getIpAndPortString();
testDelegate.addRequest(
@@ -1014,7 +1014,8 @@ public abstract class AbstractEnv implements BaseEnv {
final RequestDelegate<Void> configNodeDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(newConfigNodeWrapper.getIpAndPortString()),
- NODE_START_TIMEOUT);
+ NODE_START_TIMEOUT,
+ this);
configNodeDelegate.addRequest(
() -> {
newConfigNodeWrapper.start();
@@ -1041,7 +1042,7 @@ public abstract class AbstractEnv implements BaseEnv {
final List<String> dataNodeEndpoints =
Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
final RequestDelegate<Void> dataNodesDelegate =
- new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT,
this);
dataNodesDelegate.addRequest(
() -> {
newDataNodeWrapper.start();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
index 77a5136e696..af910c36a5c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.itbase.runtime;
import org.apache.commons.lang3.Validate;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+
import java.sql.Array;
import java.sql.Blob;
@@ -43,20 +45,24 @@ import java.util.concurrent.Executor;
/** The implementation of {@link Connection} in cluster test. */
public class ClusterTestConnection implements Connection {
+ private final AbstractEnv env;
private final NodeConnection writeConnection;
private final List<NodeConnection> readConnections;
private boolean isClosed;
public ClusterTestConnection(
- NodeConnection writeConnection, List<NodeConnection> readConnections) {
+ final NodeConnection writeConnection,
+ final List<NodeConnection> readConnections,
+ final AbstractEnv env) {
Validate.notNull(readConnections);
this.writeConnection = writeConnection;
this.readConnections = readConnections;
+ this.env = env;
}
@Override
public Statement createStatement() throws SQLException {
- return new ClusterTestStatement(writeConnection, readConnections);
+ return new ClusterTestStatement(writeConnection, readConnections, env);
}
@Override
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
index e98ff7e7c14..3c32c550330 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
@@ -47,17 +49,23 @@ public class ClusterTestResultSet implements ResultSet {
private final List<ResultSet> resultSets;
private final List<String> endpoints;
private final int queryTimeoutSeconds;
+ private final AbstractEnv env;
public ClusterTestResultSet(
- List<Statement> statements, List<String> endpoints, String sql, int
queryTimeoutSeconds)
+ final List<Statement> statements,
+ final List<String> endpoints,
+ final String sql,
+ final int queryTimeoutSeconds,
+ final AbstractEnv env)
throws SQLException {
this.queryTimeoutSeconds = queryTimeoutSeconds;
this.endpoints = endpoints;
- RequestDelegate<ResultSet> delegate = createRequestDelegate();
+ final RequestDelegate<ResultSet> delegate = createRequestDelegate();
for (Statement st : statements) {
delegate.addRequest(() -> st.executeQuery(sql));
}
resultSets = delegate.requestAll();
+ this.env = env;
}
@Override
@@ -1169,7 +1177,7 @@ public class ClusterTestResultSet implements ResultSet {
* executed in parallel in order to accelerate the test.
*/
private <T> RequestDelegate<T> createRequestDelegate() {
- return new ParallelRequestDelegate<>(endpoints, queryTimeoutSeconds);
+ return new ParallelRequestDelegate<>(endpoints, queryTimeoutSeconds, env);
}
private <T> RequestDelegate<T> createLocalRequestDelegate() {
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
index c6b139a3eeb..f7381bb543a 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.jdbc.Config;
import org.slf4j.Logger;
@@ -44,9 +45,12 @@ public class ClusterTestStatement implements Statement {
private int maxRows = Integer.MAX_VALUE;
private int queryTimeout = DEFAULT_QUERY_TIMEOUT;
private int fetchSize = Config.DEFAULT_FETCH_SIZE;
+ private final AbstractEnv env;
public ClusterTestStatement(
- NodeConnection writeConnection, List<NodeConnection> readConnections) {
+ final NodeConnection writeConnection,
+ final List<NodeConnection> readConnections,
+ final AbstractEnv env) {
try {
this.writeStatement =
writeConnection.getUnderlyingConnecton().createStatement();
updateConfig(writeStatement, 0);
@@ -68,6 +72,7 @@ public class ClusterTestStatement implements Statement {
if (readStatements.isEmpty()) {
LOGGER.warn("Failed to create any read statement.");
}
+ this.env = env;
}
private void updateConfig(Statement statement, int timeout) throws
SQLException {
@@ -77,7 +82,7 @@ public class ClusterTestStatement implements Statement {
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- return new ClusterTestResultSet(readStatements, readEndpoints, sql,
queryTimeout);
+ return new ClusterTestResultSet(readStatements, readEndpoints, sql,
queryTimeout, env);
}
@Override
@@ -90,7 +95,7 @@ public class ClusterTestStatement implements Statement {
List<String> endpoints = new ArrayList<>();
endpoints.add(writEndpoint);
endpoints.addAll(readEndpoints);
- RequestDelegate<Void> delegate = new ParallelRequestDelegate<>(endpoints,
queryTimeout);
+ RequestDelegate<Void> delegate = new ParallelRequestDelegate<>(endpoints,
queryTimeout, env);
delegate.addRequest(
() -> {
if (writeStatement != null) {
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
index 19b6877d5c8..437a6da6e5b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.itbase.runtime;
-import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.itbase.exception.ParallelRequestTimeoutException;
import java.sql.SQLException;
@@ -37,10 +37,13 @@ import java.util.concurrent.TimeoutException;
*/
public class ParallelRequestDelegate<T> extends RequestDelegate<T> {
private final int taskTimeoutSeconds;
+ private final AbstractEnv env;
- public ParallelRequestDelegate(List<String> endpoints, int
taskTimeoutSeconds) {
+ public ParallelRequestDelegate(
+ final List<String> endpoints, final int taskTimeoutSeconds, final
AbstractEnv env) {
super(endpoints);
this.taskTimeoutSeconds = taskTimeoutSeconds;
+ this.env = env;
}
public List<T> requestAll() throws SQLException {
@@ -57,7 +60,7 @@ public class ParallelRequestDelegate<T> extends RequestDelegate<T> {
} catch (ExecutionException e) {
exceptions[i] = e;
} catch (InterruptedException | TimeoutException e) {
- EnvFactory.getEnv().dumpTestJVMSnapshot();
+ env.dumpTestJVMSnapshot();
for (int j = i; j < getEndpoints().size(); j++) {
resultFutures.get(j).cancel(true);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index c557ac61b61..35ded217eba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -169,6 +169,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
private boolean schemaRegionConsensusStarted = false;
private boolean dataRegionConsensusStarted = false;
+ private static Thread watcherThread;
public DataNode() {
super("DataNode");
@@ -189,11 +190,36 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
return DataNodeHolder.INSTANCE;
}
- public static void main(String[] args) {
+ public static void main(final String[] args) {
+ final Thread hookThread = Thread.currentThread();
+ watcherThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted() && hookThread.isAlive()) {
+ try {
+ Thread.sleep(10000);
+ final StackTraceElement[] stackTrace =
hookThread.getStackTrace();
+ final StringBuilder stackTraceBuilder =
+ new StringBuilder("Stack trace of main thread:\n");
+ for (final StackTraceElement traceElement : stackTrace) {
+
stackTraceBuilder.append(traceElement.toString()).append("\n");
+ }
+ logger.info(stackTraceBuilder.toString());
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ },
+ "DataNodeStartWatcher");
+ watcherThread.setDaemon(true);
+ watcherThread.start();
+
logger.info("IoTDB-DataNode environment variables: {}",
IoTDBConfig.getEnvironmentVariables());
logger.info("IoTDB-DataNode default charset is: {}",
Charset.defaultCharset().displayName());
DataNode dataNode = new DataNode();
int returnCode = dataNode.run(args);
+ watcherThread.interrupt();
if (returnCode != 0) {
System.exit(returnCode);
}