This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d003bd85b6 [Feature][Zeta] Added the metrics information of table
statistics in multi-table mode (#7212)
d003bd85b6 is described below
commit d003bd85b65b307e77867781986cc7d58c3ba80a
Author: zhangdonghao <[email protected]>
AuthorDate: Fri Jul 26 10:44:09 2024 +0800
[Feature][Zeta] Added the metrics information of table statistics in
multi-table mode (#7212)
---
.../api/sink/multitablesink/MultiTableSink.java | 5 +
.../seatunnel/engine/e2e/MultiTableMetricsIT.java | 125 +++++++++++++++++++++
.../batch_fake_multi_table_to_console.conf | 64 +++++++++++
.../engine/client/SeaTunnelClientTest.java | 114 +++++++++++++++++++
.../batch_fake_multi_table_to_console.conf | 66 +++++++++++
.../server/rest/RestHttpGetCommandProcessor.java | 74 +++++++++++-
.../server/task/SeaTunnelSourceCollector.java | 53 +++++++--
.../engine/server/task/SourceSeaTunnelTask.java | 13 ++-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 38 +++++++
9 files changed, 537 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index bb04283ca6..923ecff8b8 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -149,6 +150,10 @@ public class MultiTableSink
return Optional.of(new
MultiTableSinkAggregatedCommitter(aggCommitters));
}
+ public List<TablePath> getSinkTables() {
+ return
sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());
+ }
+
@Override
public Optional<Serializer<MultiTableAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
new file mode 100644
index 0000000000..59942eb4cc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.equalTo;
+
+public class MultiTableMetricsIT {
+
+ private static final String HOST = "http://localhost:";
+
+ private static ClientJobProxy batchJobProxy;
+
+ private static HazelcastInstanceImpl node1;
+
+ private static SeaTunnelClient engineClient;
+
+ @BeforeEach
+ void beforeClass() throws Exception {
+ String testClusterName = TestUtils.getClusterName("RestApiIT");
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(testClusterName);
+ engineClient = new SeaTunnelClient(clientConfig);
+
+ String batchFilePath =
TestUtils.getResource("batch_fake_multi_table_to_console.conf");
+ JobConfig batchConf = new JobConfig();
+ batchConf.setName("batch_fake_multi_table_to_console");
+ ClientJobExecutionEnvironment batchJobExecutionEnv =
+ engineClient.createExecutionContext(batchFilePath, batchConf,
seaTunnelConfig);
+ batchJobProxy = batchJobExecutionEnv.execute();
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
batchJobProxy.getJobStatus()));
+ }
+
+ @Test
+ public void multiTableMetrics() {
+ Collections.singletonList(node1)
+ .forEach(
+ instance -> {
+ given().get(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.JOB_INFO_URL
+ + "/"
+ + batchJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName",
equalTo("batch_fake_multi_table_to_console"))
+ .body("jobStatus", equalTo("FINISHED"))
+ .body("metrics.SourceReceivedCount",
equalTo("50"))
+ .body("metrics.SinkWriteCount",
equalTo("50"))
+ .body(
+
"metrics.TableSourceReceivedCount.'fake.table1'",
+ equalTo("20"))
+ .body(
+
"metrics.TableSourceReceivedCount.'fake.public.table2'",
+ equalTo("30"))
+ .body(
+
"metrics.TableSinkWriteCount.'fake.table1'",
+ equalTo("20"))
+ .body(
+
"metrics.TableSinkWriteCount.'fake.public.table2'",
+ equalTo("30"));
+ });
+ }
+
+ @AfterEach
+ void afterClass() {
+ if (engineClient != null) {
+ engineClient.close();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
new file mode 100644
index 0000000000..c51929a0ed
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only to test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake1"
+ row.num = 20
+ schema = {
+ table = "fake.table1"
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+
+ FakeSource {
+ result_table_name = "fake2"
+ row.num = 30
+ schema = {
+ table = "fake.public.table2"
+ fields {
+ name = "string"
+ age = "int"
+ sex = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name = "fake1"
+ }
+ console {
+ source_table_name = "fake2"
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index d7e55db4ec..100aa0b320 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.client;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.common.config.Common;
@@ -51,10 +53,14 @@ import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
@@ -548,6 +554,114 @@ public class SeaTunnelClientTest {
}
}
+ @Test
+ public void testGetMultiTableJobMetrics() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
TestUtils.getResource("/batch_fake_multi_table_to_console.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("testGetMultiTableJobMetrics");
+
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
+
+ try {
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath,
jobConfig, SEATUNNEL_CONFIG);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ CompletableFuture<JobStatus> objectCompletableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ return clientJobProxy.waitForJobComplete();
+ });
+ long jobId = clientJobProxy.getJobId();
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+
jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+ && jobClient
+
.listJobStatus(true)
+
.contains("FINISHED")));
+
+ String jobMetrics = jobClient.getJobMetrics(jobId);
+
+ Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT +
"#fake.table1"));
+ Assertions.assertTrue(
+ jobMetrics.contains(SOURCE_RECEIVED_COUNT +
"#fake.public.table2"));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT +
"#fake.table1"));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT +
"#fake.public.table2"));
+
+ log.info("jobMetrics : {}", jobMetrics);
+ JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
+ List<String> metricNameList =
+ StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(
+ jobMetricsStr.fieldNames(), 0),
+ false)
+ .filter(
+ metricName ->
+
metricName.startsWith(SOURCE_RECEIVED_COUNT)
+ ||
metricName.startsWith(SINK_WRITE_COUNT))
+ .collect(Collectors.toList());
+
+ Map<String, Long> totalCount =
+ metricNameList.stream()
+ .filter(metrics -> !metrics.contains("#"))
+ .collect(
+ Collectors.toMap(
+ metrics -> metrics,
+ metrics ->
+ StreamSupport.stream(
+
jobMetricsStr
+
.get(metrics)
+
.spliterator(),
+ false)
+ .mapToLong(
+ value ->
+
value.get("value")
+
.asLong())
+ .sum()));
+
+ Map<String, Long> tableCount =
+ metricNameList.stream()
+ .filter(metrics -> metrics.contains("#"))
+ .collect(
+ Collectors.toMap(
+ metrics -> metrics,
+ metrics ->
+ StreamSupport.stream(
+
jobMetricsStr
+
.get(metrics)
+
.spliterator(),
+ false)
+ .mapToLong(
+ value ->
+
value.get("value")
+
.asLong())
+ .sum()));
+
+ Assertions.assertEquals(
+ totalCount.get(SOURCE_RECEIVED_COUNT),
+ tableCount.entrySet().stream()
+ .filter(e ->
e.getKey().startsWith(SOURCE_RECEIVED_COUNT))
+ .mapToLong(Map.Entry::getValue)
+ .sum());
+ Assertions.assertEquals(
+ totalCount.get(SINK_WRITE_COUNT),
+ tableCount.entrySet().stream()
+ .filter(e ->
e.getKey().startsWith(SINK_WRITE_COUNT))
+ .mapToLong(Map.Entry::getValue)
+ .sum());
+
+ } catch (ExecutionException | InterruptedException |
JsonProcessingException e) {
+ throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
+ }
+ }
+
@AfterAll
public static void after() {
INSTANCE.shutdown();
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf
new file mode 100644
index 0000000000..df7ae51fe6
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only to test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake1"
+ row.num = 20
+ schema = {
+ table = "fake.table1"
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+
+ FakeSource {
+ result_table_name = "fake2"
+ row.num = 30
+ schema = {
+ table = "fake.public.table2"
+ fields {
+ name = "string"
+ age = "int"
+ sex = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name = "fake1"
+ }
+ console {
+ source_table_name = "fake2"
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 6081b0f2ea..d5d60b7cbb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
@@ -64,8 +65,10 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
@@ -79,7 +82,9 @@ import static
org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITO
public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCommand> {
private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
+ private static final String TABLE_SOURCE_RECEIVED_COUNT =
"TableSourceReceivedCount";
private static final String SINK_WRITE_COUNT = "SinkWriteCount";
+ private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
private final Log4j2HttpGetCommandProcessor original;
private NodeEngine nodeEngine;
@@ -362,12 +367,31 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
.collect(JsonArray::new, JsonArray::add,
JsonArray::add));
}
- private Map<String, Long> getJobMetrics(String jobMetrics) {
- Map<String, Long> metricsMap = new HashMap<>();
+ private Map<String, Object> getJobMetrics(String jobMetrics) {
+ Map<String, Object> metricsMap = new HashMap<>();
long sourceReadCount = 0L;
long sinkWriteCount = 0L;
+ Map<String, JsonNode> tableSourceReceivedCountMap = new HashMap<>();
+ Map<String, JsonNode> tableSinkWriteCountMap = new HashMap<>();
try {
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
+ StreamSupport.stream(
+
Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0),
+ false)
+ .filter(metricName -> metricName.contains("#"))
+ .forEach(
+ metricName -> {
+ String tableName =
+
TablePath.of(metricName.split("#")[1]).getFullName();
+ if
(metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
+ tableSourceReceivedCountMap.put(
+ tableName,
jobMetricsStr.get(metricName));
+ }
+ if
(metricName.startsWith(SINK_WRITE_COUNT)) {
+ tableSinkWriteCountMap.put(
+ tableName,
jobMetricsStr.get(metricName));
+ }
+ });
JsonNode sourceReceivedCountJson =
jobMetricsStr.get(SOURCE_RECEIVED_COUNT);
JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT);
for (int i = 0; i <
jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) {
@@ -379,9 +403,36 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
} catch (JsonProcessingException | NullPointerException e) {
return metricsMap;
}
+
+ Map<String, Long> tableSourceReceivedCount =
+ tableSourceReceivedCountMap.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
+ StreamSupport.stream(
+
entry.getValue().spliterator(),
+ false)
+ .mapToLong(
+ node ->
node.get("value").asLong())
+ .sum()));
+ Map<String, Long> tableSinkWriteCount =
+ tableSinkWriteCountMap.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
+ StreamSupport.stream(
+
entry.getValue().spliterator(),
+ false)
+ .mapToLong(
+ node ->
node.get("value").asLong())
+ .sum()));
+
metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);
-
+ metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount);
+ metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount);
return metricsMap;
}
@@ -475,11 +526,24 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
.add(
RestConstant.IS_START_WITH_SAVE_POINT,
jobImmutableInformation.isStartWithSavePoint())
- .add(RestConstant.METRICS,
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+ .add(RestConstant.METRICS,
toJsonObject(getJobMetrics(jobMetrics)));
return jobInfoJson;
}
+ private JsonObject toJsonObject(Map<String, Object> jobMetrics) {
+ JsonObject members = new JsonObject();
+ jobMetrics.forEach(
+ (key, value) -> {
+ if (value instanceof Map) {
+ members.add(key, toJsonObject((Map<String, Object>)
value));
+ } else {
+ members.add(key, value.toString());
+ }
+ });
+ return members;
+ }
+
private JsonObject getJobInfoJson(JobState jobState, String jobMetrics,
JobDAGInfo jobDAGInfo) {
return new JsonObject()
.add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId()))
@@ -498,6 +562,6 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
- .add(RestConstant.METRICS,
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+ .add(RestConstant.METRICS,
toJsonObject(getJobMetrics(jobMetrics)));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index f5d4aed1ab..62612d0617 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import
org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
@@ -34,12 +35,17 @@ import
org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
@@ -54,12 +60,16 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
+ private final MetricsContext metricsContext;
+
private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new
AtomicBoolean(false);
private final AtomicBoolean schemaChangeAfterCheckpointSignal = new
AtomicBoolean(false);
private final Counter sourceReceivedCount;
+ private final Map<String, Counter> sourceReceivedCountPerTable = new
ConcurrentHashMap<>();
+
private final Meter sourceReceivedQPS;
private final Counter sourceReceivedBytes;
@@ -77,17 +87,24 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
List<OneInputFlowLifeCycle<Record<?>>> outputs,
MetricsContext metricsContext,
FlowControlStrategy flowControlStrategy,
- SeaTunnelDataType rowType) {
+ SeaTunnelDataType rowType,
+ List<TablePath> tablePaths) {
this.checkpointLock = checkpointLock;
this.outputs = outputs;
this.rowType = rowType;
+ this.metricsContext = metricsContext;
if (rowType instanceof MultipleRowType) {
((MultipleRowType) rowType)
.iterator()
- .forEachRemaining(
- type -> {
- this.rowTypeMap.put(type.getKey(),
type.getValue());
- });
+ .forEachRemaining(type ->
this.rowTypeMap.put(type.getKey(), type.getValue()));
+ }
+ if (CollectionUtils.isNotEmpty(tablePaths)) {
+ tablePaths.forEach(
+ tablePath ->
+ sourceReceivedCountPerTable.put(
+ getFullName(tablePath),
+ metricsContext.counter(
+ SOURCE_RECEIVED_COUNT + "#" +
getFullName(tablePath))));
}
sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
@@ -100,14 +117,12 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
public void collect(T row) {
try {
if (row instanceof SeaTunnelRow) {
+ String tableId = ((SeaTunnelRow) row).getTableId();
int size;
if (rowType instanceof SeaTunnelRowType) {
size = ((SeaTunnelRow)
row).getBytesSize((SeaTunnelRowType) rowType);
} else if (rowType instanceof MultipleRowType) {
- size =
- ((SeaTunnelRow) row)
- .getBytesSize(
- rowTypeMap.get(((SeaTunnelRow)
row).getTableId()));
+ size = ((SeaTunnelRow)
row).getBytesSize(rowTypeMap.get(tableId));
} else {
throw new SeaTunnelEngineException(
"Unsupported row type: " +
rowType.getClass().getName());
@@ -115,6 +130,18 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
sourceReceivedBytes.inc(size);
sourceReceivedBytesPerSeconds.markEvent(size);
flowControlGate.audit((SeaTunnelRow) row);
+ if (StringUtils.isNotEmpty(tableId)) {
+ String tableName = getFullName(TablePath.of(tableId));
+ Counter sourceTableCounter =
sourceReceivedCountPerTable.get(tableName);
+ if (Objects.nonNull(sourceTableCounter)) {
+ sourceTableCounter.inc();
+ } else {
+ Counter counter =
+ metricsContext.counter(SOURCE_RECEIVED_COUNT +
"#" + tableName);
+ counter.inc();
+ sourceReceivedCountPerTable.put(tableName, counter);
+ }
+ }
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
@@ -205,4 +232,12 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
}
}
}
+
+ private String getFullName(TablePath tablePath) {
+ if (StringUtils.isBlank(tablePath.getTableName())) {
+ tablePath =
+ TablePath.of(tablePath.getDatabaseName(),
tablePath.getSchemaName(), "default");
+ }
+ return tablePath.getFullName();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 53171d4031..dbcde3e9d6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -37,9 +39,11 @@ import com.hazelcast.logging.Logger;
import lombok.Getter;
import lombok.NonNull;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends
SeaTunnelTask {
@@ -76,10 +80,16 @@ public class SourceSeaTunnelTask<T, SplitT extends
SourceSplit> extends SeaTunne
+ startFlowLifeCycle.getClass().getName());
} else {
SeaTunnelDataType sourceProducedType;
+ List<TablePath> tablePaths = new ArrayList<>();
try {
List<CatalogTable> producedCatalogTables =
sourceFlow.getAction().getSource().getProducedCatalogTables();
sourceProducedType =
CatalogTableUtil.convertToDataType(producedCatalogTables);
+ tablePaths =
+ producedCatalogTables.stream()
+ .map(CatalogTable::getTableId)
+ .map(TableIdentifier::toTablePath)
+ .collect(Collectors.toList());
} catch (UnsupportedOperationException e) {
// TODO remove it when all connector use
`getProducedCatalogTables`
sourceProducedType =
sourceFlow.getAction().getSource().getProducedType();
@@ -90,7 +100,8 @@ public class SourceSeaTunnelTask<T, SplitT extends
SourceSplit> extends SeaTunne
outputs,
this.getMetricsContext(),
FlowControlStrategy.fromMap(envOption),
- sourceProducedType);
+ sourceProducedType,
+ tablePaths);
((SourceFlowLifeCycle<T, SplitT>)
startFlowLifeCycle).setCollector(collector);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 48c530a0c3..516e1c97c4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -26,6 +26,8 @@ import
org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -43,6 +45,8 @@ import
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitO
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
+import org.apache.commons.lang3.StringUtils;
+
import com.hazelcast.cluster.Address;
import lombok.extern.slf4j.Slf4j;
@@ -52,9 +56,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -92,6 +98,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private Counter sinkWriteCount;
+ private Map<String, Counter> sinkWriteCountPerTable = new
ConcurrentHashMap<>();
+
private Meter sinkWriteQPS;
private Counter sinkWriteBytes;
@@ -125,6 +133,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS);
sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES);
sinkWriteBytesPerSeconds =
metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS);
+ if (sinkAction.getSink() instanceof MultiTableSink) {
+ List<TablePath> sinkTables = ((MultiTableSink)
sinkAction.getSink()).getSinkTables();
+ sinkTables.forEach(
+ tablePath ->
+ sinkWriteCountPerTable.put(
+ getFullName(tablePath),
+ metricsContext.counter(
+ SINK_WRITE_COUNT + "#" +
getFullName(tablePath))));
+ }
}
@Override
@@ -256,6 +273,19 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
long size = ((SeaTunnelRow)
record.getData()).getBytesSize();
sinkWriteBytes.inc(size);
sinkWriteBytesPerSeconds.markEvent(size);
+ String tableId = ((SeaTunnelRow)
record.getData()).getTableId();
+ if (StringUtils.isNotBlank(tableId)) {
+ String tableName = getFullName(TablePath.of(tableId));
+ Counter sinkTableCounter =
sinkWriteCountPerTable.get(tableName);
+ if (Objects.nonNull(sinkTableCounter)) {
+ sinkTableCounter.inc();
+ } else {
+ Counter counter =
+ metricsContext.counter(SINK_WRITE_COUNT +
"#" + tableName);
+ counter.inc();
+ sinkWriteCountPerTable.put(tableName, counter);
+ }
+ }
}
}
} catch (Exception e) {
@@ -315,4 +345,12 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
((SupportResourceShare)
this.writer).setMultiTableResourceManager(resourceManager, 0);
}
}
+
+ private String getFullName(TablePath tablePath) {
+ if (StringUtils.isBlank(tablePath.getTableName())) {
+ tablePath =
+ TablePath.of(tablePath.getDatabaseName(),
tablePath.getSchemaName(), "default");
+ }
+ return tablePath.getFullName();
+ }
}