This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 50113e77cf [Fix][Zeta] Fix worker node metrics acquisition (#7862)
50113e77cf is described below

commit 50113e77cf6729c833bca6a5e20ef397b3992782
Author: corgy-w <[email protected]>
AuthorDate: Sun Oct 27 21:48:49 2024 +0800

    [Fix][Zeta] Fix worker node metrics acquisition (#7862)
---
 ...sterWorkerClusterSeaTunnelWithTelemetryIT.java} | 510 ++++++++++++++-------
 .../master-worker-cluster/hazelcast-master.yaml    |  49 ++
 .../master-worker-cluster/hazelcast-worker.yaml    |  49 ++
 .../master-worker-cluster/jvm_master_options       |  27 ++
 .../master-worker-cluster/jvm_worker_options       |  27 ++
 .../resources/master-worker-cluster/seatunnel.yaml |  39 ++
 .../resources/stream_fakesource_to_console.conf    |  82 ++++
 .../engine/server/SeaTunnelServerStarter.java      |  24 +-
 .../exports/JobThreadPoolStatusExports.java        | 159 +++----
 9 files changed, 700 insertions(+), 266 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java
similarity index 53%
rename from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java
index 3f85fbbe35..5acfad5592 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java
@@ -15,194 +15,241 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.e2e;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.engine.client.SeaTunnelClient;
-import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.ClientJobProxy;
-import org.apache.seatunnel.engine.common.config.ConfigProvider;
-import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.common.config.server.TelemetryConfig;
-import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+package org.apache.seatunnel.engine.e2e.telemetry;
+
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
 
 import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
 
-import com.hazelcast.client.config.ClientConfig;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import lombok.extern.slf4j.Slf4j;
+import io.restassured.response.Response;
+import io.restassured.response.ValidatableResponse;
 
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import static io.restassured.RestAssured.given;
+import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.CONTEXT_PATH;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.matchesRegex;
 
-@Slf4j
-public class TelemetryApiIT {
+public class MasterWorkerClusterSeaTunnelWithTelemetryIT extends SeaTunnelContainer {
+
+    private GenericContainer<?> secondServer;
+
+    private final Network NETWORK = Network.newNetwork();
 
-    private static final String HOST = "http://localhost:";
+    private static final String jobName = "test测试";
+    private static final String paramJobName = "param_test测试";
 
-    private static ClientJobProxy clientJobProxy;
+    private static final String http = "http://";
 
-    private static HazelcastInstanceImpl hazelcastInstance;
+    private static final String colon = ":";
 
-    private static String testClusterName;
+    private static final String confFile = "/fakesource_to_console.conf";
 
-    @BeforeAll
-    static void beforeClass() throws Exception {
-        testClusterName = TestUtils.getClusterName("TelemetryApiIT");
-        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
-        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
-        TelemetryMetricConfig telemetryMetricConfig = new 
TelemetryMetricConfig();
-        telemetryMetricConfig.setEnabled(true);
-        TelemetryConfig telemetryConfig = new TelemetryConfig();
-        telemetryConfig.setMetric(telemetryMetricConfig);
-        seaTunnelConfig.getEngineConfig().setTelemetryConfig(telemetryConfig);
-        hazelcastInstance = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-        Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = 
TestUtils.getResource("stream_fakesource_to_file.conf");
-        JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("fake_to_file");
+    private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin", 
SERVER_SHELL);
+    private static final Path config = Paths.get(SEATUNNEL_HOME, "config");
+    private static final Path hadoopJar =
+            Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
 
-        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        clientConfig.setClusterName(testClusterName);
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        ClientJobExecutionEnvironment jobExecutionEnv =
-                engineClient.createExecutionContext(filePath, jobConfig, 
seaTunnelConfig);
+    @Test
+    public void testSubmitJobs() throws InterruptedException {
+        testGetMetrics(server, "seatunnel", true);
+        testGetMetrics(secondServer, "seatunnel", false);
+    }
 
-        clientJobProxy = jobExecutionEnv.execute();
+    @Override
+    @BeforeEach
+    public void startUp() throws Exception {
+
+        server = createServer("server", "master");
+        secondServer = createServer("secondServer", "worker");
+
+        // check cluster
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            Response response =
+                                    given().get(
+                                                    http
+                                                            + server.getHost()
+                                                            + colon
+                                                            + 
server.getFirstMappedPort()
+                                                            + 
"/hazelcast/rest/cluster");
+                            response.then().statusCode(200);
+                            Assertions.assertEquals(
+                                    2, 
response.jsonPath().getList("members").size());
+                        });
+        String JobId =
+                submitJob(
+                                server,
+                                server.getMappedPort(5801),
+                                RestConstant.CONTEXT_PATH,
+                                "STREAMING",
+                                jobName,
+                                paramJobName)
+                        .getBody()
+                        .jsonPath()
+                        .getString("jobId");
 
         Awaitility.await()
                 .atMost(2, TimeUnit.MINUTES)
                 .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING, 
clientJobProxy.getJobStatus()));
+                        () -> {
+                            Assertions.assertNotNull(JobId);
+                            given().get(
+                                            http
+                                                    + server.getHost()
+                                                    + colon
+                                                    + 
server.getFirstMappedPort()
+                                                    + CONTEXT_PATH
+                                                    + RestConstant.JOB_INFO_URL
+                                                    + "/"
+                                                    + JobId)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
-    @Test
-    public void testGetMetrics() throws InterruptedException {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + "/hazelcast/rest/instance/metrics")
-                .then()
-                .statusCode(200)
-                // Use regular expressions to verify whether the response body 
is the indicator data
-                // of Prometheus
-                // Metric data is usually multi-line, use newlines for 
validation
-                .body(matchesRegex("(?s)^.*# HELP.*# TYPE.*$"))
-                // Verify that the response body contains a specific metric
-                // JVM metrics
-                .body(containsString("jvm_threads"))
-                .body(containsString("jvm_memory_pool"))
-                .body(containsString("jvm_gc"))
-                .body(containsString("jvm_info"))
-                .body(containsString("jvm_memory_bytes"))
-                .body(containsString("jvm_classes"))
-                .body(containsString("jvm_buffer_pool"))
-                .body(containsString("process_start"))
-                // cluster_info
-                .body(containsString("cluster_info{cluster=\"" + 
testClusterName))
-                // cluster_time
-                .body(containsString("cluster_time{cluster=\"" + 
testClusterName))
-                // Job thread pool metrics
-                .body(
-                        matchesRegex(
-                                
"(?s)^.*job_thread_pool_activeCount\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                .body(
-                        matchesRegex(
-                                
"(?s)^.*job_thread_pool_completedTask_total\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                .body(
-                        matchesRegex(
-                                
"(?s)^.*job_thread_pool_corePoolSize\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                .body(
-                        matchesRegex(
-                                
"(?s)^.*job_thread_pool_maximumPoolSize\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                .body(
-                        matchesRegex(
-                                "(?s)^.*job_thread_pool_poolSize\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                .body(
-                        matchesRegex(
-                                
"(?s)^.*job_thread_pool_task_total\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                .body(
-                        matchesRegex(
-                                
"(?s)^.*job_thread_pool_queueTaskCount\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                .body(
-                        matchesRegex(
-                                
"(?s)^.*job_thread_pool_rejection_total\\{cluster=\""
-                                        + testClusterName
-                                        + "\",address=.*$"))
-                // Job count metrics
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"canceled\",} 0.0"))
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"cancelling\",} 0.0"))
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"created\",} 0.0"))
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"failed\",} 0.0"))
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"failing\",} 0.0"))
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"finished\",} 0.0"))
-                // Running job count is 1
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"running\",} 1.0"))
-                .body(
-                        containsString(
-                                "job_count{cluster=\""
-                                        + testClusterName
-                                        + "\",type=\"scheduled\",} 0.0"))
-                // Node
+    public void testGetMetrics(GenericContainer<?> server, String 
testClusterName, boolean isMaster)
+            throws InterruptedException {
+        Response response =
+                given().get(
+                                http
+                                        + server.getHost()
+                                        + colon
+                                        + server.getFirstMappedPort()
+                                        + "/hazelcast/rest/instance/metrics");
+        ValidatableResponse validatableResponse =
+                response.then()
+                        .statusCode(200)
+                        // Use regular expressions to verify whether the 
response body is the
+                        // indicator data
+                        // of Prometheus
+                        // Metric data is usually multi-line, use newlines for 
validation
+                        .body(matchesRegex("(?s)^.*# HELP.*# TYPE.*$"))
+                        // Verify that the response body contains a specific 
metric
+                        // JVM metrics
+                        .body(containsString("jvm_threads"))
+                        .body(containsString("jvm_memory_pool"))
+                        .body(containsString("jvm_gc"))
+                        .body(containsString("jvm_info"))
+                        .body(containsString("jvm_memory_bytes"))
+                        .body(containsString("jvm_classes"))
+                        .body(containsString("jvm_buffer_pool"))
+                        .body(containsString("process_start"))
+                        // cluster_info
+                        .body(containsString("cluster_info{cluster=\"" + 
testClusterName))
+                        // cluster_time
+                        .body(containsString("cluster_time{cluster=\"" + 
testClusterName));
+
+        if (isMaster) {
+            validatableResponse
+                    // Job thread pool metrics
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_activeCount\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_completedTask_total\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_corePoolSize\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_maximumPoolSize\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_poolSize\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_task_total\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_queueTaskCount\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    .body(
+                            matchesRegex(
+                                    
"(?s)^.*job_thread_pool_rejection_total\\{cluster=\""
+                                            + testClusterName
+                                            + "\",address=.*$"))
+                    // Job count metrics
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"canceled\",} 0.0"))
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"cancelling\",} 0.0"))
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"created\",} 0.0"))
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"failed\",} 0.0"))
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"failing\",} 0.0"))
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"finished\",} 0.0"))
+                    // Running job count is 1
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"running\",} 1.0"))
+                    .body(
+                            containsString(
+                                    "job_count{cluster=\""
+                                            + testClusterName
+                                            + "\",type=\"scheduled\",} 0.0"));
+        }
+        // Node
+        validatableResponse
                 .body(
                         matchesRegex(
                                 "(?s)^.*node_state\\{cluster=\""
@@ -528,10 +575,139 @@ public class TelemetryApiIT {
                                         + "\",address=.*$"));
     }
 
