This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 16c5065b2f [Feature][Zeta] Added other metrics info of multi-table
(#7338)
16c5065b2f is described below
commit 16c5065b2fe63bbf6676c33a0dd61416b25d7154
Author: corgy-w <[email protected]>
AuthorDate: Wed Aug 14 17:25:49 2024 +0800
[Feature][Zeta] Added other metrics info of multi-table (#7338)
* [Feature][Zeta] Added other metrics info of multi-table
* [Feature][Zeta] Added other metrics restApi
* [Feature][Zeta] Added other metrics restApi test
* [Feature][Zeta] update rest-api doc
* [Improve][Connector-V2] optimize code
* [Improve][Connector-V2] optimize code
---------
Co-authored-by: wangchao <[email protected]>
---
docs/en/seatunnel-engine/rest-api.md | 18 +-
docs/zh/seatunnel-engine/rest-api.md | 18 +-
.../seatunnel/engine/e2e/MultiTableMetricsIT.java | 101 ++++++++--
.../batch_fake_multi_table_to_console.conf | 102 ++++++++--
.../engine/client/SeaTunnelClientTest.java | 50 ++++-
.../server/metrics/TaskMetricsCalcContext.java | 211 +++++++++++++++++++
.../server/rest/RestHttpGetCommandProcessor.java | 223 +++++++++++++++++----
.../server/task/SeaTunnelSourceCollector.java | 60 +-----
.../engine/server/task/flow/SinkFlowLifeCycle.java | 62 +-----
9 files changed, 657 insertions(+), 188 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api.md
b/docs/en/seatunnel-engine/rest-api.md
index 99bba92dae..eb5eacfed1 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -192,8 +192,22 @@ This API has been deprecated, please use
/hazelcast/rest/maps/job-info/:jobId in
]
},
"metrics": {
- "sourceReceivedCount": "",
- "sinkWriteCount": ""
+ "SourceReceivedCount": "",
+ "SourceReceivedQPS": "",
+ "SourceReceivedBytes": "",
+ "SourceReceivedBytesPerSeconds": "",
+ "SinkWriteCount": "",
+ "SinkWriteQPS": "",
+ "SinkWriteBytes": "",
+ "SinkWriteBytesPerSeconds": "",
+ "TableSourceReceivedCount": {},
+ "TableSourceReceivedBytes": {},
+ "TableSourceReceivedBytesPerSeconds": {},
+ "TableSourceReceivedQPS": {},
+ "TableSinkWriteCount": {},
+ "TableSinkWriteQPS": {},
+ "TableSinkWriteBytes": {},
+ "TableSinkWriteBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
diff --git a/docs/zh/seatunnel-engine/rest-api.md
b/docs/zh/seatunnel-engine/rest-api.md
index 1b0166425b..d38ad61268 100644
--- a/docs/zh/seatunnel-engine/rest-api.md
+++ b/docs/zh/seatunnel-engine/rest-api.md
@@ -134,8 +134,22 @@ network:
]
},
"metrics": {
- "sourceReceivedCount": "",
- "sinkWriteCount": ""
+ "SourceReceivedCount": "",
+ "SourceReceivedQPS": "",
+ "SourceReceivedBytes": "",
+ "SourceReceivedBytesPerSeconds": "",
+ "SinkWriteCount": "",
+ "SinkWriteQPS": "",
+ "SinkWriteBytes": "",
+ "SinkWriteBytesPerSeconds": "",
+ "TableSourceReceivedCount": {},
+ "TableSourceReceivedBytes": {},
+ "TableSourceReceivedBytesPerSeconds": {},
+ "TableSourceReceivedQPS": {},
+ "TableSinkWriteCount": {},
+ "TableSinkWriteQPS": {},
+ "TableSinkWriteBytes": {},
+ "TableSinkWriteBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
index 59942eb4cc..61df054d07 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@@ -82,33 +83,101 @@ public class MultiTableMetricsIT {
Collections.singletonList(node1)
.forEach(
instance -> {
- given().get(
- HOST
- + instance.getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.JOB_INFO_URL
- + "/"
- + batchJobProxy.getJobId())
- .then()
+ Response response =
+ given().get(
+ HOST
+ +
instance.getCluster()
+
.getLocalMember()
+
.getAddress()
+ .getPort()
+ +
RestConstant.JOB_INFO_URL
+ + "/"
+ +
batchJobProxy.getJobId());
+ // In the test example, the data size of a single
[3, "C", 100] is 13
+ int dataSize = 13;
+ response.prettyPrint();
+ response.then()
.statusCode(200)
.body("jobName",
equalTo("batch_fake_multi_table_to_console"))
.body("jobStatus", equalTo("FINISHED"))
- .body("metrics.SourceReceivedCount",
equalTo("50"))
- .body("metrics.SinkWriteCount",
equalTo("50"))
+ .body("metrics.SourceReceivedCount",
equalTo("15"))
+ .body("metrics.SinkWriteCount",
equalTo("15"))
.body(
"metrics.TableSourceReceivedCount.'fake.table1'",
- equalTo("20"))
+ equalTo("10"))
.body(
"metrics.TableSourceReceivedCount.'fake.public.table2'",
- equalTo("30"))
+ equalTo("5"))
.body(
"metrics.TableSinkWriteCount.'fake.table1'",
- equalTo("20"))
+ equalTo("10"))
.body(
"metrics.TableSinkWriteCount.'fake.public.table2'",
- equalTo("30"));
+ equalTo("5"))
+ .body(
+ "metrics.SourceReceivedBytes",
+ equalTo(String.valueOf(dataSize *
15)))
+ .body(
+ "metrics.SinkWriteBytes",
+ equalTo(String.valueOf(dataSize *
15)))
+ .body(
+
"metrics.TableSourceReceivedBytes.'fake.table1'",
+ equalTo(String.valueOf(dataSize *
10)))
+ .body(
+
"metrics.TableSourceReceivedBytes.'fake.public.table2'",
+ equalTo(String.valueOf(dataSize *
5)))
+ .body(
+
"metrics.TableSinkWriteBytes.'fake.table1'",
+ equalTo(String.valueOf(dataSize *
10)))
+ .body(
+
"metrics.TableSinkWriteBytes.'fake.public.table2'",
+ equalTo(String.valueOf(dataSize *
5)));
+ Assertions.assertTrue(
+
Double.parseDouble(response.path("metrics.SourceReceivedQPS"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSourceReceivedQPS.'fake.table1'"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSourceReceivedQPS.'fake.public.table2'"))
+ > 0
+ && Double.parseDouble(
+
response.path("metrics.SinkWriteQPS"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSinkWriteQPS.'fake.table1'"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSinkWriteQPS.'fake.public.table2'"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.SourceReceivedBytesPerSeconds"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSourceReceivedBytesPerSeconds.'fake.table1'"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSourceReceivedBytesPerSeconds.'fake.public.table2'"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.SinkWriteBytesPerSeconds"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSinkWriteBytesPerSeconds.'fake.table1'"))
+ > 0
+ && Double.parseDouble(
+ response.path(
+
"metrics.TableSinkWriteBytesPerSeconds.'fake.public.table2'"))
+ > 0);
});
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
index c51929a0ed..7459cc150e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
@@ -26,29 +26,93 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
- result_table_name = "fake1"
- row.num = 20
- schema = {
- table = "fake.table1"
- fields {
- name = "string"
- age = "int"
+ result_table_name = "fake1"
+ schema = {
+ table = "fake.table1"
+ fields {
+ id = bigint
+ name = string
+ score = int
+ }
}
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A", 300]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ }
+ ]
}
- }
- FakeSource {
- result_table_name = "fake2"
- row.num = 30
- schema = {
- table = "fake.public.table2"
- fields {
- name = "string"
- age = "int"
- sex = "int"
+ FakeSource {
+ result_table_name = "fake2"
+ schema = {
+ table = "fake.public.table2"
+ fields {
+ id = bigint
+ name = string
+ score = int
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
}
- }
- }
+
}
transform {
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 100aa0b320..a8275a13b7 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -62,8 +62,12 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
import static org.awaitility.Awaitility.await;
@@ -592,6 +596,23 @@ public class SeaTunnelClientTest {
jobMetrics.contains(SOURCE_RECEIVED_COUNT +
"#fake.public.table2"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT +
"#fake.table1"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT +
"#fake.public.table2"));
+ Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_BYTES +
"#fake.table1"));
+ Assertions.assertTrue(
+ jobMetrics.contains(SOURCE_RECEIVED_BYTES +
"#fake.public.table2"));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_BYTES +
"#fake.table1"));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_BYTES +
"#fake.public.table2"));
+ Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS +
"#fake.table1"));
+ Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS +
"#fake.public.table2"));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS +
"#fake.table1"));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS +
"#fake.public.table2"));
+ Assertions.assertTrue(
+ jobMetrics.contains(SOURCE_RECEIVED_BYTES_PER_SECONDS +
"#fake.table1"));
+ Assertions.assertTrue(
+ jobMetrics.contains(SOURCE_RECEIVED_BYTES_PER_SECONDS +
"#fake.public.table2"));
+ Assertions.assertTrue(
+ jobMetrics.contains(SINK_WRITE_BYTES_PER_SECONDS +
"#fake.table1"));
+ Assertions.assertTrue(
+ jobMetrics.contains(SINK_WRITE_BYTES_PER_SECONDS +
"#fake.public.table2"));
log.info("jobMetrics : {}", jobMetrics);
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
@@ -600,10 +621,6 @@ public class SeaTunnelClientTest {
Spliterators.spliteratorUnknownSize(
jobMetricsStr.fieldNames(), 0),
false)
- .filter(
- metricName ->
-
metricName.startsWith(SOURCE_RECEIVED_COUNT)
- ||
metricName.startsWith(SINK_WRITE_COUNT))
.collect(Collectors.toList());
Map<String, Long> totalCount =
@@ -654,6 +671,31 @@ public class SeaTunnelClientTest {
.filter(e ->
e.getKey().startsWith(SINK_WRITE_COUNT))
.mapToLong(Map.Entry::getValue)
.sum());
+ Assertions.assertEquals(
+ totalCount.get(SOURCE_RECEIVED_BYTES),
+ tableCount.entrySet().stream()
+ .filter(e ->
e.getKey().startsWith(SOURCE_RECEIVED_BYTES + "#"))
+ .mapToLong(Map.Entry::getValue)
+ .sum());
+ Assertions.assertEquals(
+ totalCount.get(SINK_WRITE_BYTES),
+ tableCount.entrySet().stream()
+ .filter(e ->
e.getKey().startsWith(SINK_WRITE_BYTES + "#"))
+ .mapToLong(Map.Entry::getValue)
+ .sum());
+ // Instantaneous rates in the same direction are directly added
+ Assertions.assertEquals(
+ totalCount.get(SOURCE_RECEIVED_QPS),
+ tableCount.entrySet().stream()
+ .filter(e ->
e.getKey().startsWith(SOURCE_RECEIVED_QPS + "#"))
+ .mapToLong(Map.Entry::getValue)
+ .sum());
+ Assertions.assertEquals(
+ totalCount.get(SINK_WRITE_QPS),
+ tableCount.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(SINK_WRITE_QPS
+ "#"))
+ .mapToLong(Map.Entry::getValue)
+ .sum());
} catch (ExecutionException | InterruptedException |
JsonProcessingException e) {
throw new RuntimeException(e);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
new file mode 100644
index 0000000000..eab9ecbd34
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+
+public class TaskMetricsCalcContext {
+
+ private final MetricsContext metricsContext;
+
+ private final PluginType type;
+
+ private Counter count;
+
+ private Map<String, Counter> countPerTable = new ConcurrentHashMap<>();
+
+ private Meter QPS;
+
+ private Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
+
+ private Counter bytes;
+
+ private Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();
+
+ private Meter bytesPerSeconds;
+
+ private Map<String, Meter> bytesPerSecondsPerTable = new
ConcurrentHashMap<>();
+
+ public TaskMetricsCalcContext(
+ MetricsContext metricsContext,
+ PluginType type,
+ boolean isMulti,
+ List<TablePath> tables) {
+ this.metricsContext = metricsContext;
+ this.type = type;
+ initializeMetrics(isMulti, tables);
+ }
+
+ private void initializeMetrics(boolean isMulti, List<TablePath> tables) {
+ if (type.equals(PluginType.SINK)) {
+ this.initializeMetrics(
+ isMulti,
+ tables,
+ SINK_WRITE_COUNT,
+ SINK_WRITE_QPS,
+ SINK_WRITE_BYTES,
+ SINK_WRITE_BYTES_PER_SECONDS);
+ } else if (type.equals(PluginType.SOURCE)) {
+ this.initializeMetrics(
+ isMulti,
+ tables,
+ SOURCE_RECEIVED_COUNT,
+ SOURCE_RECEIVED_QPS,
+ SOURCE_RECEIVED_BYTES,
+ SOURCE_RECEIVED_BYTES_PER_SECONDS);
+ }
+ }
+
+ private void initializeMetrics(
+ boolean isMulti,
+ List<TablePath> tables,
+ String countName,
+ String qpsName,
+ String bytesName,
+ String bytesPerSecondsName) {
+ count = metricsContext.counter(countName);
+ QPS = metricsContext.meter(qpsName);
+ bytes = metricsContext.counter(bytesName);
+ bytesPerSeconds = metricsContext.meter(bytesPerSecondsName);
+ if (isMulti) {
+ tables.forEach(
+ tablePath -> {
+ countPerTable.put(
+ tablePath.getFullName(),
+ metricsContext.counter(countName + "#" +
tablePath.getFullName()));
+ QPSPerTable.put(
+ tablePath.getFullName(),
+ metricsContext.meter(qpsName + "#" +
tablePath.getFullName()));
+ bytesPerTable.put(
+ tablePath.getFullName(),
+ metricsContext.counter(bytesName + "#" +
tablePath.getFullName()));
+ bytesPerSecondsPerTable.put(
+ tablePath.getFullName(),
+ metricsContext.meter(
+ bytesPerSecondsName + "#" +
tablePath.getFullName()));
+ });
+ }
+ }
+
+ public void updateMetrics(Object data) {
+ count.inc();
+ QPS.markEvent();
+ if (data instanceof SeaTunnelRow) {
+ SeaTunnelRow row = (SeaTunnelRow) data;
+ bytes.inc(row.getBytesSize());
+ bytesPerSeconds.markEvent(row.getBytesSize());
+ String tableId = row.getTableId();
+
+ if (StringUtils.isNotBlank(tableId)) {
+ String tableName = TablePath.of(tableId).getFullName();
+
+ // Processing count
+ processMetrics(
+ countPerTable,
+ Counter.class,
+ tableName,
+ SINK_WRITE_COUNT,
+ SOURCE_RECEIVED_COUNT,
+ Counter::inc);
+
+ // Processing bytes
+ processMetrics(
+ bytesPerTable,
+ Counter.class,
+ tableName,
+ SINK_WRITE_BYTES,
+ SOURCE_RECEIVED_BYTES,
+ counter -> counter.inc(row.getBytesSize()));
+
+ // Processing QPS
+ processMetrics(
+ QPSPerTable,
+ Meter.class,
+ tableName,
+ SINK_WRITE_QPS,
+ SOURCE_RECEIVED_QPS,
+ Meter::markEvent);
+
+ // Processing bytes rate
+ processMetrics(
+ bytesPerSecondsPerTable,
+ Meter.class,
+ tableName,
+ SINK_WRITE_BYTES_PER_SECONDS,
+ SOURCE_RECEIVED_BYTES_PER_SECONDS,
+ meter -> meter.markEvent(row.getBytesSize()));
+ }
+ }
+ }
+
+ private <T> void processMetrics(
+ Map<String, T> metricMap,
+ Class<T> cls,
+ String tableName,
+ String sinkMetric,
+ String sourceMetric,
+ MetricProcessor<T> processor) {
+ T metric = metricMap.get(tableName);
+ if (Objects.nonNull(metric)) {
+ processor.process(metric);
+ } else {
+ String metricName =
+ PluginType.SINK.equals(type)
+ ? sinkMetric + "#" + tableName
+ : sourceMetric + "#" + tableName;
+ T newMetric = createMetric(metricsContext, metricName, cls);
+ processor.process(newMetric);
+ metricMap.put(tableName, newMetric);
+ }
+ }
+
+ private <T> T createMetric(
+ MetricsContext metricsContext, String metricName, Class<T>
metricClass) {
+ if (metricClass == Counter.class) {
+ return metricClass.cast(metricsContext.counter(metricName));
+ } else if (metricClass == Meter.class) {
+ return metricClass.cast(metricsContext.meter(metricName));
+ }
+ throw new IllegalArgumentException("Unsupported metric class: " +
metricClass.getName());
+ }
+
+ @FunctionalInterface
+ interface MetricProcessor<T> {
+ void process(T t);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index d5d60b7cbb..fec77708b6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -44,6 +44,8 @@ import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewO
import
org.apache.seatunnel.engine.server.resourcemanager.resource.OverviewInfo;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+import org.apache.commons.lang3.ArrayUtils;
+
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.Member;
@@ -65,12 +67,20 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.JOB_INFO_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.OVERVIEW;
@@ -81,14 +91,22 @@ import static
org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITO
public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCommand> {
- private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
private static final String TABLE_SOURCE_RECEIVED_COUNT =
"TableSourceReceivedCount";
- private static final String SINK_WRITE_COUNT = "SinkWriteCount";
private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
+ private static final String TABLE_SOURCE_RECEIVED_QPS =
"TableSourceReceivedQPS";
+ private static final String TABLE_SINK_WRITE_QPS = "TableSinkWriteQPS";
+ private static final String TABLE_SOURCE_RECEIVED_BYTES =
"TableSourceReceivedBytes";
+ private static final String TABLE_SINK_WRITE_BYTES = "TableSinkWriteBytes";
+ private static final String TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS =
+ "TableSourceReceivedBytesPerSeconds";
+ private static final String TABLE_SINK_WRITE_BYTES_PER_SECONDS =
+ "TableSinkWriteBytesPerSeconds";
+
private final Log4j2HttpGetCommandProcessor original;
private NodeEngine nodeEngine;
public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
+
this(textCommandService, new
Log4j2HttpGetCommandProcessor(textCommandService));
}
@@ -369,43 +387,165 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
private Map<String, Object> getJobMetrics(String jobMetrics) {
Map<String, Object> metricsMap = new HashMap<>();
- long sourceReadCount = 0L;
- long sinkWriteCount = 0L;
- Map<String, JsonNode> tableSourceReceivedCountMap = new HashMap<>();
- Map<String, JsonNode> tableSinkWriteCountMap = new HashMap<>();
+ // To add metrics, populate the corresponding array,
+ String[] countMetricsNames = {
+ SOURCE_RECEIVED_COUNT, SINK_WRITE_COUNT, SOURCE_RECEIVED_BYTES,
SINK_WRITE_BYTES
+ };
+ String[] rateMetricsNames = {
+ SOURCE_RECEIVED_QPS,
+ SINK_WRITE_QPS,
+ SOURCE_RECEIVED_BYTES_PER_SECONDS,
+ SINK_WRITE_BYTES_PER_SECONDS
+ };
+ String[] tableCountMetricsNames = {
+ TABLE_SOURCE_RECEIVED_COUNT,
+ TABLE_SINK_WRITE_COUNT,
+ TABLE_SOURCE_RECEIVED_BYTES,
+ TABLE_SINK_WRITE_BYTES
+ };
+ String[] tableRateMetricsNames = {
+ TABLE_SOURCE_RECEIVED_QPS,
+ TABLE_SINK_WRITE_QPS,
+ TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS,
+ TABLE_SINK_WRITE_BYTES_PER_SECONDS
+ };
+ Long[] metricsSums =
+ Stream.generate(() ->
0L).limit(countMetricsNames.length).toArray(Long[]::new);
+ Double[] metricsRates =
+ Stream.generate(() ->
0D).limit(rateMetricsNames.length).toArray(Double[]::new);
+
+ // Used to store various indicators at the table
+ Map<String, JsonNode>[] tableMetricsMaps =
+ new Map[] {
+ new HashMap<>(), // Source Received Count
+ new HashMap<>(), // Sink Write Count
+ new HashMap<>(), // Source Received Bytes
+ new HashMap<>(), // Sink Write Bytes
+ new HashMap<>(), // Source Received QPS
+ new HashMap<>(), // Sink Write QPS
+ new HashMap<>(), // Source Received Bytes Per Second
+ new HashMap<>() // Sink Write Bytes Per Second
+ };
+
try {
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
- StreamSupport.stream(
-
Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0),
- false)
- .filter(metricName -> metricName.contains("#"))
- .forEach(
+
+ jobMetricsStr
+ .fieldNames()
+ .forEachRemaining(
metricName -> {
- String tableName =
-
TablePath.of(metricName.split("#")[1]).getFullName();
- if
(metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
- tableSourceReceivedCountMap.put(
- tableName,
jobMetricsStr.get(metricName));
- }
- if
(metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
- tableSinkWriteCountMap.put(
- tableName,
jobMetricsStr.get(metricName));
+ if (metricName.contains("#")) {
+ String tableName =
+
TablePath.of(metricName.split("#")[1]).getFullName();
+ JsonNode metricNode =
jobMetricsStr.get(metricName);
+ processMetric(
+ metricName, tableName, metricNode,
tableMetricsMaps);
}
});
- JsonNode sourceReceivedCountJson =
jobMetricsStr.get(SOURCE_RECEIVED_COUNT);
- JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT);
- for (int i = 0; i <
jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) {
- JsonNode sourceReader = sourceReceivedCountJson.get(i);
- JsonNode sinkWriter = sinkWriteCountJson.get(i);
- sourceReadCount += sourceReader.get("value").asLong();
- sinkWriteCount += sinkWriter.get("value").asLong();
- }
- } catch (JsonProcessingException | NullPointerException e) {
+
+ // Aggregation summary and rate metrics
+ aggregateMetrics(
+ jobMetricsStr,
+ metricsSums,
+ metricsRates,
+ ArrayUtils.addAll(countMetricsNames, rateMetricsNames));
+
+ } catch (JsonProcessingException e) {
return metricsMap;
}
- Map<String, Long> tableSourceReceivedCount =
- tableSourceReceivedCountMap.entrySet().stream()
+ populateMetricsMap(
+ metricsMap,
+ tableMetricsMaps,
+ ArrayUtils.addAll(tableCountMetricsNames,
tableRateMetricsNames),
+ countMetricsNames.length);
+ populateMetricsMap(
+ metricsMap,
+ Stream.concat(Arrays.stream(metricsSums),
Arrays.stream(metricsRates))
+ .toArray(Number[]::new),
+ ArrayUtils.addAll(countMetricsNames, rateMetricsNames),
+ metricsSums.length);
+
+ return metricsMap;
+ }
+
+ private void processMetric(
+ String metricName,
+ String tableName,
+ JsonNode metricNode,
+ Map<String, JsonNode>[] tableMetricsMaps) {
+ if (metricNode == null) return;
+
+ // Define index constant
+ final int SOURCE_COUNT_IDX = 0,
+ SINK_COUNT_IDX = 1,
+ SOURCE_BYTES_IDX = 2,
+ SINK_BYTES_IDX = 3,
+ SOURCE_QPS_IDX = 4,
+ SINK_QPS_IDX = 5,
+ SOURCE_BYTES_SEC_IDX = 6,
+ SINK_BYTES_SEC_IDX = 7;
+ if (metricName.startsWith(SOURCE_RECEIVED_COUNT + "#")) {
+ tableMetricsMaps[SOURCE_COUNT_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_WRITE_COUNT + "#")) {
+ tableMetricsMaps[SINK_COUNT_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES + "#")) {
+ tableMetricsMaps[SOURCE_BYTES_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_WRITE_BYTES + "#")) {
+ tableMetricsMaps[SINK_BYTES_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SOURCE_RECEIVED_QPS + "#")) {
+ tableMetricsMaps[SOURCE_QPS_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_WRITE_QPS + "#")) {
+ tableMetricsMaps[SINK_QPS_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES_PER_SECONDS +
"#")) {
+ tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_WRITE_BYTES_PER_SECONDS + "#")) {
+ tableMetricsMaps[SINK_BYTES_SEC_IDX].put(tableName, metricNode);
+ }
+ }
+
+ private void aggregateMetrics(
+ JsonNode jobMetricsStr,
+ Long[] metricsSums,
+ Double[] metricsRates,
+ String[] metricsNames) {
+ for (int i = 0; i < metricsNames.length; i++) {
+ JsonNode metricNode = jobMetricsStr.get(metricsNames[i]);
+ if (metricNode != null && metricNode.isArray()) {
+ for (JsonNode node : metricNode) {
+ // Match Rate Metrics vs. Value Metrics
+ if (i < metricsSums.length) {
+ metricsSums[i] += node.path("value").asLong();
+ } else {
+ metricsRates[i - metricsSums.length] +=
node.path("value").asDouble();
+ }
+ }
+ }
+ }
+ }
+
+ private void populateMetricsMap(
+ Map<String, Object> metricsMap,
+ Object[] metrics,
+ String[] metricNames,
+ int countMetricNames) {
+ for (int i = 0; i < metrics.length; i++) {
+ if (metrics[i] != null) {
+ if (metrics[i] instanceof Map) {
+ metricsMap.put(
+ metricNames[i],
+ aggregateMap(
+ (Map<String, JsonNode>) metrics[i], i >=
countMetricNames));
+ } else {
+ metricsMap.put(metricNames[i], metrics[i]);
+ }
+ }
+ }
+ }
+
+ public static Map<String, Object> aggregateMap(Map<String, JsonNode>
inputMap, boolean isRate) {
+ return isRate
+ ? inputMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
@@ -413,11 +553,12 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
StreamSupport.stream(
entry.getValue().spliterator(),
false)
- .mapToLong(
- node ->
node.get("value").asLong())
- .sum()));
- Map<String, Long> tableSinkWriteCount =
- tableSinkWriteCountMap.entrySet().stream()
+ .mapToDouble(
+ node ->
+
node.path("value")
+
.asDouble())
+ .sum()))
+ : inputMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
@@ -426,14 +567,8 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
entry.getValue().spliterator(),
false)
.mapToLong(
- node ->
node.get("value").asLong())
+ node ->
node.path("value").asLong())
.sum()));
-
- metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
- metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);
- metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount);
- metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount);
- return metricsMap;
}
private SeaTunnelServer getSeaTunnelServer(boolean shouldBeMaster) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index e1b2494789..7f2e34bdcb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.engine.server.task;
-import org.apache.seatunnel.api.common.metrics.Counter;
-import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -30,13 +28,14 @@ import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
@@ -44,15 +43,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
-
@Slf4j
public class SeaTunnelSourceCollector<T> implements Collector<T> {
@@ -62,19 +54,12 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
private final MetricsContext metricsContext;
+ private final TaskMetricsCalcContext taskMetricsCalcContext;
+
private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new
AtomicBoolean(false);
private final AtomicBoolean schemaChangeAfterCheckpointSignal = new
AtomicBoolean(false);
- private final Counter sourceReceivedCount;
-
- private final Map<String, Counter> sourceReceivedCountPerTable = new
ConcurrentHashMap<>();
-
- private final Meter sourceReceivedQPS;
- private final Counter sourceReceivedBytes;
-
- private final Meter sourceReceivedBytesPerSeconds;
-
private volatile boolean emptyThisPollNext;
private final DataTypeChangeEventHandler dataTypeChangeEventHandler =
new DataTypeChangeEventDispatcher();
@@ -98,20 +83,12 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
.iterator()
.forEachRemaining(type ->
this.rowTypeMap.put(type.getKey(), type.getValue()));
}
- if (CollectionUtils.isNotEmpty(tablePaths)) {
- tablePaths.forEach(
- tablePath ->
- sourceReceivedCountPerTable.put(
- tablePath.getFullName(),
- metricsContext.counter(
- SOURCE_RECEIVED_COUNT
- + "#"
- +
tablePath.getFullName())));
- }
- sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
- sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
- sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES);
- sourceReceivedBytesPerSeconds =
metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS);
+ this.taskMetricsCalcContext =
+ new TaskMetricsCalcContext(
+ metricsContext,
+ PluginType.SOURCE,
+ CollectionUtils.isNotEmpty(tablePaths),
+ tablePaths);
flowControlGate = FlowControlGate.create(flowControlStrategy);
}
@@ -129,26 +106,11 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
throw new SeaTunnelEngineException(
"Unsupported row type: " +
rowType.getClass().getName());
}
- sourceReceivedBytes.inc(size);
- sourceReceivedBytesPerSeconds.markEvent(size);
flowControlGate.audit((SeaTunnelRow) row);
- if (StringUtils.isNotEmpty(tableId)) {
- String tableName = TablePath.of(tableId).getFullName();
- Counter sourceTableCounter =
sourceReceivedCountPerTable.get(tableName);
- if (Objects.nonNull(sourceTableCounter)) {
- sourceTableCounter.inc();
- } else {
- Counter counter =
- metricsContext.counter(SOURCE_RECEIVED_COUNT +
"#" + tableName);
- counter.inc();
- sourceReceivedCountPerTable.put(tableName, counter);
- }
- }
+ taskMetricsCalcContext.updateMetrics(row);
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
- sourceReceivedCount.inc();
- sourceReceivedQPS.markEvent();
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index de8257f1e9..cacaa75aae 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.engine.server.task.flow;
-import org.apache.seatunnel.api.common.metrics.Counter;
-import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.serialization.Serializer;
@@ -30,13 +28,14 @@ import
org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.event.JobEventListener;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -45,8 +44,6 @@ import
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitO
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
-import org.apache.commons.lang3.StringUtils;
-
import com.hazelcast.cluster.Address;
import lombok.extern.slf4j.Slf4j;
@@ -56,18 +53,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
-import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
import static
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
@@ -96,15 +87,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private MetricsContext metricsContext;
- private Counter sinkWriteCount;
-
- private Map<String, Counter> sinkWriteCountPerTable = new
ConcurrentHashMap<>();
-
- private Meter sinkWriteQPS;
-
- private Counter sinkWriteBytes;
-
- private Meter sinkWriteBytesPerSeconds;
+ private TaskMetricsCalcContext taskMetricsCalcContext;
private final boolean containAggCommitter;
@@ -129,19 +112,13 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
this.containAggCommitter = containAggCommitter;
this.metricsContext = metricsContext;
this.eventListener = new JobEventListener(taskLocation,
runningTask.getExecutionContext());
- sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT);
- sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS);
- sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES);
- sinkWriteBytesPerSeconds =
metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS);
- if (sinkAction.getSink() instanceof MultiTableSink) {
- List<TablePath> sinkTables = ((MultiTableSink)
sinkAction.getSink()).getSinkTables();
- sinkTables.forEach(
- tablePath ->
- sinkWriteCountPerTable.put(
- tablePath.getFullName(),
- metricsContext.counter(
- SINK_WRITE_COUNT + "#" +
tablePath.getFullName())));
+ List<TablePath> sinkTables = new ArrayList<>();
+ boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
+ if (isMulti) {
+ sinkTables = ((MultiTableSink)
sinkAction.getSink()).getSinkTables();
}
+ this.taskMetricsCalcContext =
+ new TaskMetricsCalcContext(metricsContext, PluginType.SINK,
isMulti, sinkTables);
}
@Override
@@ -267,26 +244,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
return;
}
writer.write((T) record.getData());
- sinkWriteCount.inc();
- sinkWriteQPS.markEvent();
- if (record.getData() instanceof SeaTunnelRow) {
- long size = ((SeaTunnelRow)
record.getData()).getBytesSize();
- sinkWriteBytes.inc(size);
- sinkWriteBytesPerSeconds.markEvent(size);
- String tableId = ((SeaTunnelRow)
record.getData()).getTableId();
- if (StringUtils.isNotBlank(tableId)) {
- String tableName = TablePath.of(tableId).getFullName();
- Counter sinkTableCounter =
sinkWriteCountPerTable.get(tableName);
- if (Objects.nonNull(sinkTableCounter)) {
- sinkTableCounter.inc();
- } else {
- Counter counter =
- metricsContext.counter(SINK_WRITE_COUNT +
"#" + tableName);
- counter.inc();
- sinkWriteCountPerTable.put(tableName, counter);
- }
- }
- }
+ taskMetricsCalcContext.updateMetrics(record.getData());
}
} catch (Exception e) {
throw new RuntimeException(e);