This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch env-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/env-fix by this push:
new 355d2587bce fix
355d2587bce is described below
commit 355d2587bcea5ef7c63bd04d9d8cadc31cfdf5c5
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 11 15:54:50 2026 +0800
fix
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 33 +++++++++++++---------
.../itbase/runtime/ClusterTestConnection.java | 10 +++++--
.../iotdb/itbase/runtime/ClusterTestResultSet.java | 14 +++++++--
.../iotdb/itbase/runtime/ClusterTestStatement.java | 11 ++++++--
.../itbase/runtime/ParallelRequestDelegate.java | 9 ++++--
5 files changed, 53 insertions(+), 24 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 8308225a9af..3fcbe897f75 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -237,7 +237,7 @@ public abstract class AbstractEnv implements BaseEnv {
final List<String> dataNodeEndpoints = new ArrayList<>();
final RequestDelegate<Void> dataNodesDelegate =
- new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT,
this);
for (int i = 0; i < dataNodesNum; i++) {
DataNodeWrapper dataNodeWrapper = newDataNode();
dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString());
@@ -325,7 +325,7 @@ public abstract class AbstractEnv implements BaseEnv {
aiNodeWrapper.createLogDir();
final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
- Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT);
+ Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT,
this);
aiNodesDelegate.addRequest(
() -> {
@@ -617,7 +617,8 @@ public abstract class AbstractEnv implements BaseEnv {
final String username, final String password, final String sqlDialect)
throws SQLException {
return new ClusterTestConnection(
getWriteConnection(null, username, password, sqlDialect),
- getReadConnections(null, username, password, sqlDialect));
+ getReadConnections(null, username, password, sqlDialect),
+ this);
}
@Override
@@ -625,7 +626,8 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
return new ClusterTestConnection(
getWriteConnection(null, username, password, sqlDialect),
- getOneAvailableReadConnection(null, username, password, sqlDialect));
+ getOneAvailableReadConnection(null, username, password, sqlDialect),
+ this);
}
@Override
@@ -638,7 +640,8 @@ public abstract class AbstractEnv implements BaseEnv {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(
dataNodeWrapper, null, username, password, sqlDialect),
- getReadConnections(null, dataNodeWrapper, username, password,
sqlDialect));
+ getReadConnections(null, dataNodeWrapper, username, password,
sqlDialect),
+ this);
}
@Override
@@ -655,7 +658,8 @@ public abstract class AbstractEnv implements BaseEnv {
username,
password,
TABLE_SQL_DIALECT.equals(sqlDialect) ? TABLE_SQL_DIALECT :
TREE_SQL_DIALECT),
- Collections.emptyList());
+ Collections.emptyList(),
+ this);
}
@Override
@@ -665,7 +669,8 @@ public abstract class AbstractEnv implements BaseEnv {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(
dataNode, null, username, password, TREE_SQL_DIALECT),
- getReadConnections(null, username, password, TREE_SQL_DIALECT));
+ getReadConnections(null, username, password, TREE_SQL_DIALECT),
+ this);
}
@Override
@@ -678,7 +683,8 @@ public abstract class AbstractEnv implements BaseEnv {
return System.getProperty("ReadAndVerifyWithMultiNode",
"true").equalsIgnoreCase("true")
? new ClusterTestConnection(
getWriteConnection(version, username, password, sqlDialect),
- getReadConnections(version, username, password, sqlDialect))
+ getReadConnections(version, username, password, sqlDialect),
+ this)
: getWriteConnection(version, username, password,
sqlDialect).getUnderlyingConnection();
}
@@ -885,7 +891,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
final List<String> endpoints = new ArrayList<>();
final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
dataNodeWrapperList.stream()
.map(AbstractNodeWrapper::getIpAndPortString)
@@ -936,7 +942,7 @@ public abstract class AbstractEnv implements BaseEnv {
throws SQLException {
final List<String> endpoints = new ArrayList<>();
final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
endpoints.add(dataNode.getIpAndPortString());
readConnRequestDelegate.addRequest(
@@ -967,7 +973,7 @@ public abstract class AbstractEnv implements BaseEnv {
.map(DataNodeWrapper::getIpAndPortString)
.collect(Collectors.toList());
final RequestDelegate<Void> testDelegate =
- new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
final String dataNodeEndpoint = dataNode.getIpAndPortString();
testDelegate.addRequest(
@@ -1322,7 +1328,8 @@ public abstract class AbstractEnv implements BaseEnv {
final RequestDelegate<Void> configNodeDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(newConfigNodeWrapper.getIpAndPortString()),
- NODE_START_TIMEOUT);
+ NODE_START_TIMEOUT,
+ this);
configNodeDelegate.addRequest(
() -> {
newConfigNodeWrapper.start();
@@ -1349,7 +1356,7 @@ public abstract class AbstractEnv implements BaseEnv {
final List<String> dataNodeEndpoints =
Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
final RequestDelegate<Void> dataNodesDelegate =
- new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+ new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT,
this);
dataNodesDelegate.addRequest(
() -> {
newDataNodeWrapper.start();
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
index bac970ffc6f..f9bb5e91116 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+
import org.apache.tsfile.external.commons.lang3.Validate;
import java.sql.Array;
@@ -43,20 +45,24 @@ import java.util.concurrent.Executor;
/** The implementation of {@link Connection} in cluster test. */
public class ClusterTestConnection implements Connection {
+ public final AbstractEnv env;
public final NodeConnection writeConnection;
public final List<NodeConnection> readConnections;
private boolean isClosed;
public ClusterTestConnection(
- NodeConnection writeConnection, List<NodeConnection> readConnections) {
+ final NodeConnection writeConnection,
+ final List<NodeConnection> readConnections,
+ final AbstractEnv env) {
Validate.notNull(readConnections);
this.writeConnection = writeConnection;
this.readConnections = readConnections;
+ this.env = env;
}
@Override
public Statement createStatement() throws SQLException {
- return new ClusterTestStatement(writeConnection, readConnections);
+ return new ClusterTestStatement(writeConnection, readConnections, env);
}
@Override
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
index 64c18db11a8..98686d436be 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
@@ -47,17 +49,23 @@ public class ClusterTestResultSet implements ResultSet {
private final List<ResultSet> resultSets;
private final List<String> endpoints;
private final int queryTimeoutSeconds;
+ private final AbstractEnv env;
public ClusterTestResultSet(
- List<Statement> statements, List<String> endpoints, String sql, int
queryTimeoutSeconds)
+ final List<Statement> statements,
+ final List<String> endpoints,
+ final String sql,
+ final int queryTimeoutSeconds,
+ final AbstractEnv env)
throws SQLException {
this.queryTimeoutSeconds = queryTimeoutSeconds;
this.endpoints = endpoints;
- RequestDelegate<ResultSet> delegate = createRequestDelegate();
+ final RequestDelegate<ResultSet> delegate = createRequestDelegate();
for (Statement st : statements) {
delegate.addRequest(() -> st.executeQuery(sql));
}
resultSets = delegate.requestAll();
+ this.env = env;
}
@Override
@@ -1177,7 +1185,7 @@ public class ClusterTestResultSet implements ResultSet {
* executed in parallel in order to accelerate the test.
*/
private <T> RequestDelegate<T> createRequestDelegate() {
- return new ParallelRequestDelegate<>(endpoints, queryTimeoutSeconds);
+ return new ParallelRequestDelegate<>(endpoints, queryTimeoutSeconds, env);
}
private <T> RequestDelegate<T> createLocalRequestDelegate() {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
index 0523a384828..eccea1d1437 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.itbase.runtime;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.jdbc.Config;
import org.slf4j.Logger;
@@ -47,9 +48,12 @@ public class ClusterTestStatement implements Statement {
private int maxRows = Integer.MAX_VALUE;
private int queryTimeout = DEFAULT_QUERY_TIMEOUT;
private int fetchSize = Config.DEFAULT_FETCH_SIZE;
+ private final AbstractEnv env;
public ClusterTestStatement(
- NodeConnection writeConnection, List<NodeConnection> readConnections) {
+ final NodeConnection writeConnection,
+ final List<NodeConnection> readConnections,
+ final AbstractEnv env) {
try {
this.writeStatement =
writeConnection.getUnderlyingConnection().createStatement();
updateConfig(writeStatement, 0);
@@ -71,6 +75,7 @@ public class ClusterTestStatement implements Statement {
if (readStatements.isEmpty()) {
LOGGER.warn("Failed to create any read statement.");
}
+ this.env = env;
}
private void updateConfig(Statement statement, int timeout) throws
SQLException {
@@ -87,7 +92,7 @@ public class ClusterTestStatement implements Statement {
*/
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- return new ClusterTestResultSet(readStatements, readEndpoints, sql,
queryTimeout);
+ return new ClusterTestResultSet(readStatements, readEndpoints, sql,
queryTimeout, env);
}
@Override
@@ -100,7 +105,7 @@ public class ClusterTestStatement implements Statement {
List<String> endpoints = new ArrayList<>();
endpoints.add(writEndpoint);
endpoints.addAll(readEndpoints);
- RequestDelegate<Void> delegate = new ParallelRequestDelegate<>(endpoints,
queryTimeout);
+ RequestDelegate<Void> delegate = new ParallelRequestDelegate<>(endpoints,
queryTimeout, env);
delegate.addRequest(
() -> {
if (writeStatement != null) {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
index 19b6877d5c8..437a6da6e5b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.itbase.runtime;
-import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.itbase.exception.ParallelRequestTimeoutException;
import java.sql.SQLException;
@@ -37,10 +37,13 @@ import java.util.concurrent.TimeoutException;
*/
public class ParallelRequestDelegate<T> extends RequestDelegate<T> {
private final int taskTimeoutSeconds;
+ private final AbstractEnv env;
- public ParallelRequestDelegate(List<String> endpoints, int
taskTimeoutSeconds) {
+ public ParallelRequestDelegate(
+ final List<String> endpoints, final int taskTimeoutSeconds, final
AbstractEnv env) {
super(endpoints);
this.taskTimeoutSeconds = taskTimeoutSeconds;
+ this.env = env;
}
public List<T> requestAll() throws SQLException {
@@ -57,7 +60,7 @@ public class ParallelRequestDelegate<T> extends
RequestDelegate<T> {
} catch (ExecutionException e) {
exceptions[i] = e;
} catch (InterruptedException | TimeoutException e) {
- EnvFactory.getEnv().dumpTestJVMSnapshot();
+ env.dumpTestJVMSnapshot();
for (int j = i; j < getEndpoints().size(); j++) {
resultFutures.get(j).cancel(true);
}