This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d11770d7443 feat: add integration test support for StreamNode (#17952)
d11770d7443 is described below
commit d11770d74435b14d2d2f3ec820eb438a42705fce
Author: suchenglong <[email protected]>
AuthorDate: Thu Jun 18 10:55:23 2026 +0800
feat: add integration test support for StreamNode (#17952)
---
.../org/apache/iotdb/it/env/cluster/env/AIEnv.java | 28 +++++-
.../iotdb/it/env/cluster/env/AbstractEnv.java | 99 +++++++++-------------
.../iotdb/it/env/cluster/node/AINodeStarter.java | 84 ++++++++++++++++++
.../itbase/runtime/ParallelRequestDelegate.java | 13 ++-
4 files changed, 160 insertions(+), 64 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
index f812e9db3d1..a15a6243b3c 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
@@ -19,7 +19,14 @@
package org.apache.iotdb.it.env.cluster.env;
+import org.apache.iotdb.it.env.cluster.node.AINodeStarter;
+import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+
+import java.util.List;
+
public class AIEnv extends AbstractEnv {
+
@Override
public void initClusterEnvironment() {
initClusterEnvironment(1, 1);
@@ -27,12 +34,29 @@ public class AIEnv extends AbstractEnv {
@Override
public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
- super.initEnvironment(configNodesNum, dataNodesNum, 600, true);
+ super.initEnvironment(configNodesNum, dataNodesNum, 600);
}
@Override
public void initClusterEnvironment(
int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
- super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount,
true);
+ super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+ }
+
+ @Override
+ protected void initExtraNodes(
+ final List<ConfigNodeWrapper> configNodeWrappers,
+ final List<DataNodeWrapper> dataNodeWrappers,
+ final String testClassName) {
+ AINodeStarter.startAINode(
+ configNodeWrappers.get(0).getIpAndPortString(),
+ dataNodeWrappers.get(0).getPort(),
+ testClassName,
+ testMethodName,
+ index,
+ startTime,
+ extraNodeKillPoints,
+ this::registerExtraNode,
+ this::dumpTestJVMSnapshot);
}
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 0e7eefb3a41..77ccc25d936 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.it.env.cluster.config.MppCommonConfig;
import org.apache.iotdb.it.env.cluster.config.MppConfigNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppJVMConfig;
-import org.apache.iotdb.it.env.cluster.node.AINodeWrapper;
import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -101,7 +100,7 @@ public abstract class AbstractEnv implements BaseEnv {
private final Random rand = new Random();
protected List<ConfigNodeWrapper> configNodeWrapperList =
Collections.emptyList();
protected List<DataNodeWrapper> dataNodeWrapperList =
Collections.emptyList();
- protected List<AINodeWrapper> aiNodeWrapperList = Collections.emptyList();
+ protected List<AbstractNodeWrapper> extraNodeWrappers =
Collections.emptyList();
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
@@ -109,6 +108,7 @@ public abstract class AbstractEnv implements BaseEnv {
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient>
clientManager;
private List<String> configNodeKillPoints = new ArrayList<>();
private List<String> dataNodeKillPoints = new ArrayList<>();
+ protected List<String> extraNodeKillPoints = new ArrayList<>();
/**
* This config object stores the properties set by developers during the
test. It will be cleared
@@ -169,17 +169,10 @@ public abstract class AbstractEnv implements BaseEnv {
protected void initEnvironment(
final int configNodesNum, final int dataNodesNum, final int
testWorkingRetryCount) {
- initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount,
false);
- }
-
- protected void initEnvironment(
- final int configNodesNum,
- final int dataNodesNum,
- final int retryCount,
- final boolean addAINode) {
- this.retryCount = retryCount;
+ this.retryCount = testWorkingRetryCount;
this.configNodeWrapperList = new ArrayList<>();
this.dataNodeWrapperList = new ArrayList<>();
+ this.extraNodeWrappers = new ArrayList<>();
clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
@@ -258,14 +251,34 @@ public abstract class AbstractEnv implements BaseEnv {
throw new AssertionError();
}
- if (addAINode) {
- this.aiNodeWrapperList = new ArrayList<>();
- startAINode(seedConfigNode, this.dataNodeWrapperList.get(0).getPort(),
testClassName);
- }
+ initExtraNodes(configNodeWrapperList, dataNodeWrapperList, testClassName);
checkClusterStatusWithoutUnknown();
}
+ /**
+ * Hook method for subclasses to initialize and start extra node types
beyond the core ConfigNode
+ * and DataNode (e.g., AINode, StreamNode, ProxyNode).
+ *
+ * <p>Subclasses should create node wrappers, add them to {@link
#extraNodeWrappers}, configure
+ * kill points via {@link #extraNodeKillPoints}, and start the nodes.
Subclasses have direct
+ * access to protected fields: {@code testMethodName}, {@code index}, {@code
startTime}.
+ *
+ * @param configNodeWrappers list of all ConfigNode wrappers in the cluster
(unmodifiable)
+ * @param dataNodeWrappers list of all DataNode wrappers in the cluster
(unmodifiable)
+ * @param testClassName the test class name for logging and identification
purposes
+ */
+ protected void initExtraNodes(
+ final List<ConfigNodeWrapper> configNodeWrappers,
+ final List<DataNodeWrapper> dataNodeWrappers,
+ final String testClassName) {
+ // Default: no extra nodes. Subclasses override to add nodes.
+ }
+
+ protected void registerExtraNode(final AbstractNodeWrapper nodeWrapper) {
+ extraNodeWrappers.add(nodeWrapper);
+ }
+
private ConfigNodeWrapper newConfigNode() {
final ConfigNodeWrapper configNodeWrapper =
new ConfigNodeWrapper(
@@ -309,39 +322,6 @@ public abstract class AbstractEnv implements BaseEnv {
return dataNodeWrapper;
}
- private void startAINode(
- final String seedConfigNode, final int clusterIngressPort, final String
testClassName) {
- final String aiNodeEndPoint;
- final AINodeWrapper aiNodeWrapper =
- new AINodeWrapper(
- seedConfigNode,
- clusterIngressPort,
- testClassName,
- testMethodName,
- index,
- EnvUtils.searchAvailablePorts(),
- startTime);
- aiNodeWrapperList.add(aiNodeWrapper);
- aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
- aiNodeWrapper.createNodeDir();
- aiNodeWrapper.createLogDir();
- final RequestDelegate<Void> aiNodesDelegate =
- new ParallelRequestDelegate<>(
- Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT,
this);
-
- aiNodesDelegate.addRequest(
- () -> {
- aiNodeWrapper.start();
- return null;
- });
-
- try {
- aiNodesDelegate.requestAll();
- } catch (final SQLException e) {
- logger.error("Start aiNodes failed", e);
- }
- }
-
public String getTestClassName() {
final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
for (final StackTraceElement stackTraceElement : stack) {
@@ -433,7 +413,7 @@ public abstract class AbstractEnv implements BaseEnv {
if (showClusterResp.getNodeStatus().size()
!= configNodeWrapperList.size()
+ dataNodeWrapperList.size()
- + aiNodeWrapperList.size()) {
+ + extraNodeWrappers.size()) {
passed = false;
nodeSizePassed = false;
actualNodeSize = showClusterResp.getNodeStatusSize();
@@ -465,7 +445,7 @@ public abstract class AbstractEnv implements BaseEnv {
processStatusMap.put(nodeWrapper, 0);
}
}
- for (AINodeWrapper nodeWrapper : aiNodeWrapperList) {
+ for (AbstractNodeWrapper nodeWrapper : extraNodeWrappers) {
boolean alive = nodeWrapper.getInstance().isAlive();
if (!alive) {
processStatusMap.put(nodeWrapper,
nodeWrapper.getInstance().waitFor());
@@ -568,14 +548,14 @@ public abstract class AbstractEnv implements BaseEnv {
configNodeWrapper.start();
}
}
- for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) {
- if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) {
+ for (AbstractNodeWrapper extraNodeWrapper : extraNodeWrappers) {
+ if (portOccupationMap.containsValue(extraNodeWrapper.getPid())) {
logger.info(
- "A port is occupied by another AINode {}-{}, restart it",
- aiNodeWrapper.getIpAndPortString(),
- aiNodeWrapper.getPid());
- aiNodeWrapper.stop();
- aiNodeWrapper.start();
+ "A port is occupied by another node {}-{}, restart it",
+ extraNodeWrapper.getIpAndPortString(),
+ extraNodeWrapper.getPid());
+ extraNodeWrapper.stop();
+ extraNodeWrapper.start();
}
}
} catch (IOException e) {
@@ -592,8 +572,8 @@ public abstract class AbstractEnv implements BaseEnv {
public void cleanClusterEnvironment() {
final List<AbstractNodeWrapper> allNodeWrappers =
Stream.concat(
- dataNodeWrapperList.stream(),
- Stream.concat(configNodeWrapperList.stream(),
aiNodeWrapperList.stream()))
+ Stream.concat(configNodeWrapperList.stream(),
dataNodeWrapperList.stream()),
+ extraNodeWrappers.stream())
.collect(Collectors.toList());
allNodeWrappers.stream()
.findAny()
@@ -1045,6 +1025,7 @@ public abstract class AbstractEnv implements BaseEnv {
public List<AbstractNodeWrapper> getNodeWrapperList() {
final List<AbstractNodeWrapper> result = new
ArrayList<>(configNodeWrapperList);
result.addAll(dataNodeWrapperList);
+ result.addAll(extraNodeWrappers);
return result;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
new file mode 100644
index 00000000000..cde3aaa9235
--- /dev/null
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.it.env.cluster.node;
+
+import org.apache.iotdb.it.env.cluster.EnvUtils;
+import org.apache.iotdb.it.framework.IoTDBTestLogger;
+import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate;
+import org.apache.iotdb.itbase.runtime.RequestDelegate;
+
+import org.slf4j.Logger;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static
org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT;
+
+public class AINodeStarter {
+ private static final Logger logger = IoTDBTestLogger.logger;
+
+ private AINodeStarter() {}
+
+ public static AINodeWrapper startAINode(
+ final String seedConfigNode,
+ final int clusterIngressPort,
+ final String testClassName,
+ final String testMethodName,
+ final int clusterIndex,
+ final long startTime,
+ final List<String> killPoints,
+ final Consumer<AINodeWrapper> nodeRegister,
+ final Runnable dumpTestJVMSnapshot) {
+ final AINodeWrapper aiNodeWrapper =
+ new AINodeWrapper(
+ seedConfigNode,
+ clusterIngressPort,
+ testClassName,
+ testMethodName,
+ clusterIndex,
+ EnvUtils.searchAvailablePorts(),
+ startTime);
+ nodeRegister.accept(aiNodeWrapper);
+ aiNodeWrapper.setKillPoints(killPoints);
+ aiNodeWrapper.createNodeDir();
+ aiNodeWrapper.createLogDir();
+
+ final RequestDelegate<Void> aiNodesDelegate =
+ new ParallelRequestDelegate<>(
+ Collections.singletonList(aiNodeWrapper.getIpAndPortString()),
+ NODE_START_TIMEOUT,
+ dumpTestJVMSnapshot);
+ aiNodesDelegate.addRequest(
+ () -> {
+ aiNodeWrapper.start();
+ return null;
+ });
+
+ try {
+ aiNodesDelegate.requestAll();
+ } catch (final SQLException e) {
+ logger.error("Start AINode {} failed", aiNodeWrapper.getId(), e);
+ throw new AssertionError();
+ }
+ return aiNodeWrapper;
+ }
+}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
index 437a6da6e5b..0615fe950fd 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
@@ -37,13 +37,20 @@ import java.util.concurrent.TimeoutException;
*/
public class ParallelRequestDelegate<T> extends RequestDelegate<T> {
private final int taskTimeoutSeconds;
- private final AbstractEnv env;
+ private final Runnable dumpTestJVMSnapshot;
public ParallelRequestDelegate(
final List<String> endpoints, final int taskTimeoutSeconds, final
AbstractEnv env) {
+ this(endpoints, taskTimeoutSeconds, env != null ? env::dumpTestJVMSnapshot
: () -> {});
+ }
+
+ public ParallelRequestDelegate(
+ final List<String> endpoints,
+ final int taskTimeoutSeconds,
+ final Runnable dumpTestJVMSnapshot) {
super(endpoints);
this.taskTimeoutSeconds = taskTimeoutSeconds;
- this.env = env;
+ this.dumpTestJVMSnapshot = dumpTestJVMSnapshot;
}
public List<T> requestAll() throws SQLException {
@@ -60,7 +67,7 @@ public class ParallelRequestDelegate<T> extends
RequestDelegate<T> {
} catch (ExecutionException e) {
exceptions[i] = e;
} catch (InterruptedException | TimeoutException e) {
- env.dumpTestJVMSnapshot();
+ dumpTestJVMSnapshot.run();
for (int j = i; j < getEndpoints().size(); j++) {
resultFutures.get(j).cancel(true);
}