This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6d9e08612b [Feature][Core] Support show cluster members information in
seatunnel-cluster scripts (#9502)
6d9e08612b is described below
commit 6d9e08612b73656347e8d7303b81f9451d60282f
Author: xiaochen <[email protected]>
AuthorDate: Thu Jul 31 21:46:24 2025 +0800
[Feature][Core] Support show cluster members information in
seatunnel-cluster scripts (#9502)
---
docs/en/seatunnel-engine/user-command.md | 89 ++++++++++++++++++++-
docs/zh/seatunnel-engine/user-command.md | 91 +++++++++++++++++++++-
.../starter/seatunnel/args/ServerCommandArgs.java | 5 ++
.../seatunnel/command/ServerExecuteCommand.java | 80 +++++++++++++++++++
.../command/ServerExecuteCommandTest.java | 35 +++++++++
5 files changed, 298 insertions(+), 2 deletions(-)
diff --git a/docs/en/seatunnel-engine/user-command.md
b/docs/en/seatunnel-engine/user-command.md
index f8957a9827..6730b793ba 100644
--- a/docs/en/seatunnel-engine/user-command.md
+++ b/docs/en/seatunnel-engine/user-command.md
@@ -2,7 +2,7 @@
sidebar_position: 13
---
-# Command Line Tool
+# Client Command Line Tool
The SeaTunnel Engine provides a command line tool for managing the jobs of the
SeaTunnel Engine. You can use the command line tool to submit, stop, pause,
resume, delete jobs, view job status and monitoring metrics, etc.
@@ -132,3 +132,90 @@ We can configure the JVM options for the SeaTunnel Engine
client in the followin
Modify the JVM parameters in the
`$SEATUNNEL_HOME/config/jvm_client_options` file. Please note that the JVM
parameters in this file will be applied to all jobs submitted using
`seatunnel.sh`, including Local Mode and Cluster Mode.
2. Add JVM options when submitting jobs. For example, `sh bin/seatunnel.sh
--config $SEATUNNEL_HOME/config/v2.batch.config.template -DJvmOption="-Xms2G
-Xmx2G"`
+
+# Server Command Line Tool
+
+SeaTunnel Engine provides server management commands for starting, stopping,
and managing SeaTunnel Engine cluster nodes.
+
+```shell
+sh bin/seatunnel-cluster.sh -h
+```
+
+Server commands support the following parameters:
+
+```shell
+Usage: seatunnel-cluster.sh [options]
+ Options:
+ -cn, --cluster The name of cluster.
+ -d, --daemon The cluster daemon mode.
+ -r, --role The cluster node role, support [master, worker,
master_and_worker] (default: master_and_worker).
+ -m, --member Show cluster members information.
+ -h, --help Show the usage message.
+```
+
+## Start cluster
+
+You can get help information for server commands with the following command:
+
+```shell
+# Start in foreground
+sh bin/seatunnel-cluster.sh
+
+# Start in daemon mode
+sh bin/seatunnel-cluster.sh -d
+```
+
+## Show cluster members information
+
+You can view cluster members information using the following command:
+
+```shell
+sh bin/seatunnel-cluster.sh -m -cn my_cluster
+```
+
+This command will output detailed information about all members in the
cluster, including:
+- **Member ID**: Unique identifier for each cluster member
+- **Address**: IP address and port of the member
+- **Role**: Member role (ACTIVE MASTER, MASTER, or WORKER)
+- **Version**: Hazelcast version running on the member
+
+**Example output:**
+```
+Member ID Address Role
Version
+a1b2c3d4-e5f6-7890-abcd-ef1234567890 192.168.1.100:5701 ACTIVE MASTER
5.3.0
+b2c3d4e5-f6g7-8901-bcde-f23456789012 192.168.1.101:5701 MASTER
5.3.0
+c3d4e5f6-g7h8-9012-cdef-345678901234 192.168.1.102:5701 WORKER
5.3.0
+```
+
+**Note**: You must specify the cluster name with the `-cn` parameter. The
cluster must be running for this command to work.
+
+## Stop cluster
+
+SeaTunnel provides a dedicated stop script to shut down cluster nodes:
+
+```shell
+sh bin/stop-seatunnel-cluster.sh -h
+```
+
+The stop command supports the following parameters:
+
+```shell
+Usage: stop-seatunnel-cluster.sh [options]
+ Options:
+ -cn, --cluster The name of the cluster to shut down (default:
seatunnel_default_cluster)
+ -h, --help Show the usage message
+```
+
+### Stop default cluster
+
+```shell
+# Stop the default cluster (seatunnel_default_cluster)
+sh bin/stop-seatunnel-cluster.sh
+```
+
+### Stop specified cluster
+
+```shell
+# Stop a cluster with specified name
+sh bin/stop-seatunnel-cluster.sh -cn my_cluster
+```
\ No newline at end of file
diff --git a/docs/zh/seatunnel-engine/user-command.md
b/docs/zh/seatunnel-engine/user-command.md
index 1ceea35c85..8891d81361 100644
--- a/docs/zh/seatunnel-engine/user-command.md
+++ b/docs/zh/seatunnel-engine/user-command.md
@@ -2,7 +2,7 @@
sidebar_position: 13
---
-# 命令行工具
+# 客户端命令行工具
SeaTunnel Engine 提供了一个命令行工具,用于管理 SeaTunnel Engine
的作业。您可以使用命令行工具提交、停止、暂停、恢复、删除作业,查看作业状态和监控指标等。
@@ -138,6 +138,7 @@ bin/seatunnel.sh --config
$SEATUNNEL_HOME/config/v2.batch.config.template
被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r <jobId>恢复。
+
## 配置JVM参数
我们可以通过以下方式为 SeaTunnel Engine 客户端配置 JVM 参数:
@@ -147,3 +148,91 @@ bin/seatunnel.sh --config
$SEATUNNEL_HOME/config/v2.batch.config.template
在 `$SEATUNNEL_HOME/config/jvm_client_options` 文件中修改 JVM 参数。请注意,该文件中的 JVM
参数将应用于使用 `seatunnel.sh` 提交的所有作业,包括 Local 模式和 Cluster 模式。
2. 在提交作业时添加 JVM 参数。例如,`sh bin/seatunnel.sh --config
$SEATUNNEL_HOME/config/v2.batch.config.template -DJvmOption="-Xms2G -Xmx2G"`
+
+
+# 服务端命令行工具
+
+SeaTunnel Engine 提供了服务端管理命令,用于启动、停止和管理 SeaTunnel Engine 集群节点。
+
+```shell
+sh bin/seatunnel-cluster.sh -h
+```
+
+服务器命令支持以下参数:
+
+```shell
+Usage: seatunnel-cluster.sh [options]
+ Options:
+ -cn, --cluster 集群名称
+ -d, --daemon 以守护进程模式运行
+ -r, --role 集群节点角色,支持 master、worker、master_and_worker (默认:
master_and_worker)
+ -m, --member 显示集群成员信息
+ -h, --help 显示帮助信息
+```
+
+## 启动集群
+
+可以通过如下命令获取服务器命令的帮助信息:
+
+```shell
+# 前台启动
+sh bin/seatunnel-cluster.sh
+
+# 后台启动(守护进程模式)
+sh bin/seatunnel-cluster.sh -d
+```
+
+## 查看集群成员信息
+
+您可以使用以下命令查看集群成员信息:
+
+```shell
+sh bin/seatunnel-cluster.sh -m -cn my_cluster
+```
+
+该命令会输出集群中所有成员的详细信息,包括:
+- **Member ID(成员ID)**: 每个集群成员的唯一标识符
+- **Address(地址)**: 成员的IP地址和端口
+- **Role(角色)**: 成员角色(ACTIVE MASTER、MASTER 或 WORKER)
+- **Version(版本)**: 成员运行的 Hazelcast 版本
+
+**输出示例:**
+```
+Member ID Address Role
Version
+a1b2c3d4-e5f6-7890-abcd-ef1234567890 192.168.1.100:5701 ACTIVE MASTER
5.3.0
+b2c3d4e5-f6g7-8901-bcde-f23456789012 192.168.1.101:5701 MASTER
5.3.0
+c3d4e5f6-g7h8-9012-cdef-345678901234 192.168.1.102:5701 WORKER
5.3.0
+```
+
+**注意**: 必须使用 `-cn` 参数指定集群名称。集群必须处于运行状态才能执行此命令。
+
+## 停止集群
+
+SeaTunnel 提供了专门的停止脚本来关闭集群节点:
+
+```shell
+sh bin/stop-seatunnel-cluster.sh -h
+```
+
+停止命令支持以下参数:
+
+```shell
+Usage: stop-seatunnel-cluster.sh [options]
+ Options:
+ -cn, --cluster 要关闭的集群名称 (默认: seatunnel_default_cluster)
+ -h, --help 显示帮助信息
+```
+
+### 停止默认集群
+
+```shell
+# 停止默认集群 (seatunnel_default_cluster)
+sh bin/stop-seatunnel-cluster.sh
+```
+
+### 停止指定集群
+
+```shell
+# 停止指定名称的集群
+sh bin/stop-seatunnel-cluster.sh -cn my_cluster
+```
\ No newline at end of file
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
index 6dcdbddc5a..a8a1492679 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
@@ -44,6 +44,11 @@ public class ServerCommandArgs extends CommandArgs {
"The cluster node role, default is master_and_worker,
support master, worker, master_and_worker")
private String clusterRole;
+ @Parameter(
+ names = {"-m", "--member"},
+ description = "Show cluster members information")
+ private boolean showClusterMembers = false;
+
@Override
public Command<?> buildCommand() {
return new ServerExecuteCommand(this);
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
index 8a9d049a21..824605a2a4 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.core.starter.seatunnel.command;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -29,8 +31,16 @@ import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
+import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Member;
import lombok.extern.slf4j.Slf4j;
+import java.util.Collection;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -48,6 +58,11 @@ public class ServerExecuteCommand implements
Command<ServerCommandArgs> {
public void execute() {
checkEnvironment();
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ if (this.serverCommandArgs.isShowClusterMembers()) {
+ showClusterMembers();
+ return;
+ }
+
String clusterRole = this.serverCommandArgs.getClusterRole();
if (StringUtils.isNotBlank(clusterRole)) {
if
(EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
@@ -97,4 +112,69 @@ public class ServerExecuteCommand implements
Command<ServerCommandArgs> {
return !SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_1_8);
}
}
+
+ @VisibleForTesting
+ public Set<Member> showClusterMembers() {
+ HazelcastClientInstanceImpl client = null;
+ try {
+ String clusterName = serverCommandArgs.getClusterName();
+ if (StringUtils.isBlank(clusterName)) {
+ throw new SeaTunnelEngineException(
+ "Cluster name is required. Please specify it using -cn
or --cluster option.");
+ }
+ ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(clusterName);
+ client =
+ ((HazelcastClientProxy)
HazelcastClient.newHazelcastClient(clientConfig))
+ .client;
+ if (!client.getLifecycleService().isRunning()) {
+ throw new SeaTunnelEngineException(
+ String.format(
+ "cluster: %s is not running, Please start the
cluster first.",
+ clusterName));
+ }
+ Set<Member> members = client.getCluster().getMembers();
+ if (members.isEmpty()) {
+ System.out.println("No active members found in the cluster.");
+ return members;
+ }
+
+ Collection<Member> memberList =
client.getClientClusterService().getMemberList();
+
+ Member masterMember =
client.getClientClusterService().getMasterMember();
+ System.out.printf(
+ "%-36s %-20s %-20s %-10s\n", "Member ID", "Address",
"Role", "Version");
+
+ for (Member member : members) {
+ System.out.printf(
+ "%-36s %-20s %-20s %-10s\n",
+ member.getUuid(),
+ member.getAddress(),
+ getRole(masterMember.getAddress(), member),
+ member.getVersion());
+ }
+ return members;
+ } catch (Exception e) {
+ throw new SeaTunnelEngineException("Failed to get cluster members
information", e);
+ } finally {
+ if (client != null) {
+ try {
+ client.shutdown();
+ } catch (Exception e) {
+ log.warn("Failed to shutdown Hazelcast client", e);
+ }
+ }
+ }
+ }
+
+ private String getRole(Address masterAddress, Member member) {
+
+ if (member.isLiteMember()) {
+ return EngineConfig.ClusterRole.WORKER.toString();
+ }
+ if (masterAddress.toString().equals(member.getAddress().toString())) {
+ return "ACTIVE MASTER";
+ }
+ return EngineConfig.ClusterRole.MASTER.toString();
+ }
}
diff --git
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommandTest.java
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommandTest.java
index 0962ed5ada..16f76aef74 100644
---
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommandTest.java
+++
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommandTest.java
@@ -17,11 +17,20 @@
package org.apache.seatunnel.core.starter.seatunnel.command;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
+import com.hazelcast.cluster.Member;
+
+import java.util.Set;
+
public class ServerExecuteCommandTest {
@Test
@@ -34,4 +43,30 @@ public class ServerExecuteCommandTest {
Assertions.assertTrue(ServerExecuteCommand.isAllocatingThreadGetName());
System.setProperty("java.version", realVersion);
}
+
+ @Test
+ public void testMemberList() {
+ String clusterName = getClusterName("ServerExecuteCommandTest");
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+
seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(true);
+
+ SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+ SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+ SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+ SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+ SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+
+ ServerCommandArgs serverCommandArgs = new ServerCommandArgs();
+ serverCommandArgs.setClusterName(clusterName);
+ serverCommandArgs.setShowClusterMembers(true);
+
+ ServerExecuteCommand serverExecuteCommand = new
ServerExecuteCommand(serverCommandArgs);
+ Set<Member> members = serverExecuteCommand.showClusterMembers();
+ Assertions.assertEquals(5, members.size());
+ }
+
+ public static String getClusterName(String testClassName) {
+ return System.getProperty("user.name") + "_" + testClassName;
+ }
}