This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 50113e77cf [Fix][Zeta] Fix worker node metrics acquisition (#7862)
50113e77cf is described below
commit 50113e77cf6729c833bca6a5e20ef397b3992782
Author: corgy-w <[email protected]>
AuthorDate: Sun Oct 27 21:48:49 2024 +0800
[Fix][Zeta] Fix worker node metrics acquisition (#7862)
---
...sterWorkerClusterSeaTunnelWithTelemetryIT.java} | 510 ++++++++++++++-------
.../master-worker-cluster/hazelcast-master.yaml | 49 ++
.../master-worker-cluster/hazelcast-worker.yaml | 49 ++
.../master-worker-cluster/jvm_master_options | 27 ++
.../master-worker-cluster/jvm_worker_options | 27 ++
.../resources/master-worker-cluster/seatunnel.yaml | 39 ++
.../resources/stream_fakesource_to_console.conf | 82 ++++
.../engine/server/SeaTunnelServerStarter.java | 24 +-
.../exports/JobThreadPoolStatusExports.java | 159 +++----
9 files changed, 700 insertions(+), 266 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java
similarity index 53%
rename from
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java
rename to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java
index 3f85fbbe35..5acfad5592 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TelemetryApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/telemetry/MasterWorkerClusterSeaTunnelWithTelemetryIT.java
@@ -15,194 +15,241 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.e2e;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.engine.client.SeaTunnelClient;
-import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.ClientJobProxy;
-import org.apache.seatunnel.engine.common.config.ConfigProvider;
-import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.common.config.server.TelemetryConfig;
-import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+package org.apache.seatunnel.engine.e2e.telemetry;
+
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
-import com.hazelcast.client.config.ClientConfig;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import lombok.extern.slf4j.Slf4j;
+import io.restassured.response.Response;
+import io.restassured.response.ValidatableResponse;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static io.restassured.RestAssured.given;
+import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.CONTEXT_PATH;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.matchesRegex;
-@Slf4j
-public class TelemetryApiIT {
+public class MasterWorkerClusterSeaTunnelWithTelemetryIT extends
SeaTunnelContainer {
+
+ private GenericContainer<?> secondServer;
+
+ private final Network NETWORK = Network.newNetwork();
- private static final String HOST = "http://localhost:";
+ private static final String jobName = "test测试";
+ private static final String paramJobName = "param_test测试";
- private static ClientJobProxy clientJobProxy;
+ private static final String http = "http://";
- private static HazelcastInstanceImpl hazelcastInstance;
+ private static final String colon = ":";
- private static String testClusterName;
+ private static final String confFile = "/fakesource_to_console.conf";
- @BeforeAll
- static void beforeClass() throws Exception {
- testClusterName = TestUtils.getClusterName("TelemetryApiIT");
- SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
- seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
- TelemetryMetricConfig telemetryMetricConfig = new
TelemetryMetricConfig();
- telemetryMetricConfig.setEnabled(true);
- TelemetryConfig telemetryConfig = new TelemetryConfig();
- telemetryConfig.setMetric(telemetryMetricConfig);
- seaTunnelConfig.getEngineConfig().setTelemetryConfig(telemetryConfig);
- hazelcastInstance =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- Common.setDeployMode(DeployMode.CLIENT);
- String filePath =
TestUtils.getResource("stream_fakesource_to_file.conf");
- JobConfig jobConfig = new JobConfig();
- jobConfig.setName("fake_to_file");
+ private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL);
+ private static final Path config = Paths.get(SEATUNNEL_HOME, "config");
+ private static final Path hadoopJar =
+ Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
- ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
- clientConfig.setClusterName(testClusterName);
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig,
seaTunnelConfig);
+ @Test
+ public void testSubmitJobs() throws InterruptedException {
+ testGetMetrics(server, "seatunnel", true);
+ testGetMetrics(secondServer, "seatunnel", false);
+ }
- clientJobProxy = jobExecutionEnv.execute();
+ @Override
+ @BeforeEach
+ public void startUp() throws Exception {
+
+ server = createServer("server", "master");
+ secondServer = createServer("secondServer", "worker");
+
+ // check cluster
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ Response response =
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ +
server.getFirstMappedPort()
+ +
"/hazelcast/rest/cluster");
+ response.then().statusCode(200);
+ Assertions.assertEquals(
+ 2,
response.jsonPath().getList("members").size());
+ });
+ String JobId =
+ submitJob(
+ server,
+ server.getMappedPort(5801),
+ RestConstant.CONTEXT_PATH,
+ "STREAMING",
+ jobName,
+ paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
clientJobProxy.getJobStatus()));
+ () -> {
+ Assertions.assertNotNull(JobId);
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ +
server.getFirstMappedPort()
+ + CONTEXT_PATH
+ + RestConstant.JOB_INFO_URL
+ + "/"
+ + JobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
- @Test
- public void testGetMetrics() throws InterruptedException {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + "/hazelcast/rest/instance/metrics")
- .then()
- .statusCode(200)
- // Use regular expressions to verify whether the response body
is the indicator data
- // of Prometheus
- // Metric data is usually multi-line, use newlines for
validation
- .body(matchesRegex("(?s)^.*# HELP.*# TYPE.*$"))
- // Verify that the response body contains a specific metric
- // JVM metrics
- .body(containsString("jvm_threads"))
- .body(containsString("jvm_memory_pool"))
- .body(containsString("jvm_gc"))
- .body(containsString("jvm_info"))
- .body(containsString("jvm_memory_bytes"))
- .body(containsString("jvm_classes"))
- .body(containsString("jvm_buffer_pool"))
- .body(containsString("process_start"))
- // cluster_info
- .body(containsString("cluster_info{cluster=\"" +
testClusterName))
- // cluster_time
- .body(containsString("cluster_time{cluster=\"" +
testClusterName))
- // Job thread pool metrics
- .body(
- matchesRegex(
-
"(?s)^.*job_thread_pool_activeCount\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- .body(
- matchesRegex(
-
"(?s)^.*job_thread_pool_completedTask_total\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- .body(
- matchesRegex(
-
"(?s)^.*job_thread_pool_corePoolSize\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- .body(
- matchesRegex(
-
"(?s)^.*job_thread_pool_maximumPoolSize\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- .body(
- matchesRegex(
- "(?s)^.*job_thread_pool_poolSize\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- .body(
- matchesRegex(
-
"(?s)^.*job_thread_pool_task_total\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- .body(
- matchesRegex(
-
"(?s)^.*job_thread_pool_queueTaskCount\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- .body(
- matchesRegex(
-
"(?s)^.*job_thread_pool_rejection_total\\{cluster=\""
- + testClusterName
- + "\",address=.*$"))
- // Job count metrics
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"canceled\",} 0.0"))
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"cancelling\",} 0.0"))
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"created\",} 0.0"))
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"failed\",} 0.0"))
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"failing\",} 0.0"))
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"finished\",} 0.0"))
- // Running job count is 1
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"running\",} 1.0"))
- .body(
- containsString(
- "job_count{cluster=\""
- + testClusterName
- + "\",type=\"scheduled\",} 0.0"))
- // Node
+ public void testGetMetrics(GenericContainer<?> server, String
testClusterName, boolean isMaster)
+ throws InterruptedException {
+ Response response =
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ + server.getFirstMappedPort()
+ + "/hazelcast/rest/instance/metrics");
+ ValidatableResponse validatableResponse =
+ response.then()
+ .statusCode(200)
+ // Use regular expressions to verify whether the
response body is the
+ // indicator data
+ // of Prometheus
+ // Metric data is usually multi-line, use newlines for
validation
+ .body(matchesRegex("(?s)^.*# HELP.*# TYPE.*$"))
+ // Verify that the response body contains a specific
metric
+ // JVM metrics
+ .body(containsString("jvm_threads"))
+ .body(containsString("jvm_memory_pool"))
+ .body(containsString("jvm_gc"))
+ .body(containsString("jvm_info"))
+ .body(containsString("jvm_memory_bytes"))
+ .body(containsString("jvm_classes"))
+ .body(containsString("jvm_buffer_pool"))
+ .body(containsString("process_start"))
+ // cluster_info
+ .body(containsString("cluster_info{cluster=\"" +
testClusterName))
+ // cluster_time
+ .body(containsString("cluster_time{cluster=\"" +
testClusterName));
+
+ if (isMaster) {
+ validatableResponse
+ // Job thread pool metrics
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_activeCount\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_completedTask_total\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_corePoolSize\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_maximumPoolSize\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_poolSize\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_task_total\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_queueTaskCount\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ .body(
+ matchesRegex(
+
"(?s)^.*job_thread_pool_rejection_total\\{cluster=\""
+ + testClusterName
+ + "\",address=.*$"))
+ // Job count metrics
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"canceled\",} 0.0"))
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"cancelling\",} 0.0"))
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"created\",} 0.0"))
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"failed\",} 0.0"))
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"failing\",} 0.0"))
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"finished\",} 0.0"))
+ // Running job count is 1
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"running\",} 1.0"))
+ .body(
+ containsString(
+ "job_count{cluster=\""
+ + testClusterName
+ + "\",type=\"scheduled\",} 0.0"));
+ }
+ // Node
+ validatableResponse
.body(
matchesRegex(
"(?s)^.*node_state\\{cluster=\""
@@ -528,10 +575,139 @@ public class TelemetryApiIT {
+ "\",address=.*$"));
}
- @AfterAll
- static void afterClass() {
- if (hazelcastInstance != null) {
- hazelcastInstance.shutdown();
+ @Override
+ @AfterEach
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (secondServer != null) {
+ secondServer.close();
}
}
+
+ private Response submitJob(
+ GenericContainer<?> container,
+ int port,
+ String contextPath,
+ String jobMode,
+ String jobName,
+ String paramJobName) {
+ return submitJob(jobMode, container, port, contextPath, false,
jobName, paramJobName);
+ }
+
+ private Response submitJob(
+ String jobMode,
+ GenericContainer<?> container,
+ int port,
+ String contextPath,
+ boolean isStartWithSavePoint,
+ String jobName,
+ String paramJobName) {
+ String requestBody =
+ "{\n"
+ + " \"env\": {\n"
+ + " \"job.name\": \""
+ + jobName
+ + "\",\n"
+ + " \"job.mode\": \""
+ + jobMode
+ + "\"\n"
+ + " },\n"
+ + " \"source\": [\n"
+ + " {\n"
+ + " \"plugin_name\": \"FakeSource\",\n"
+ + " \"result_table_name\": \"fake\",\n"
+ + " \"row.num\": 100,\n"
+ + " \"schema\": {\n"
+ + " \"fields\": {\n"
+ + " \"name\": \"string\",\n"
+ + " \"age\": \"int\",\n"
+ + " \"card\": \"int\"\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " ],\n"
+ + " \"transform\": [\n"
+ + " ],\n"
+ + " \"sink\": [\n"
+ + " {\n"
+ + " \"plugin_name\": \"Console\",\n"
+ + " \"source_table_name\": [\"fake\"]\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ String parameters = null;
+ if (paramJobName != null) {
+ parameters = "jobName=" + paramJobName;
+ }
+ if (isStartWithSavePoint) {
+ parameters = parameters + "&isStartWithSavePoint=true";
+ }
+ Response response =
+ given().body(requestBody)
+ .header("Content-Type", "application/json;
charset=utf-8")
+ .post(
+ parameters == null
+ ? http
+ + container.getHost()
+ + colon
+ + port
+ + contextPath
+ + RestConstant.SUBMIT_JOB_URL
+ : http
+ + container.getHost()
+ + colon
+ + port
+ + contextPath
+ + RestConstant.SUBMIT_JOB_URL
+ + "?"
+ + parameters);
+ return response;
+ }
+
+ private GenericContainer<?> createServer(String networkAlias, String role)
+ throws IOException, InterruptedException {
+
+ GenericContainer<?> server =
+ new GenericContainer<>(getDockerImage())
+ .withNetwork(NETWORK)
+ .withEnv("TZ", "UTC")
+ .withCommand(
+
ContainerUtil.adaptPathForWin(binPath.toString()) + " -r " + role)
+ .withNetworkAliases(networkAlias)
+ .withExposedPorts()
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(
+ "seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
+ .waitingFor(Wait.forListeningPort());
+ copySeaTunnelStarterToContainer(server);
+ server.setExposedPorts(Arrays.asList(5801));
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+ config.toString());
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/"),
+ config.toString());
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+ hadoopJar.toString());
+ server.start();
+ // execute extra commands
+ executeExtraCommands(server);
+ ContainerUtil.copyConnectorJarToContainer(
+ server,
+ confFile,
+ getConnectorModulePath(),
+ getConnectorNamePrefix(),
+ getConnectorType(),
+ SEATUNNEL_HOME);
+
+ return server;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml
new file mode 100644
index 0000000000..0aeade2fc1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-master.yaml
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+ cluster-name: seatunnel
+ network:
+ rest-api:
+ enabled: true
+ endpoint-groups:
+ CLUSTER_WRITE:
+ enabled: true
+ DATA:
+ enabled: true
+ join:
+ tcp-ip:
+ enabled: true
+ member-list:
+ - secondServer
+ - server
+ port:
+ auto-increment: true
+ port-count: 100
+ port: 5801
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 50
+ hazelcast.heartbeat.failuredetector.type: phi-accrual
+ hazelcast.heartbeat.interval.seconds: 2
+ hazelcast.max.no.heartbeat.seconds: 180
+ hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
+ hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
+ hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
+
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml
new file mode 100644
index 0000000000..0aeade2fc1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/hazelcast-worker.yaml
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+ cluster-name: seatunnel
+ network:
+ rest-api:
+ enabled: true
+ endpoint-groups:
+ CLUSTER_WRITE:
+ enabled: true
+ DATA:
+ enabled: true
+ join:
+ tcp-ip:
+ enabled: true
+ member-list:
+ - secondServer
+ - server
+ port:
+ auto-increment: true
+ port-count: 100
+ port: 5801
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 50
+ hazelcast.heartbeat.failuredetector.type: phi-accrual
+ hazelcast.heartbeat.interval.seconds: 2
+ hazelcast.max.no.heartbeat.seconds: 180
+ hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
+ hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
+ hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
+
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options
new file mode 100644
index 0000000000..f7d00c6eaf
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_master_options
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# JVM Heap
+-Xms2g
+-Xmx2g
+
+# JVM Dump
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
+
+# Only used for test!!! We should make sure soft reference be collected ASAP
+-XX:SoftRefLRUPolicyMSPerMB=1
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options
new file mode 100644
index 0000000000..f7d00c6eaf
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/jvm_worker_options
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# JVM Heap
+-Xms2g
+-Xmx2g
+
+# JVM Dump
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
+
+# Only used for test!!! We should make sure soft reference be collected ASAP
+-XX:SoftRefLRUPolicyMSPerMB=1
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml
new file mode 100644
index 0000000000..24aab7762c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/master-worker-cluster/seatunnel.yaml
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1440
+ backup-count: 2
+ queue-type: blockingqueue
+ print-execution-info-interval: 10
+ slot-service:
+ dynamic-slot: true
+ checkpoint:
+ interval: 300000
+ timeout: 100000
+ storage:
+ type: localfile
+ max-retained: 3
+ plugin-config:
+ namespace: /tmp/seatunnel/checkpoint_snapshot/
+ http:
+ enable-http: true
+ port: 8080
+ telemetry:
+ metric:
+ enabled: true
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf
new file mode 100644
index 0000000000..6a7bcd4654
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_console.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ parallelism = 1
+ result_table_name = "fake"
+ schema = {
+ fields {
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 36f500a279..8e69abd528 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -79,25 +79,13 @@ public class SeaTunnelServerStarter {
seaTunnelConfig
.getEngineConfig()
.setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
- return ((HazelcastInstanceProxy)
- HazelcastInstanceFactory.newHazelcastInstance(
- seaTunnelConfig.getHazelcastConfig(),
- HazelcastInstanceFactory.createInstanceName(
- seaTunnelConfig.getHazelcastConfig()),
- new SeaTunnelNodeContext(seaTunnelConfig)))
- .getOriginal();
+ return initializeHazelcastInstance(seaTunnelConfig, null);
}
public static HazelcastInstanceImpl createMasterHazelcastInstance(
@NonNull SeaTunnelConfig seaTunnelConfig) {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
- return ((HazelcastInstanceProxy)
- HazelcastInstanceFactory.newHazelcastInstance(
- seaTunnelConfig.getHazelcastConfig(),
- HazelcastInstanceFactory.createInstanceName(
- seaTunnelConfig.getHazelcastConfig()),
- new SeaTunnelNodeContext(seaTunnelConfig)))
- .getOriginal();
+ return initializeHazelcastInstance(seaTunnelConfig, null);
}
public static HazelcastInstanceImpl createWorkerHazelcastInstance(
@@ -105,13 +93,7 @@ public class SeaTunnelServerStarter {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
// in hazelcast lite node will not store IMap data.
seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
- return ((HazelcastInstanceProxy)
- HazelcastInstanceFactory.newHazelcastInstance(
- seaTunnelConfig.getHazelcastConfig(),
- HazelcastInstanceFactory.createInstanceName(
- seaTunnelConfig.getHazelcastConfig()),
- new SeaTunnelNodeContext(seaTunnelConfig)))
- .getOriginal();
+ return initializeHazelcastInstance(seaTunnelConfig, null);
}
public static HazelcastInstanceImpl createHazelcastInstance() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
index e50bffd547..c75a3324c8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/JobThreadPoolStatusExports.java
@@ -39,84 +39,87 @@ public class JobThreadPoolStatusExports extends AbstractCollector {
@Override
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples> mfs = new ArrayList();
-
- ThreadPoolStatus threadPoolStatusMetrics = getServer().getThreadPoolStatusMetrics();
- List<String> labelNames = clusterLabelNames(ADDRESS, "type");
-
- GaugeMetricFamily activeCount =
- new GaugeMetricFamily(
- "job_thread_pool_activeCount",
- String.format(HELP, "activeCount"),
- labelNames);
- activeCount.addMetric(
- labelValues(localAddress(), "activeCount"),
- threadPoolStatusMetrics.getActiveCount());
- mfs.add(activeCount);
-
- CounterMetricFamily completedTask =
- new CounterMetricFamily(
- "job_thread_pool_completedTask",
- String.format(HELP, "completedTask"),
- labelNames);
- completedTask.addMetric(
- labelValues(localAddress(), "completedTask"),
- threadPoolStatusMetrics.getCompletedTaskCount());
- mfs.add(completedTask);
-
- GaugeMetricFamily corePoolSize =
- new GaugeMetricFamily(
- "job_thread_pool_corePoolSize",
- String.format(HELP, "corePoolSize"),
- labelNames);
- corePoolSize.addMetric(
- labelValues(localAddress(), "corePoolSize"),
- threadPoolStatusMetrics.getCorePoolSize());
- mfs.add(corePoolSize);
-
- GaugeMetricFamily maximumPoolSize =
- new GaugeMetricFamily(
- "job_thread_pool_maximumPoolSize",
- String.format(HELP, "maximumPoolSize"),
- labelNames);
- maximumPoolSize.addMetric(
- labelValues(localAddress(), "maximumPoolSize"),
- threadPoolStatusMetrics.getMaximumPoolSize());
- mfs.add(maximumPoolSize);
-
- GaugeMetricFamily poolSize =
- new GaugeMetricFamily(
- "job_thread_pool_poolSize", String.format(HELP, "poolSize"), labelNames);
- poolSize.addMetric(
- labelValues(localAddress(), "poolSize"), threadPoolStatusMetrics.getPoolSize());
- mfs.add(poolSize);
-
- CounterMetricFamily taskCount =
- new CounterMetricFamily(
- "job_thread_pool_task", String.format(HELP, "taskCount"), labelNames);
- taskCount.addMetric(
- labelValues(localAddress(), "taskCount"), threadPoolStatusMetrics.getTaskCount());
- mfs.add(taskCount);
-
- GaugeMetricFamily queueTaskCount =
- new GaugeMetricFamily(
- "job_thread_pool_queueTaskCount",
- String.format(HELP, "queueTaskCount"),
- labelNames);
- queueTaskCount.addMetric(
- labelValues(localAddress(), "queueTaskCount"),
- threadPoolStatusMetrics.getQueueTaskCount());
- mfs.add(queueTaskCount);
-
- CounterMetricFamily rejectedTaskCount =
- new CounterMetricFamily(
- "job_thread_pool_rejection",
- String.format(HELP, "rejectionCount"),
- labelNames);
- rejectedTaskCount.addMetric(
- labelValues(localAddress(), "rejectionCount"),
- threadPoolStatusMetrics.getRejectionCount());
- mfs.add(rejectedTaskCount);
-
+ if (isMaster()) {
+ ThreadPoolStatus threadPoolStatusMetrics = getServer().getThreadPoolStatusMetrics();
+ List<String> labelNames = clusterLabelNames(ADDRESS, "type");
+
+ GaugeMetricFamily activeCount =
+ new GaugeMetricFamily(
+ "job_thread_pool_activeCount",
+ String.format(HELP, "activeCount"),
+ labelNames);
+ activeCount.addMetric(
+ labelValues(localAddress(), "activeCount"),
+ threadPoolStatusMetrics.getActiveCount());
+ mfs.add(activeCount);
+
+ CounterMetricFamily completedTask =
+ new CounterMetricFamily(
+ "job_thread_pool_completedTask",
+ String.format(HELP, "completedTask"),
+ labelNames);
+ completedTask.addMetric(
+ labelValues(localAddress(), "completedTask"),
+ threadPoolStatusMetrics.getCompletedTaskCount());
+ mfs.add(completedTask);
+
+ GaugeMetricFamily corePoolSize =
+ new GaugeMetricFamily(
+ "job_thread_pool_corePoolSize",
+ String.format(HELP, "corePoolSize"),
+ labelNames);
+ corePoolSize.addMetric(
+ labelValues(localAddress(), "corePoolSize"),
+ threadPoolStatusMetrics.getCorePoolSize());
+ mfs.add(corePoolSize);
+
+ GaugeMetricFamily maximumPoolSize =
+ new GaugeMetricFamily(
+ "job_thread_pool_maximumPoolSize",
+ String.format(HELP, "maximumPoolSize"),
+ labelNames);
+ maximumPoolSize.addMetric(
+ labelValues(localAddress(), "maximumPoolSize"),
+ threadPoolStatusMetrics.getMaximumPoolSize());
+ mfs.add(maximumPoolSize);
+
+ GaugeMetricFamily poolSize =
+ new GaugeMetricFamily(
+ "job_thread_pool_poolSize",
+ String.format(HELP, "poolSize"),
+ labelNames);
+ poolSize.addMetric(
+ labelValues(localAddress(), "poolSize"), threadPoolStatusMetrics.getPoolSize());
+ mfs.add(poolSize);
+
+ CounterMetricFamily taskCount =
+ new CounterMetricFamily(
+ "job_thread_pool_task", String.format(HELP, "taskCount"), labelNames);
+ taskCount.addMetric(
+ labelValues(localAddress(), "taskCount"),
+ threadPoolStatusMetrics.getTaskCount());
+ mfs.add(taskCount);
+
+ GaugeMetricFamily queueTaskCount =
+ new GaugeMetricFamily(
+ "job_thread_pool_queueTaskCount",
+ String.format(HELP, "queueTaskCount"),
+ labelNames);
+ queueTaskCount.addMetric(
+ labelValues(localAddress(), "queueTaskCount"),
+ threadPoolStatusMetrics.getQueueTaskCount());
+ mfs.add(queueTaskCount);
+
+ CounterMetricFamily rejectedTaskCount =
+ new CounterMetricFamily(
+ "job_thread_pool_rejection",
+ String.format(HELP, "rejectionCount"),
+ labelNames);
+ rejectedTaskCount.addMetric(
+ labelValues(localAddress(), "rejectionCount"),
+ threadPoolStatusMetrics.getRejectionCount());
+ mfs.add(rejectedTaskCount);
+ }
return mfs;
}
}