This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 902bc60352f Pipe: Fixed the crashed VM for ITs & Added watcher thread
for dataNode start (#17281)
902bc60352f is described below
commit 902bc60352fe6bcbb47b2338cd1ccdf1b5062295
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 11 18:05:30 2026 +0800
Pipe: Fixed the crashed VM for ITs & Added watcher thread for dataNode
start (#17281)
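* fix
* 200: raise NODE_START_TIMEOUT from 100 to 200
* interrupt: interrupt the watcher thread once DataNode.run returns
* name: name the watcher thread "DataNodeStartWatcher"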
---
.../iotdb/it/env/cluster/ClusterConstant.java | 2 +-
.../iotdb/it/env/cluster/env/AbstractEnv.java | 33 +++++++++++++---------
.../itbase/runtime/ClusterTestConnection.java | 10 +++++--
.../iotdb/itbase/runtime/ClusterTestResultSet.java | 14 +++++++--
.../iotdb/itbase/runtime/ClusterTestStatement.java | 11 ++++++--
.../itbase/runtime/ParallelRequestDelegate.java | 9 ++++--
.../java/org/apache/iotdb/db/service/DataNode.java | 28 +++++++++++++++++-
7 files changed, 81 insertions(+), 26 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 82ed7976959..2d1457a00d2 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -230,7 +230,7 @@ public class ClusterConstant {
+ "*";
// Env Constant
- public static final int NODE_START_TIMEOUT = 100;
+ public static final int NODE_START_TIMEOUT = 200;
public static final int NODE_NETWORK_TIMEOUT_MS = 0;
public static final String ZERO_TIME_ZONE = "GMT+0";
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 8308225a9af..3fcbe897f75 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -237,7 +237,7 @@ public abstract class AbstractEnv implements BaseEnv {
final List<String> dataNodeEndpoints = new ArrayList<>();
final RequestDelegate<Void> dataNodesDelegate =
- new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT,
this);
for (int i = 0; i < dataNodesNum; i++) {
DataNodeWrapper dataNodeWrapper = newDataNode();
dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString());
@@ -325,7 +325,7 @@ public abstract class AbstractEnv implements BaseEnv {
aiNodeWrapper.createLogDir();
final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
- Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT);
+ Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT,
this);
aiNodesDelegate.addRequest(
() -> {
@@ -617,7 +617,8 @@ public abstract class AbstractEnv implements BaseEnv {
final String username, final String password, final String sqlDialect)
throws SQLException {
return new ClusterTestConnection(
getWriteConnection(null, username, password, sqlDialect),
- getReadConnections(null, username, password, sqlDialect));
+ getReadConnections(null, username, password, sqlDialect),
+ this);
}
@Override
@@ -625,7 +626,8 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
return new ClusterTestConnection(
getWriteConnection(null, username, password, sqlDialect),
- getOneAvailableReadConnection(null, username, password, sqlDialect));
+ getOneAvailableReadConnection(null, username, password, sqlDialect),
+ this);
}
@Override
@@ -638,7 +640,8 @@ public abstract class AbstractEnv implements BaseEnv {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(
dataNodeWrapper, null, username, password, sqlDialect),
- getReadConnections(null, dataNodeWrapper, username, password,
sqlDialect));
+ getReadConnections(null, dataNodeWrapper, username, password,
sqlDialect),
+ this);
}
@Override
@@ -655,7 +658,8 @@ public abstract class AbstractEnv implements BaseEnv {
username,
password,
TABLE_SQL_DIALECT.equals(sqlDialect) ? TABLE_SQL_DIALECT :
TREE_SQL_DIALECT),
- Collections.emptyList());
+ Collections.emptyList(),
+ this);
}
@Override
@@ -665,7 +669,8 @@ public abstract class AbstractEnv implements BaseEnv {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(
dataNode, null, username, password, TREE_SQL_DIALECT),
- getReadConnections(null, username, password, TREE_SQL_DIALECT));
+ getReadConnections(null, username, password, TREE_SQL_DIALECT),
+ this);
}
@Override
@@ -678,7 +683,8 @@ public abstract class AbstractEnv implements BaseEnv {
return System.getProperty("ReadAndVerifyWithMultiNode",
"true").equalsIgnoreCase("true")
? new ClusterTestConnection(
getWriteConnection(version, username, password, sqlDialect),
- getReadConnections(version, username, password, sqlDialect))
+ getReadConnections(version, username, password, sqlDialect),
+ this)
: getWriteConnection(version, username, password,
sqlDialect).getUnderlyingConnection();
}
@@ -885,7 +891,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
final List<String> endpoints = new ArrayList<>();
final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
dataNodeWrapperList.stream()
.map(AbstractNodeWrapper::getIpAndPortString)
@@ -936,7 +942,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
final List<String> endpoints = new ArrayList<>();
final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
endpoints.add(dataNode.getIpAndPortString());
readConnRequestDelegate.addRequest(
@@ -967,7 +973,7 @@ public abstract class AbstractEnv implements BaseEnv {
.map(DataNodeWrapper::getIpAndPortString)
.collect(Collectors.toList());
final RequestDelegate<Void> testDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
final String dataNodeEndpoint = dataNode.getIpAndPortString();
testDelegate.addRequest(
@@ -1322,7 +1328,8 @@ public abstract class AbstractEnv implements BaseEnv {
final RequestDelegate<Void> configNodeDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(newConfigNodeWrapper.getIpAndPortString()),
- NODE_START_TIMEOUT);
+ NODE_START_TIMEOUT,
+ this);
configNodeDelegate.addRequest(
() -> {
newConfigNodeWrapper.start();
@@ -1349,7 +1356,7 @@ public abstract class AbstractEnv implements BaseEnv {
final List<String> dataNodeEndpoints =
Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
final RequestDelegate<Void> dataNodesDelegate =
- new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT,
this);
dataNodesDelegate.addRequest(
() -> {
newDataNodeWrapper.start();
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
index bac970ffc6f..f9bb5e91116 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+
import org.apache.tsfile.external.commons.lang3.Validate;
import java.sql.Array;
@@ -43,20 +45,24 @@ import java.util.concurrent.Executor;
/** The implementation of {@link Connection} in cluster test. */
public class ClusterTestConnection implements Connection {
+ public final AbstractEnv env;
public final NodeConnection writeConnection;
public final List<NodeConnection> readConnections;
private boolean isClosed;
public ClusterTestConnection(
- NodeConnection writeConnection, List<NodeConnection> readConnections) {
+ final NodeConnection writeConnection,
+ final List<NodeConnection> readConnections,
+ final AbstractEnv env) {
Validate.notNull(readConnections);
this.writeConnection = writeConnection;
this.readConnections = readConnections;
+ this.env = env;
}
@Override
public Statement createStatement() throws SQLException {
- return new ClusterTestStatement(writeConnection, readConnections);
+ return new ClusterTestStatement(writeConnection, readConnections, env);
}
@Override
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
index 64c18db11a8..98686d436be 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
@@ -47,17 +49,23 @@ public class ClusterTestResultSet implements ResultSet {
private final List<ResultSet> resultSets;
private final List<String> endpoints;
private final int queryTimeoutSeconds;
+ private final AbstractEnv env;
public ClusterTestResultSet(
- List<Statement> statements, List<String> endpoints, String sql, int
queryTimeoutSeconds)
+ final List<Statement> statements,
+ final List<String> endpoints,
+ final String sql,
+ final int queryTimeoutSeconds,
+ final AbstractEnv env)
throws SQLException {
this.queryTimeoutSeconds = queryTimeoutSeconds;
this.endpoints = endpoints;
+ this.env = env; // must be set before createRequestDelegate(), which hands env to ParallelRequestDelegate
- RequestDelegate<ResultSet> delegate = createRequestDelegate();
+ final RequestDelegate<ResultSet> delegate = createRequestDelegate();
for (Statement st : statements) {
delegate.addRequest(() -> st.executeQuery(sql));
}
resultSets = delegate.requestAll();
}
@Override
@@ -1177,7 +1185,7 @@ public class ClusterTestResultSet implements ResultSet {
* executed in parallel in order to accelerate the test.
*/
private <T> RequestDelegate<T> createRequestDelegate() {
- return new ParallelRequestDelegate<>(endpoints, queryTimeoutSeconds);
+ return new ParallelRequestDelegate<>(endpoints, queryTimeoutSeconds, env);
}
private <T> RequestDelegate<T> createLocalRequestDelegate() {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
index 0523a384828..eccea1d1437 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.jdbc.Config;
import org.slf4j.Logger;
@@ -47,9 +48,12 @@ public class ClusterTestStatement implements Statement {
private int maxRows = Integer.MAX_VALUE;
private int queryTimeout = DEFAULT_QUERY_TIMEOUT;
private int fetchSize = Config.DEFAULT_FETCH_SIZE;
+ private final AbstractEnv env;
public ClusterTestStatement(
- NodeConnection writeConnection, List<NodeConnection> readConnections) {
+ final NodeConnection writeConnection,
+ final List<NodeConnection> readConnections,
+ final AbstractEnv env) {
try {
this.writeStatement =
writeConnection.getUnderlyingConnection().createStatement();
updateConfig(writeStatement, 0);
@@ -71,6 +75,7 @@ public class ClusterTestStatement implements Statement {
if (readStatements.isEmpty()) {
LOGGER.warn("Failed to create any read statement.");
}
+ this.env = env;
}
private void updateConfig(Statement statement, int timeout) throws
SQLException {
@@ -87,7 +92,7 @@ public class ClusterTestStatement implements Statement {
*/
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- return new ClusterTestResultSet(readStatements, readEndpoints, sql,
queryTimeout);
+ return new ClusterTestResultSet(readStatements, readEndpoints, sql,
queryTimeout, env);
}
@Override
@@ -100,7 +105,7 @@ public class ClusterTestStatement implements Statement {
List<String> endpoints = new ArrayList<>();
endpoints.add(writEndpoint);
endpoints.addAll(readEndpoints);
- RequestDelegate<Void> delegate = new ParallelRequestDelegate<>(endpoints,
queryTimeout);
+ RequestDelegate<Void> delegate = new ParallelRequestDelegate<>(endpoints,
queryTimeout, env);
delegate.addRequest(
() -> {
if (writeStatement != null) {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
index 19b6877d5c8..437a6da6e5b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.itbase.runtime;
-import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.itbase.exception.ParallelRequestTimeoutException;
import java.sql.SQLException;
@@ -37,10 +37,13 @@ import java.util.concurrent.TimeoutException;
*/
public class ParallelRequestDelegate<T> extends RequestDelegate<T> {
private final int taskTimeoutSeconds;
+ private final AbstractEnv env;
- public ParallelRequestDelegate(List<String> endpoints, int
taskTimeoutSeconds) {
+ public ParallelRequestDelegate(
+ final List<String> endpoints, final int taskTimeoutSeconds, final
AbstractEnv env) {
super(endpoints);
this.taskTimeoutSeconds = taskTimeoutSeconds;
+ this.env = env;
}
public List<T> requestAll() throws SQLException {
@@ -57,7 +60,7 @@ public class ParallelRequestDelegate<T> extends
RequestDelegate<T> {
} catch (ExecutionException e) {
exceptions[i] = e;
} catch (InterruptedException | TimeoutException e) {
- EnvFactory.getEnv().dumpTestJVMSnapshot();
+ env.dumpTestJVMSnapshot();
for (int j = i; j < getEndpoints().size(); j++) {
resultFutures.get(j).cancel(true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 2d7cbe18358..f5bb56eda00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -185,6 +185,7 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
private boolean schemaRegionConsensusStarted = false;
private boolean dataRegionConsensusStarted = false;
+ private static Thread watcherThread;
public DataNode() {
super("DataNode");
@@ -204,13 +205,38 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
return DataNodeHolder.INSTANCE;
}
- public static void main(String[] args) {
+ public static void main(final String[] args) {
+ final Thread hookThread = Thread.currentThread();
+ watcherThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted() && hookThread.isAlive()) {
+ try {
+ Thread.sleep(10000);
+ final StackTraceElement[] stackTrace =
hookThread.getStackTrace();
+ final StringBuilder stackTraceBuilder =
+ new StringBuilder("Stack trace of main thread:\n");
+ for (final StackTraceElement traceElement : stackTrace) {
+
stackTraceBuilder.append(traceElement.toString()).append("\n");
+ }
+ logger.info(stackTraceBuilder.toString());
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ },
+ "DataNodeStartWatcher");
+ watcherThread.setDaemon(true);
+ watcherThread.start();
+
logger.info("IoTDB-DataNode environment variables: {}",
IoTDBConfig.getEnvironmentVariables());
logger.info("IoTDB-DataNode default charset is: {}",
Charset.defaultCharset().displayName());
// let IoTDB handle the exception instead of ratis
ExitUtils.disableSystemExit();
DataNode dataNode = new DataNode();
int returnCode = dataNode.run(args);
+ watcherThread.interrupt();
if (returnCode != 0) {
System.exit(returnCode);
}