-    @AfterAll
-    static void afterClass() {
-        if (hazelcastInstance != null) {
-            hazelcastInstance.shutdown();
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (secondServer != null) {
+            secondServer.close();
         }
     }
+
+    private Response submitJob(
+            GenericContainer<?> container,
+            int port,
+            String contextPath,
+            String jobMode,
+            String jobName,
+            String paramJobName) {
+        return submitJob(jobMode, container, port, contextPath, false, 
jobName, paramJobName);
+    }
+
+    private Response submitJob(
+            String jobMode,
+            GenericContainer<?> container,
+            int port,
+            String contextPath,
+            boolean isStartWithSavePoint,
+            String jobName,
+            String paramJobName) {
+        String requestBody =
+                "{\n"
+                        + "    \"env\": {\n"
+                        + "        \"job.name\": \""
+                        + jobName
+                        + "\",\n"
+                        + "        \"job.mode\": \""
+                        + jobMode
+                        + "\"\n"
+                        + "    },\n"
+                        + "    \"source\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"FakeSource\",\n"
+                        + "            \"result_table_name\": \"fake\",\n"
+                        + "            \"row.num\": 100,\n"
+                        + "            \"schema\": {\n"
+                        + "                \"fields\": {\n"
+                        + "                    \"name\": \"string\",\n"
+                        + "                    \"age\": \"int\",\n"
+                        + "                    \"card\": \"int\"\n"
+                        + "                }\n"
+                        + "            }\n"
+                        + "        }\n"
+                        + "    ],\n"
+                        + "    \"transform\": [\n"
+                        + "    ],\n"
+                        + "    \"sink\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"Console\",\n"
+                        + "            \"source_table_name\": [\"fake\"]\n"
+                        + "        }\n"
+                        + "    ]\n"
+                        + "}";
+        String parameters = null;
+        if (paramJobName != null) {
+            parameters = "jobName=" + paramJobName;
+        }
+        if (isStartWithSavePoint) {
+            parameters = parameters + "&isStartWithSavePoint=true";
+        }
+        Response response =
+                given().body(requestBody)
+                        .header("Content-Type", "application/json; 
charset=utf-8")
+                        .post(
+                                parameters == null
+                                        ? http
+                                                + container.getHost()
+                                                + colon
+                                                + port
+                                                + contextPath
+                                                + RestConstant.SUBMIT_JOB_URL
+                                        : http
+                                                + container.getHost()
+                                                + colon
+                                                + port
+                                                + contextPath
+                                                + RestConstant.SUBMIT_JOB_URL
+                                                + "?"
+                                                + parameters);
+        return response;
+    }
+
+    private GenericContainer<?> createServer(String networkAlias, String role)
+            throws IOException, InterruptedException {
+
+        GenericContainer<?> server =
+                new GenericContainer<>(getDockerImage())
+                        .withNetwork(NETWORK)
+                        .withEnv("TZ", "UTC")
+                        .withCommand(
+                                
ContainerUtil.adaptPathForWin(binPath.toString()) + " -r " + role)
+                        .withNetworkAliases(networkAlias)
+                        .withExposedPorts()
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(
+                                                "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
+                        .waitingFor(Wait.forListeningPort());
+        copySeaTunnelStarterToContainer(server);
+        server.setExposedPorts(Arrays.asList(5801));
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+                config.toString());
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/"),
+                config.toString());
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+                hadoopJar.toString());
+        server.start();
+        // execute extra commands
+        executeExtraCommands(server);
+        ContainerUtil.copyConnectorJarToContainer(
+                server,
+                confFile,
+                getConnectorModulePath(),
+                getConnectorNamePrefix(),
+                getConnectorType(),
+                SEATUNNEL_HOME);
+
+        return server;
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml
new file mode 100644
index 0000000000..0aeade2fc1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+  cluster-name: seatunnel
+  network:
+    rest-api:
+      enabled: true
+      endpoint-groups:
+        CLUSTER_WRITE:
+          enabled: true
+        DATA:
+          enabled: true
+    join:
+      tcp-ip:
+        enabled: true
+        member-list:
+          - secondServer
+          - server
+    port:
+      auto-increment: true
+      port-count: 100
+      port: 5801
+  properties:
+    hazelcast.invocation.max.retry.count: 20
+    hazelcast.tcp.join.port.try.count: 30
+    hazelcast.logging.type: log4j2
+    hazelcast.operation.generic.thread.count: 50
+    hazelcast.heartbeat.failuredetector.type: phi-accrual
+    hazelcast.heartbeat.interval.seconds: 2
+    hazelcast.max.no.heartbeat.seconds: 180
+    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
+    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
+    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
+
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml
new file mode 100644
index 0000000000..0aeade2fc1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+  cluster-name: seatunnel
+  network:
+    rest-api:
+      enabled: true
+      endpoint-groups:
+        CLUSTER_WRITE:
+          enabled: true
+        DATA:
+          enabled: true
+    join:
+      tcp-ip:
+        enabled: true
+        member-list:
+          - secondServer
+          - server
+    port:
+      auto-increment: true
+      port-count: 100
+      port: 5801
+  properties:
+    hazelcast.invocation.max.retry.count: 20
+    hazelcast.tcp.join.port.try.count: 30
+    hazelcast.logging.type: log4j2
+    hazelcast.operation.generic.thread.count: 50
+    hazelcast.heartbeat.failuredetector.type: phi-accrual
+    hazelcast.heartbeat.interval.seconds: 2
+    hazelcast.max.no.heartbeat.seconds: 180
+    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
+    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
+    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
+
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options
new file mode 100644
index 0000000000..f7d00c6eaf
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# JVM Heap
+-Xms2g
+-Xmx2g
+
+# JVM Dump
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
+
+# Only used for tests! Make sure soft references are collected as soon as possible
+-XX:SoftRefLRUPolicyMSPerMB=1
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options
new file mode 100644
index 0000000000..f7d00c6eaf
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# JVM Heap
+-Xms2g
+-Xmx2g
+
+# JVM Dump
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
+
+# Only used for tests! Make sure soft references are collected as soon as possible
+-XX:SoftRefLRUPolicyMSPerMB=1
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml
new file mode 100644
index 0000000000..24aab7762c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+  engine:
+    history-job-expire-minutes: 1440
+    backup-count: 2
+    queue-type: blockingqueue
+    print-execution-info-interval: 10
+    slot-service:
+      dynamic-slot: true
+    checkpoint:
+      interval: 300000
+      timeout: 100000
+      storage:
+        type: localfile
+        max-retained: 3
+        plugin-config:
+          namespace: /tmp/seatunnel/checkpoint_snapshot/
+    http:
+        enable-http: true
+        port: 8080
+    telemetry:
+      metric:
+        enabled: true
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf
new file mode 100644
index 0000000000..6a7bcd4654
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    parallelism = 1
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_map = "map<string, array<int>>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, map<string, string>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+
+  }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 36f500a279..8e69abd528 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -79,25 +79,13 @@ public class SeaTunnelServerStarter {
         seaTunnelConfig
                 .getEngineConfig()
                 .setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
-        return ((HazelcastInstanceProxy)
-                        HazelcastInstanceFactory.newHazelcastInstance(
-                                seaTunnelConfig.getHazelcastConfig(),
-                                HazelcastInstanceFactory.createInstanceName(
-                                        seaTunnelConfig.getHazelcastConfig()),
-                                new SeaTunnelNodeContext(seaTunnelConfig)))
-                .getOriginal();
+        return initializeHazelcastInstance(seaTunnelConfig, null);
     }
 
     public static HazelcastInstanceImpl createMasterHazelcastInstance(
             @NonNull SeaTunnelConfig seaTunnelConfig) {
         
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
-        return ((HazelcastInstanceProxy)
-                        HazelcastInstanceFactory.newHazelcastInstance(
-                                seaTunnelConfig.getHazelcastConfig(),
-                                HazelcastInstanceFactory.createInstanceName(
-                                        seaTunnelConfig.getHazelcastConfig()),
-                                new SeaTunnelNodeContext(seaTunnelConfig)))
-                .getOriginal();
+        return initializeHazelcastInstance(seaTunnelConfig, null);
     }
 
     public static HazelcastInstanceImpl createWorkerHazelcastInstance(
@@ -105,13 +93,7 @@ public class SeaTunnelServerStarter {
         
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
         // in hazelcast lite node will not store IMap data.
         seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
-        return ((HazelcastInstanceProxy)
-                        HazelcastInstanceFactory.newHazelcastInstance(
-                                seaTunnelConfig.getHazelcastConfig(),
-                                HazelcastInstanceFactory.createInstanceName(
-                                        seaTunnelConfig.getHazelcastConfig()),
-                                new SeaTunnelNodeContext(seaTunnelConfig)))
-                .getOriginal();
+        return initializeHazelcastInstance(seaTunnelConfig, null);
     }
 
     public static HazelcastInstanceImpl createHazelcastInstance() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
index e50bffd547..c75a3324c8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
@@ -39,84 +39,87 @@ public class JobThreadPoolStatusExports extends 
AbstractCollector {
     @Override
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples> mfs = new ArrayList();
-
-        ThreadPoolStatus threadPoolStatusMetrics = 
getServer().getThreadPoolStatusMetrics();
-        List<String> labelNames = clusterLabelNames(ADDRESS, "type");
-
-        GaugeMetricFamily activeCount =
-                new GaugeMetricFamily(
-                        "job_thread_pool_activeCount",
-                        String.format(HELP, "activeCount"),
-                        labelNames);
-        activeCount.addMetric(
-                labelValues(localAddress(), "activeCount"),
-                threadPoolStatusMetrics.getActiveCount());
-        mfs.add(activeCount);
-
-        CounterMetricFamily completedTask =
-                new CounterMetricFamily(
-                        "job_thread_pool_completedTask",
-                        String.format(HELP, "completedTask"),
-                        labelNames);
-        completedTask.addMetric(
-                labelValues(localAddress(), "completedTask"),
-                threadPoolStatusMetrics.getCompletedTaskCount());
-        mfs.add(completedTask);
-
-        GaugeMetricFamily corePoolSize =
-                new GaugeMetricFamily(
-                        "job_thread_pool_corePoolSize",
-                        String.format(HELP, "corePoolSize"),
-                        labelNames);
-        corePoolSize.addMetric(
-                labelValues(localAddress(), "corePoolSize"),
-                threadPoolStatusMetrics.getCorePoolSize());
-        mfs.add(corePoolSize);
-
-        GaugeMetricFamily maximumPoolSize =
-                new GaugeMetricFamily(
-                        "job_thread_pool_maximumPoolSize",
-                        String.format(HELP, "maximumPoolSize"),
-                        labelNames);
-        maximumPoolSize.addMetric(
-                labelValues(localAddress(), "maximumPoolSize"),
-                threadPoolStatusMetrics.getMaximumPoolSize());
-        mfs.add(maximumPoolSize);
-
-        GaugeMetricFamily poolSize =
-                new GaugeMetricFamily(
-                        "job_thread_pool_poolSize", String.format(HELP, 
"poolSize"), labelNames);
-        poolSize.addMetric(
-                labelValues(localAddress(), "poolSize"), 
threadPoolStatusMetrics.getPoolSize());
-        mfs.add(poolSize);
-
-        CounterMetricFamily taskCount =
-                new CounterMetricFamily(
-                        "job_thread_pool_task", String.format(HELP, 
"taskCount"), labelNames);
-        taskCount.addMetric(
-                labelValues(localAddress(), "taskCount"), 
threadPoolStatusMetrics.getTaskCount());
-        mfs.add(taskCount);
-
-        GaugeMetricFamily queueTaskCount =
-                new GaugeMetricFamily(
-                        "job_thread_pool_queueTaskCount",
-                        String.format(HELP, "queueTaskCount"),
-                        labelNames);
-        queueTaskCount.addMetric(
-                labelValues(localAddress(), "queueTaskCount"),
-                threadPoolStatusMetrics.getQueueTaskCount());
-        mfs.add(queueTaskCount);
-
-        CounterMetricFamily rejectedTaskCount =
-                new CounterMetricFamily(
-                        "job_thread_pool_rejection",
-                        String.format(HELP, "rejectionCount"),
-                        labelNames);
-        rejectedTaskCount.addMetric(
-                labelValues(localAddress(), "rejectionCount"),
-                threadPoolStatusMetrics.getRejectionCount());
-        mfs.add(rejectedTaskCount);
-
+        if (isMaster()) {
+            ThreadPoolStatus threadPoolStatusMetrics = 
getServer().getThreadPoolStatusMetrics();
+            List<String> labelNames = clusterLabelNames(ADDRESS, "type");
+
+            GaugeMetricFamily activeCount =
+                    new GaugeMetricFamily(
+                            "job_thread_pool_activeCount",
+                            String.format(HELP, "activeCount"),
+                            labelNames);
+            activeCount.addMetric(
+                    labelValues(localAddress(), "activeCount"),
+                    threadPoolStatusMetrics.getActiveCount());
+            mfs.add(activeCount);
+
+            CounterMetricFamily completedTask =
+                    new CounterMetricFamily(
+                            "job_thread_pool_completedTask",
+                            String.format(HELP, "completedTask"),
+                            labelNames);
+            completedTask.addMetric(
+                    labelValues(localAddress(), "completedTask"),
+                    threadPoolStatusMetrics.getCompletedTaskCount());
+            mfs.add(completedTask);
+
+            GaugeMetricFamily corePoolSize =
+                    new GaugeMetricFamily(
+                            "job_thread_pool_corePoolSize",
+                            String.format(HELP, "corePoolSize"),
+                            labelNames);
+            corePoolSize.addMetric(
+                    labelValues(localAddress(), "corePoolSize"),
+                    threadPoolStatusMetrics.getCorePoolSize());
+            mfs.add(corePoolSize);
+
+            GaugeMetricFamily maximumPoolSize =
+                    new GaugeMetricFamily(
+                            "job_thread_pool_maximumPoolSize",
+                            String.format(HELP, "maximumPoolSize"),
+                            labelNames);
+            maximumPoolSize.addMetric(
+                    labelValues(localAddress(), "maximumPoolSize"),
+                    threadPoolStatusMetrics.getMaximumPoolSize());
+            mfs.add(maximumPoolSize);
+
+            GaugeMetricFamily poolSize =
+                    new GaugeMetricFamily(
+                            "job_thread_pool_poolSize",
+                            String.format(HELP, "poolSize"),
+                            labelNames);
+            poolSize.addMetric(
+                    labelValues(localAddress(), "poolSize"), 
threadPoolStatusMetrics.getPoolSize());
+            mfs.add(poolSize);
+
+            CounterMetricFamily taskCount =
+                    new CounterMetricFamily(
+                            "job_thread_pool_task", String.format(HELP, 
"taskCount"), labelNames);
+            taskCount.addMetric(
+                    labelValues(localAddress(), "taskCount"),
+                    threadPoolStatusMetrics.getTaskCount());
+            mfs.add(taskCount);
+
+            GaugeMetricFamily queueTaskCount =
+                    new GaugeMetricFamily(
+                            "job_thread_pool_queueTaskCount",
+                            String.format(HELP, "queueTaskCount"),
+                            labelNames);
+            queueTaskCount.addMetric(
+                    labelValues(localAddress(), "queueTaskCount"),
+                    threadPoolStatusMetrics.getQueueTaskCount());
+            mfs.add(queueTaskCount);
+
+            CounterMetricFamily rejectedTaskCount =
+                    new CounterMetricFamily(
+                            "job_thread_pool_rejection",
+                            String.format(HELP, "rejectionCount"),
+                            labelNames);
+            rejectedTaskCount.addMetric(
+                    labelValues(localAddress(), "rejectionCount"),
+                    threadPoolStatusMetrics.getRejectionCount());
+            mfs.add(rejectedTaskCount);
+        }
         return mfs;
     }
 }

Reply via email to