This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e54b82fb [feature][engine] add ST-Engine metrics (#3621)
3e54b82fb is described below
commit 3e54b82fb5cffdba517df173700a695e4ac4c89d
Author: ic4y <[email protected]>
AuthorDate: Fri Dec 2 22:31:52 2022 +0800
[feature][engine] add ST-Engine metrics (#3621)
* [feature][engine] add engine metrics
---
LICENSE | 2 +
.../seatunnel/api/common/metrics/JobMetrics.java | 159 +++++++++++++
.../seatunnel/api/common/metrics/Measurement.java | 143 ++++++++++++
.../api/common/metrics/MeasurementPredicates.java | 68 ++++++
.../seatunnel/api/common/metrics/Metric.java | 63 +++++
.../seatunnel/api/common/metrics/MetricNames.java | 33 +++
.../seatunnel/api/common/metrics/MetricTags.java | 44 ++++
.../api/common/metrics/RawJobMetrics.java | 75 ++++++
.../apache/seatunnel/api/common/metrics/Unit.java | 33 +++
.../seatunnel/engine/client/SeaTunnelClient.java | 8 +
.../engine/client/SeaTunnelClientTest.java | 69 ++++--
.../apache/seatunnel/engine/common/Constant.java | 2 +
.../codec/SeaTunnelGetJobMetricsCodec.java | 88 +++++++
.../SeaTunnelEngine.yaml | 20 ++
.../engine/server/CoordinatorService.java | 36 ++-
.../seatunnel/engine/server/SeaTunnelServer.java | 1 +
.../engine/server/TaskExecutionService.java | 81 ++++++-
.../seatunnel/engine/server/execution/Task.java | 13 +-
.../engine/server/master/JobHistoryService.java | 24 +-
.../seatunnel/engine/server/master/JobMaster.java | 47 ++++
.../engine/server/metrics/JobMetricsCollector.java | 80 +++++++
.../engine/server/metrics/JobMetricsUtil.java | 104 +++++++++
.../seatunnel/engine/server/metrics/Metrics.java | 55 +++++
.../engine/server/metrics/MetricsContext.java | 257 +++++++++++++++++++++
.../engine/server/metrics/MetricsImpl.java | 72 ++++++
.../engine/server/operation/AsyncOperation.java | 13 +-
.../operation/CleanTaskGroupContextOperation.java | 41 ++++
.../server/operation/GetJobMetricsOperation.java | 85 +++++++
.../operation/GetTaskGroupMetricsOperation.java | 61 +++++
.../server/protocol/task/GetJobMetricsTask.java | 50 ++++
.../task/SeaTunnelMessageTaskFactoryProvider.java | 3 +
.../serializable/OperationDataSerializerHook.java | 5 +
.../server/task/SeaTunnelSourceCollector.java | 7 +
.../engine/server/task/SeaTunnelTask.java | 20 ++
.../engine/server/task/flow/SinkFlowLifeCycle.java | 6 +
.../engine/server/master/JobMetricsTest.java | 97 ++++++++
.../resources/fake_to_console_job_metrics.conf | 54 +++++
37 files changed, 1982 insertions(+), 37 deletions(-)
diff --git a/LICENSE b/LICENSE
index 6d2243fa1..3c505f2fe 100644
--- a/LICENSE
+++ b/LICENSE
@@ -233,3 +233,5 @@
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engi
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
from https://github.com/apache/flink
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java
from https://github.com/apache/flink
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics
from https://github.com/hazelcast/hazelcast
+seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics
from https://github.com/hazelcast/hazelcast
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
new file mode 100644
index 000000000..7c52013a6
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import static java.util.stream.Collectors.groupingBy;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+/**
+ * A collection of {@link Measurement}s grouped by metric name.
+ * <p>
+ * Instances are obtained through {@link #empty()} or {@link #of(Map)}. The map
+ * handed to {@link #of(Map)} is shallow-copied: later changes to that map are
+ * not visible here, but the measurement lists themselves are shared.
+ */
+public final class JobMetrics implements Serializable {
+
+    private static final JobMetrics EMPTY = new JobMetrics(Collections.emptyMap());
+
+    private static final Collector<Measurement, ?, Map<String, List<Measurement>>> COLLECTOR =
+        Collectors.groupingBy(Measurement::metric);
+
+    /**
+     * Shared JSON mapper. It is configured once here and only used for
+     * serialization afterwards, so it does not have to be rebuilt on every
+     * {@link #toJsonString()} call.
+     */
+    private static final ObjectMapper OBJECT_MAPPER =
+        new ObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+
+    private Map<String, List<Measurement>> metrics; //metric name -> list of measurements
+
+    JobMetrics() { //needed for deserialization
+        // Start from an empty map so that an instance created through this
+        // constructor never fails with a NullPointerException in its accessors.
+        this.metrics = new HashMap<>();
+    }
+
+    private JobMetrics(Map<String, List<Measurement>> metrics) {
+        this.metrics = new HashMap<>(metrics);
+    }
+
+    /**
+     * Returns an empty {@link JobMetrics} object.
+     */
+    public static JobMetrics empty() {
+        return EMPTY;
+    }
+
+    /**
+     * Builds a {@link JobMetrics} object based on a map of
+     * {@link Measurement}s.
+     *
+     * @param metrics metric name -> measurements, must not be {@code null}
+     * @throws NullPointerException if {@code metrics} is {@code null}
+     */
+    public static JobMetrics of(Map<String, List<Measurement>> metrics) {
+        Objects.requireNonNull(metrics, "metrics");
+        return new JobMetrics(metrics);
+    }
+
+    /**
+     * Returns the names of all metrics present, as an unmodifiable set.
+     */
+    public Set<String> metrics() {
+        return Collections.unmodifiableSet(metrics.keySet());
+    }
+
+    /**
+     * Returns all {@link Measurement}s associated with a given metric name.
+     * <p>
+     * For a list of job-specific metric names please see {@link MetricNames}.
+     *
+     * @param metricName the metric to look up, must not be {@code null}
+     * @return an unmodifiable list of measurements; empty when the metric is
+     * unknown
+     * @throws NullPointerException if {@code metricName} is {@code null}
+     */
+    public List<Measurement> get(String metricName) {
+        Objects.requireNonNull(metricName, "metricName");
+        List<Measurement> measurements = metrics.get(metricName);
+        // Never hand out the internal list: the "missing" case is already
+        // immutable, so the "present" case is made read-only as well.
+        return measurements == null
+            ? Collections.emptyList()
+            : Collections.unmodifiableList(measurements);
+    }
+
+    /**
+     * Returns a new {@link JobMetrics} containing only the measurements whose
+     * tag {@code tagName} has exactly the value {@code tagValue}.
+     */
+    public JobMetrics filter(String tagName, String tagValue) {
+        return filter(MeasurementPredicates.tagValueEquals(tagName, tagValue));
+    }
+
+    /**
+     * Returns a new {@link JobMetrics} containing only the measurements
+     * accepted by {@code predicate}, regrouped by metric name.
+     *
+     * @throws NullPointerException if {@code predicate} is {@code null}
+     */
+    public JobMetrics filter(Predicate<Measurement> predicate) {
+        Objects.requireNonNull(predicate, "predicate");
+
+        Map<String, List<Measurement>> filteredMetrics =
+            metrics.values().stream()
+                .flatMap(List::stream)
+                .filter(predicate)
+                .collect(COLLECTOR);
+        return new JobMetrics(filteredMetrics);
+    }
+
+    @Override
+    public int hashCode() {
+        return metrics.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        return Objects.equals(metrics, ((JobMetrics) obj).metrics);
+    }
+
+    /**
+     * Renders the metrics sorted by metric name and, within each metric,
+     * grouped and sorted by the {@link MetricTags#TASK_NAME} tag (measurements
+     * without that tag are listed under an empty name).
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        metrics.entrySet().stream()
+            .sorted(Comparator.comparing(Entry::getKey))
+            .forEach(mainEntry -> {
+                sb.append(mainEntry.getKey()).append(":\n");
+                mainEntry.getValue().stream()
+                    .collect(groupingBy(m -> {
+                        String vertex = m.tag(MetricTags.TASK_NAME);
+                        return vertex == null ? "" : vertex;
+                    }))
+                    .entrySet().stream()
+                    .sorted(Comparator.comparing(Entry::getKey))
+                    .forEach(e -> {
+                        String vertexName = e.getKey();
+                        sb.append(" ").append(vertexName).append(":\n");
+                        e.getValue().forEach(m -> sb.append("
").append(m).append("\n"));
+                    });
+            });
+        return sb.toString();
+    }
+
+    /**
+     * Serializes the metric map to a JSON string.
+     *
+     * @return the JSON document, or a JSON object with a single {@code err}
+     * field when serialization fails
+     */
+    public String toJsonString() {
+        try {
+            return OBJECT_MAPPER.writeValueAsString(this.metrics);
+        } catch (JsonProcessingException e) {
+            ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+            objectNode.put("err", "serialize JobMetrics err");
+            return objectNode.toString();
+        }
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
new file mode 100644
index 000000000..6ea645955
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Immutable data class containing information about one metric measurement,
+ * consisting of:
+ * <ul>
+ * <li>metric value</li>
+ * <li>metric timestamp, generated when the metric was gathered</li>
+ * <li>metric descriptor (set of tag name-value pairs) </li>
+ * </ul>
+ * <p>
+ * A metrics descriptor can be thought of as a set of attributes associated
+ * with a particular metric, metric which in turn is defined by its name
+ * (for a full list of metric names provided see {@link MetricNames}).
+ * The attributes are specified as tags that have names and values (for a
+ * full list of tag names see {@link MetricTags}). An example
+ * descriptor would have a collection of tags/attributes like this:
+ * {@code job=jobId, pipeline=pipelineId,
+ * unit=count, metric=SourceReceivedCount, ...}
+ *
+ */
+@Data
+public final class Measurement implements Serializable {
+
+ private Map<String, String> tags; //tag name -> tag value
+ private String metric;
+ private Object value;
+ private long timestamp;
+
+ Measurement() {
+ }
+
+ private Measurement(String metric, Object value, long timestamp,
Map<String, String> tags) {
+ this.metric = metric;
+ this.value = value;
+ this.timestamp = timestamp;
+ this.tags = new HashMap<>(tags);
+ }
+
+ /**
+ * Builds a {@link Measurement} instance based on timestamp, value and
+ * the metric descriptor in map form.
+ */
+ public static Measurement of(
+ String metric, Object value, long timestamp, Map<String, String> tags
+ ) {
+ Objects.requireNonNull(tags, "metric");
+ Objects.requireNonNull(tags, "tags");
+ return new Measurement(metric, value, timestamp, tags);
+ }
+
+ /**
+ * Returns the value associated with this {@link Measurement}.
+ */
+ public Object value() {
+ return value;
+ }
+
+ /**
+ * Returns the timestamps associated with this {@link Measurement}, the
+ * moment when the value was gathered.
+ */
+ public long timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Returns the name of the metric. For a list of different metrics
+ * see {@link MetricNames}.
+ */
+
+ public String metric() {
+ return metric;
+ }
+
+ /**
+ * Returns the value associated with a specific tag, based on the metric
+ * description of this particular {@link Measurement}. For a list of
+ * possible tag names see {@link MetricTags}.
+ */
+
+ public String tag(String name) {
+ return tags.get(name);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * (int) (timestamp * 31 + value.hashCode()) +
Objects.hashCode(tags);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ final Measurement that;
+ return this == obj || obj instanceof Measurement
+ && this.timestamp == (that = (Measurement) obj).timestamp
+ && this.value == that.value
+ && Objects.equals(this.tags, that.tags);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(String.format("%s %s", metric, value))
+ .append(" ")
+ .append(timestamp)
+ .append(" [");
+
+ String tags = this.tags.entrySet().stream()
+ .sorted(Comparator.comparing(Map.Entry::getKey))
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .collect(Collectors.joining(", "));
+ sb.append(tags).append(']');
+
+ return sb.toString();
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MeasurementPredicates.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MeasurementPredicates.java
new file mode 100644
index 000000000..7f565ba6b
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MeasurementPredicates.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * Static utility class for creating various {@link Measurement} filtering
+ * predicates.
+ */
+public final class MeasurementPredicates {
+
+    private MeasurementPredicates() { }
+
+    /**
+     * Matches a {@link Measurement} which contains the specified tag.
+     *
+     * @param tag the tag of interest
+     * @return a filtering predicate
+     */
+    public static Predicate<Measurement> containsTag(String tag) {
+        return measurement -> measurement.tag(tag) != null;
+    }
+
+    /**
+     * Matches a {@link Measurement} which contains the specified tag and
+     * the tag has the specified value.
+     *
+     * @param tag   the tag to match
+     * @param value the value the tag has to have; must not be {@code null},
+     *              otherwise evaluating the predicate throws a
+     *              {@link NullPointerException}
+     * @return a filtering predicate
+     */
+    public static Predicate<Measurement> tagValueEquals(String tag, String value) {
+        return measurement -> value.equals(measurement.tag(tag));
+    }
+
+    /**
+     * Matches a {@link Measurement} which has this exact tag with a value
+     * matching the provided regular expression.
+     * <p>
+     * The expression is compiled once, when the predicate is created, so an
+     * invalid expression is reported immediately rather than on first use.
+     *
+     * @param tag         the tag to match
+     * @param valueRegexp regular expression to match the value against
+     * @return a filtering predicate
+     * @throws java.util.regex.PatternSyntaxException if {@code valueRegexp}
+     *                                                is not a valid expression
+     */
+    public static Predicate<Measurement> tagValueMatches(String tag, String valueRegexp) {
+        // Hoisted out of the lambda: the predicate is evaluated once per
+        // measurement and the pattern never changes between evaluations.
+        Pattern pattern = Pattern.compile(valueRegexp);
+        return measurement -> {
+            String value = measurement.tag(tag);
+            return value != null && pattern.matcher(value).matches();
+        };
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
new file mode 100644
index 000000000..3e8c3e7c4
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * A named numeric metric with an associated measurement {@link Unit}, whose
+ * current value can be incremented, decremented or set.
+ */
+public interface Metric {
+
+ /**
+ * Returns the name of the associated metric.
+ */
+
+ String name();
+
+ /**
+ * Returns the measurement unit for the associated metric. Meant
+ * to provide further information on the type of value measured
+ * by the user-defined metric. Doesn't affect the functionality of the
+ * metric, it still remains a simple numeric value, but is used to
+ * populate the {@link MetricTags#UNIT} tag in the metric's description.
+ */
+
+ Unit unit();
+
+ /**
+ * Increments the current value by 1.
+ */
+ void increment();
+
+ /**
+ * Increments the current value by the specified amount.
+ *
+ * @param amount the amount to add to the current value
+ */
+ void increment(long amount);
+
+ /**
+ * Decrements the current value by 1.
+ */
+ void decrement();
+
+ /**
+ * Decrements the current value by the specified amount.
+ *
+ * @param amount the amount to subtract from the current value
+ */
+ void decrement(long amount);
+
+ /**
+ * Sets the current value.
+ *
+ * @param newValue the value to store
+ */
+ void set(long newValue);
+
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
new file mode 100644
index 000000000..a3511d92b
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Constants holding the metric names used as {@link Measurement#metric()}
+ * values and as keys when looking metrics up through {@link JobMetrics#get}.
+ * <p>
+ * NOTE(review): the exact meaning of each counter (what is counted, and over
+ * which interval the QPS values are computed) is defined by the code that
+ * updates them - confirm there before relying on it.
+ */
+public final class MetricNames {
+
+ // Constants holder, not meant to be instantiated.
+ private MetricNames() {}
+
+ public static final String RECEIVED_COUNT = "receivedCount";
+
+ public static final String RECEIVED_BATCHES = "receivedBatches";
+
+ // Source-side metrics.
+ public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
+
+ public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS";
+ // Sink-side metrics.
+ public static final String SINK_WRITE_COUNT = "SinkWriteCount";
+ public static final String SINK_WRITE_QPS = "SinkWriteQPS";
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricTags.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricTags.java
new file mode 100644
index 000000000..8a3207a33
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricTags.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Constants holding the tag names that can appear in a
+ * {@link Measurement}'s descriptor; pass them to {@link Measurement#tag} to
+ * read the corresponding tag value.
+ */
+public final class MetricTags {
+
+ // Constants holder, not meant to be instantiated.
+ private MetricTags() {
+ }
+
+ public static final String MEMBER = "member";
+
+ public static final String ADDRESS = "address";
+
+ public static final String JOB_ID = "jobId";
+
+ public static final String PIPELINE_ID = "pipelineId";
+
+ public static final String TASK_GROUP_ID = "taskGroupId";
+
+ // NOTE(review): "taskID" is cased differently from "jobId"/"pipelineId";
+ // the literal is part of the tag contract, so it is left untouched here.
+ public static final String TASK_ID = "taskID";
+
+ // Populated from Metric#unit() according to that method's documentation.
+ public static final String UNIT = "unit";
+
+ // Used by JobMetrics#toString() to group measurements.
+ public static final String TASK_NAME = "taskName";
+
+ public static final String SERVICE = "service";
+
+ public static final String TASK_GROUP_LOCATION = "taskGroupLocation";
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/RawJobMetrics.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/RawJobMetrics.java
new file mode 100644
index 000000000..ddbdd89fa
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/RawJobMetrics.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.util.Arrays;
+
+/**
+ * A blob of raw (still encoded) job metrics together with the time at which
+ * this holder was created.
+ * <p>
+ * The byte array is stored and returned as-is, without copying, so callers
+ * must not modify it after handing it over or after reading it back.
+ */
+public final class RawJobMetrics {
+
+    private long timestamp;
+    private byte[] blob;
+
+    RawJobMetrics() {
+    }
+
+    private RawJobMetrics(long timestamp, byte[] blob) {
+        this.timestamp = timestamp;
+        this.blob = blob;
+    }
+
+    /**
+     * Returns a holder without any blob ({@link #getBlob()} yields
+     * {@code null}), stamped with the current time.
+     */
+    public static RawJobMetrics empty() {
+        return of(null);
+    }
+
+    /**
+     * Wraps the given blob, stamping it with
+     * {@link System#currentTimeMillis()}.
+     *
+     * @param blob the raw metrics bytes, may be {@code null}
+     */
+    public static RawJobMetrics of(byte[] blob) {
+        return new RawJobMetrics(System.currentTimeMillis(), blob);
+    }
+
+    /**
+     * Returns the creation time of this holder in milliseconds.
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns the wrapped bytes (not a copy), or {@code null} if there are
+     * none.
+     */
+    public byte[] getBlob() {
+        return blob;
+    }
+
+    @Override
+    public int hashCode() {
+        // Long.hashCode folds both halves of the timestamp into the hash; a
+        // plain (int) cast would silently drop the upper 32 bits.
+        return 31 * Long.hashCode(timestamp) + Arrays.hashCode(blob);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        RawJobMetrics that = (RawJobMetrics) obj;
+        return this.timestamp == that.timestamp
+            && Arrays.equals(this.blob, that.blob);
+    }
+
+    @Override
+    public String toString() {
+        return Arrays.toString(blob) + " @ " + timestamp;
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Unit.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Unit.java
new file mode 100644
index 000000000..70217741b
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Unit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Measurement units a {@link Metric} can report through {@link Metric#unit()}.
+ */
+public enum Unit {
+ /** Size, counter, represented in bytes */
+ BYTES,
+ /** Timestamp or duration represented in ms */
+ MS,
+ /** An integer in range 0..100 */
+ PERCENT,
+ /** Number of items: size, counter... */
+ COUNT,
+ /** 0 or 1 */
+ BOOLEAN,
+ /** 0..n, ordinal of an enum */
+ ENUM,
+}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 07c7de46d..9f8a9420b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
@@ -80,4 +81,11 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance {
SeaTunnelListJobStatusCodec::decodeResponse
);
}
+
+ /**
+ * Requests the metrics of the given job and returns the decoded response.
+ * <p>
+ * The request is encoded with {@code SeaTunnelGetJobMetricsCodec} and sent
+ * through {@code requestOnMasterAndDecodeResponse}.
+ * NOTE(review): the returned string is presumably the JSON rendering of the
+ * job's metrics - confirm against the server-side handler.
+ *
+ * @param jobId id of the job whose metrics are requested
+ * @return the decoded response string
+ */
+ public String getJobMetrics(Long jobId) {
+ return hazelcastClient.requestOnMasterAndDecodeResponse(
+ SeaTunnelGetJobMetricsCodec.encodeRequest(jobId),
+ SeaTunnelGetJobMetricsCodec::decodeResponse
+ );
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f8c118cfe..9be38e537 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.engine.client;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
import static org.awaitility.Awaitility.await;
import org.apache.seatunnel.common.config.Common;
@@ -33,8 +37,10 @@ import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -49,6 +55,7 @@ public class SeaTunnelClientTest {
private static SeaTunnelConfig SEATUNNEL_CONFIG =
ConfigProvider.locateAndGetSeaTunnelConfig();
private static HazelcastInstance INSTANCE;
+ private static SeaTunnelClient CLIENT;
@BeforeAll
public static void beforeClass() throws Exception {
@@ -58,14 +65,17 @@ public class SeaTunnelClientTest {
new
SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
}
- @Test
- public void testSayHello() {
+ @BeforeEach
+ void setUp() {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ CLIENT = new SeaTunnelClient(clientConfig);
+ }
+ @Test
+ public void testSayHello() {
String msg = "Hello world";
- String s = engineClient.printMessageToMaster(msg);
+ String s = CLIENT.printMessageToMaster(msg);
Assertions.assertEquals(msg, s);
}
@@ -76,10 +86,7 @@ public class SeaTunnelClientTest {
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
- ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-
clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
+ JobExecutionEnvironment jobExecutionEnv =
CLIENT.createExecutionContext(filePath, jobConfig);
try {
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
@@ -103,10 +110,7 @@ public class SeaTunnelClientTest {
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_console");
- ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-
clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
+ JobExecutionEnvironment jobExecutionEnv =
CLIENT.createExecutionContext(filePath, jobConfig);
try {
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
@@ -117,17 +121,54 @@ public class SeaTunnelClientTest {
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(
- engineClient.getJobState(jobId).contains("RUNNING") &&
engineClient.listJobStatus().contains("RUNNING")));
+ CLIENT.getJobState(jobId).contains("RUNNING") &&
CLIENT.listJobStatus().contains("RUNNING")));
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(
- engineClient.getJobState(jobId).contains("FINISHED") &&
engineClient.listJobStatus().contains("FINISHED")));
+ CLIENT.getJobState(jobId).contains("FINISHED") &&
CLIENT.listJobStatus().contains("FINISHED")));
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
+ // Runs the client_test.conf job, waits until it is reported as FINISHED and
+ // then checks that the metrics string mentions all four source/sink metrics.
+ @Test
+ public void testGetJobMetrics() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("/client_test.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("fake_to_console");
+
+ JobExecutionEnvironment jobExecutionEnv =
CLIENT.createExecutionContext(filePath, jobConfig);
+
+ try {
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ // NOTE(review): the future is never read; the asynchronous
+ // waitForJobComplete() call appears to be kept only for its side effect.
+ CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
+ long jobId = clientJobProxy.getJobId();
+
+ // Poll for up to 30 seconds until both status queries report FINISHED.
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ CLIENT.getJobState(jobId).contains("FINISHED") &&
CLIENT.listJobStatus().contains("FINISHED")));
+
+ String jobMetrics = CLIENT.getJobMetrics(jobId);
+
+ // Only the presence of the metric names is checked, not their values.
+ Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT));
+ Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT));
+ Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS));
+
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Closes the client after every test; a new one is created in setUp().
+ @AfterEach
+ void tearDown() {
+ CLIENT.close();
+ }
+
@AfterAll
public static void after() {
INSTANCE.shutdown();
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index a7acbe876..5d00c5d0b 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -46,6 +46,8 @@ public class Constant {
public static final String IMAP_FINISHED_JOB_STATE = "finishedJobState";
+ public static final String IMAP_FINISHED_JOB_METRICS =
"finishedJobMetrics";
+
public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps";
public static final String IMAP_OWNED_SLOT_PROFILES =
"ownedSlotProfilesIMap";
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobMetricsCodec.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobMetricsCodec.java
new file mode 100644
index 000000000..201d0f343
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobMetricsCodec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.*;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.*;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+/**
+ * Client-protocol codec for the "SeaTunnel.GetJobMetrics" call: the request carries a
+ * job id as a long in the initial frame, the response carries a single String.
+ */
+@Generated("41fec4e1cc038a9e9be1823f1d0955ef")
+public final class SeaTunnelGetJobMetricsCodec {
+ //hex: 0xDE0800
+ public static final int REQUEST_MESSAGE_TYPE = 14551040;
+ //hex: 0xDE0801
+ public static final int RESPONSE_MESSAGE_TYPE = 14551041;
+ // The job id is stored in the initial frame directly after the int-sized partition id.
+ private static final int REQUEST_JOB_ID_FIELD_OFFSET =
PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE =
REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE =
RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+ // Static utility holder; not instantiable.
+ private SeaTunnelGetJobMetricsCodec() {
+ }
+
+ /**
+ * Builds the request message: marks it retryable, names the operation
+ * "SeaTunnel.GetJobMetrics" and writes the message type, a partition id of -1 and the
+ * given job id into the initial frame.
+ *
+ * @param jobId id of the job whose metrics are requested
+ * @return the encoded request message
+ */
+ public static ClientMessage encodeRequest(long jobId) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ clientMessage.setRetryable(true);
+ clientMessage.setOperationName("SeaTunnel.GetJobMetrics");
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new
byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET,
REQUEST_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+ clientMessage.add(initialFrame);
+ return clientMessage;
+ }
+
+ /**
+ * Reads the job id back from the initial frame of a request message.
+ *
+ * @param clientMessage a request produced by {@link #encodeRequest(long)}
+ * @return the long stored at the job-id offset of the first frame
+ */
+ public static long decodeRequest(ClientMessage clientMessage) {
+ ClientMessage.ForwardFrameIterator iterator =
clientMessage.frameIterator();
+ ClientMessage.Frame initialFrame = iterator.next();
+ return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+ }
+
+ /**
+ * Builds the response message: an initial frame holding only the response message type,
+ * followed by the String payload encoded with {@code StringCodec}.
+ *
+ * @param response the String to send back to the client
+ * @return the encoded response message
+ */
+ public static ClientMessage encodeResponse(java.lang.String response) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new
byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET,
RESPONSE_MESSAGE_TYPE);
+ clientMessage.add(initialFrame);
+
+ StringCodec.encode(clientMessage, response);
+ return clientMessage;
+ }
+
+ /**
+ * Extracts the String payload from a response message by skipping the initial frame and
+ * decoding the next one with {@code StringCodec}.
+ *
+ * @param clientMessage a response produced by {@link #encodeResponse(java.lang.String)}
+ * @return the decoded String
+ */
+ public static java.lang.String decodeResponse(ClientMessage clientMessage)
{
+ ClientMessage.ForwardFrameIterator iterator =
clientMessage.frameIterator();
+ //empty initial frame
+ iterator.next();
+ return StringCodec.decode(iterator);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 6d599a747..ec40e9c92 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -154,3 +154,23 @@ methods:
since: 2.0
doc: ''
+ - id: 8
+ name: getJobMetrics
+ since: 2.0
+ doc: ''
+ request:
+ retryable: true
+ partitionIdentifier: -1
+ params:
+ - name: jobId
+ type: long
+ nullable: false
+ since: 2.0
+ doc: ''
+ response:
+ params:
+ - name: response
+ type: String
+ nullable: false
+ since: 2.0
+ doc: ''
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index c8203ccc5..2128ece81 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
@@ -34,6 +35,7 @@ import
org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -169,7 +171,8 @@ public class CoordinatorService {
runningJobStateIMap,
logger,
runningJobMasterMap,
-
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE)
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS)
);
List<CompletableFuture<Void>> collect =
runningJobInfoIMap.entrySet().stream().map(entry -> {
@@ -241,8 +244,7 @@ public class CoordinatorService {
jobMaster.run();
} finally {
// storage job state info to HistoryStorage
- removeJobIMap(jobMaster);
- runningJobMasterMap.remove(jobId);
+ onJobDone(jobMaster, jobId);
}
});
return;
@@ -256,10 +258,7 @@ public class CoordinatorService {
jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
jobMaster.run();
} finally {
- // storage job state info to HistoryStorage
- jobHistoryService.storeFinishedJobState(jobMaster);
- removeJobIMap(jobMaster);
- runningJobMasterMap.remove(jobId);
+ onJobDone(jobMaster, jobId);
}
});
return;
@@ -346,15 +345,20 @@ public class CoordinatorService {
try {
jobMaster.run();
} finally {
- // storage job state info to HistoryStorage
- jobHistoryService.storeFinishedJobState(jobMaster);
- removeJobIMap(jobMaster);
- runningJobMasterMap.remove(jobId);
+ onJobDone(jobMaster, jobId);
}
});
return new PassiveCompletableFuture<>(voidCompletableFuture);
}
+    /**
+     * Called once a job's {@link JobMaster} has stopped running: persists the job's final
+     * state and metrics to the history storage and then drops the job from the running-job
+     * bookkeeping.
+     *
+     * <p>The bookkeeping cleanup runs in a {@code finally} block so that a failure while
+     * persisting history (collecting the metrics involves remote invocations that may throw)
+     * does not leave the job registered as running. Any such failure still propagates to
+     * the caller.
+     *
+     * @param jobMaster the master of the job that has ended
+     * @param jobId     id of that job, used as key in {@code runningJobMasterMap}
+     */
+    private void onJobDone(JobMaster jobMaster, long jobId) {
+        try {
+            // store job state and metrics to HistoryStorage
+            jobHistoryService.storeFinishedJobState(jobMaster);
+            jobHistoryService.storeFinishedJobMetrics(jobMaster);
+        } finally {
+            // always release the running-job entries, even if persisting history failed
+            removeJobIMap(jobMaster);
+            runningJobMasterMap.remove(jobId);
+        }
+    }
+
private void removeJobIMap(JobMaster jobMaster) {
Long jobId = jobMaster.getJobImmutableInformation().getJobId();
runningJobStateTimestampsIMap.remove(jobId);
@@ -412,6 +416,16 @@ public class CoordinatorService {
return runningJobMaster.getJobStatus();
}
+    /**
+     * Returns the metrics of the given job.
+     *
+     * <p>If no {@link JobMaster} is registered for the job, the metrics kept by the history
+     * service are returned (which may be {@code null}). Otherwise the current metrics are
+     * collected from the running job first, the history service is consulted afterwards, and
+     * the stored metrics win when they exist.
+     *
+     * @param jobId id of the job
+     * @return the stored metrics if present, else the metrics collected from the running job
+     */
+    public JobMetrics getJobMetrics(long jobId){
+        JobMaster master = runningJobMasterMap.get(jobId);
+        if (master != null) {
+            // Collect the live metrics before looking at the history storage.
+            JobMetrics liveMetrics = JobMetricsUtil.toJobMetrics(master.getCurrJobMetrics());
+            JobMetrics storedMetrics = jobHistoryService.getJobMetrics(jobId);
+            if (storedMetrics != null) {
+                return storedMetrics;
+            }
+            return liveMetrics;
+        }
+        return jobHistoryService.getJobMetrics(jobId);
+    }
+
/**
* When TaskGroup ends, it is called by {@link TaskExecutionService} to
notify JobMaster the TaskGroup's state.
*/
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 231869318..bb55db983 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -92,6 +92,7 @@ public class SeaTunnelServer implements ManagedService,
MembershipAwareService,
taskExecutionService = new TaskExecutionService(
nodeEngine, nodeEngine.getProperties()
);
+
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
taskExecutionService.start();
getSlotService();
coordinatorService = new CoordinatorService(nodeEngine, this,
seaTunnelConfig.getEngineConfig());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index cf0867160..ad6a592e4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -17,13 +17,20 @@
package org.apache.seatunnel.engine.server;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.PIPELINE_ID;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.TASK_GROUP_ID;
+import static
org.apache.seatunnel.api.common.metrics.MetricTags.TASK_GROUP_LOCATION;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.TASK_ID;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
import static com.hazelcast.jet.impl.util.Util.uncheckRun;
+import static java.lang.Thread.currentThread;
import static java.util.Collections.emptyList;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.toList;
+import org.apache.seatunnel.api.common.metrics.MetricTags;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import
org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -38,10 +45,15 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.metrics.MetricsImpl;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
import com.google.common.collect.Lists;
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
+import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.logging.ILogger;
@@ -54,6 +66,7 @@ import org.apache.commons.collections4.CollectionUtils;
import java.net.URL;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -69,11 +82,12 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
/**
* This class is responsible for the execution of the Task
*/
-public class TaskExecutionService {
+public class TaskExecutionService implements DynamicMetricsProvider {
private final String hzInstanceName;
private final NodeEngineImpl nodeEngine;
@@ -84,6 +98,7 @@ public class TaskExecutionService {
private final RunBusWorkSupplier runBusWorkSupplier = new
RunBusWorkSupplier(executorService, threadShareTaskQueue);
// key: TaskGroupLocation
private final ConcurrentMap<TaskGroupLocation, TaskGroupContext>
executionContexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TaskGroupLocation, TaskGroupContext>
finishedExecutionContexts = new ConcurrentHashMap<>();
private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>>
cancellationFutures =
new ConcurrentHashMap<>();
@@ -91,6 +106,11 @@ public class TaskExecutionService {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
this.nodeEngine = nodeEngine;
this.logger =
nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
+
+ MetricsRegistry registry = nodeEngine.getMetricsRegistry();
+ MetricDescriptor descriptor = registry.newMetricDescriptor()
+ .withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
+ registry.registerStaticMetrics(descriptor, this);
}
public void start() {
@@ -107,9 +127,23 @@ public class TaskExecutionService {
}
private void submitThreadShareTask(TaskGroupExecutionTracker
taskGroupExecutionTracker, List<Task> tasks) {
- tasks.stream()
- .map(t -> new TaskTracker(t, taskGroupExecutionTracker))
- .forEach(threadShareTaskQueue::add);
+ Stream<TaskTracker> taskTrackerStream = tasks.stream()
+ .map(t -> {
+ if
(!taskGroupExecutionTracker.executionCompletedExceptionally()) {
+ try {
+ TaskTracker taskTracker = new TaskTracker(t,
taskGroupExecutionTracker);
+ taskTracker.task.init();
+ return taskTracker;
+ } catch (Exception e) {
+ taskGroupExecutionTracker.exception(e);
+ taskGroupExecutionTracker.taskDone();
+ }
+ }
+ return null;
+ });
+ if (!taskGroupExecutionTracker.executionCompletedExceptionally()) {
+ taskTrackerStream.forEach(threadShareTaskQueue::add);
+ }
}
private void submitBlockingTask(TaskGroupExecutionTracker
taskGroupExecutionTracker, List<Task> tasks) {
@@ -261,6 +295,34 @@ public class TaskExecutionService {
}
+ /**
+ * Drops the retained context of a finished task group from
+ * {@code finishedExecutionContexts}; a location that is not present is ignored.
+ *
+ * @param taskGroupLocation location of the task group whose context is released
+ */
+ public void notifyCleanTaskGroupContext(TaskGroupLocation
taskGroupLocation){
+ finishedExecutionContexts.remove(taskGroupLocation);
+ }
+
+    /**
+     * Lets every task of every running or finished task group publish its metrics.
+     *
+     * <p>The descriptor handed to each task is tagged with this service's simple class name,
+     * the task group location, job id, pipeline id, task group id and the task id. Any
+     * failure is logged as a warning and rethrown.
+     *
+     * @param descriptor base descriptor to derive the tagged descriptors from
+     * @param context    collection context passed through to the tasks
+     */
+    @Override
+    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
+        try {
+            MetricDescriptor serviceDescriptor =
+                descriptor.copy().withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
+            // Snapshot of running and finished groups; finished entries override running ones.
+            Map<TaskGroupLocation, TaskGroupContext> snapshot = new HashMap<>();
+            snapshot.putAll(executionContexts);
+            snapshot.putAll(finishedExecutionContexts);
+            for (Map.Entry<TaskGroupLocation, TaskGroupContext> entry : snapshot.entrySet()) {
+                TaskGroupLocation location = entry.getKey();
+                MetricDescriptor groupDescriptor = serviceDescriptor.copy()
+                    .withTag(TASK_GROUP_LOCATION, location.toString())
+                    .withTag(JOB_ID, String.valueOf(location.getJobId()))
+                    .withTag(PIPELINE_ID, String.valueOf(location.getPipelineId()))
+                    .withTag(TASK_GROUP_ID, String.valueOf(location.getTaskGroupId()));
+                entry.getValue().getTaskGroup().getTasks().forEach(task -> {
+                    MetricDescriptor taskDescriptor =
+                        groupDescriptor.copy().withTag(TASK_ID, String.valueOf(task.getTaskID()));
+                    task.provideDynamicMetrics(taskDescriptor, context);
+                });
+            }
+        } catch (Throwable t) {
+            logger.warning("Dynamic metric collection failed", t);
+            throw t;
+        }
+    }
+
private final class BlockingWorker implements Runnable {
private final TaskTracker tracker;
@@ -278,9 +340,11 @@ public class TaskExecutionService {
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
final Task t = tracker.task;
+ MetricsImpl.Container userMetricsContextContainer =
MetricsImpl.container();
try {
startedLatch.countDown();
t.init();
+ userMetricsContextContainer.setContext(t.getMetricsContext());
ProgressState result;
do {
result = t.call();
@@ -291,6 +355,7 @@ public class TaskExecutionService {
taskGroupExecutionTracker.exception(e);
} finally {
taskGroupExecutionTracker.taskDone();
+ userMetricsContextContainer.setContext(null);
}
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
@@ -315,6 +380,8 @@ public class TaskExecutionService {
AtomicBoolean keep = new AtomicBoolean(true);
public AtomicReference<TaskTracker> exclusiveTaskTracker = new
AtomicReference<>();
final TaskCallTimer timer;
+ private Thread myThread;
+ private MetricsImpl.Container userMetricsContextContainer;
public LinkedBlockingDeque<TaskTracker> taskqueue;
@SuppressWarnings("checkstyle:MagicNumber")
@@ -328,6 +395,8 @@ public class TaskExecutionService {
@SneakyThrows
@Override
public void run() {
+ myThread = currentThread();
+ userMetricsContextContainer = MetricsImpl.container();
while (keep.get() && isRunning) {
TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
exclusiveTaskTracker.get() :
@@ -350,6 +419,8 @@ public class TaskExecutionService {
ProgressState call = null;
try {
//run task
+
myThread.setContextClassLoader(executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader());
+
userMetricsContextContainer.setContext(taskTracker.task.getMetricsContext());
call = taskTracker.task.call();
synchronized (timer) {
timer.timerStop();
@@ -366,6 +437,7 @@ public class TaskExecutionService {
} finally {
//stop timer
timer.timerStop();
+ userMetricsContextContainer.setContext(null);
}
//task call finished
if (null != call) {
@@ -449,6 +521,7 @@ public class TaskExecutionService {
logger.info("taskDone: " + taskGroup.getTaskGroupLocation());
if (completionLatch.decrementAndGet() == 0) {
TaskGroupLocation taskGroupLocation =
taskGroup.getTaskGroupLocation();
+ finishedExecutionContexts.put(taskGroupLocation,
executionContexts.get(taskGroupLocation));
executionContexts.remove(taskGroupLocation);
cancellationFutures.remove(taskGroupLocation);
Throwable ex = executionException.get();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index eea42c9ee..88d76b016 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -20,15 +20,19 @@ package org.apache.seatunnel.engine.server.execution;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.Stateful;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.task.record.Barrier;
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
import lombok.NonNull;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-public interface Task extends InternalCheckpointListener, Stateful,
Serializable {
+public interface Task extends DynamicMetricsProvider,
InternalCheckpointListener, Stateful, Serializable {
default void init() throws Exception {
}
@@ -53,4 +57,11 @@ public interface Task extends InternalCheckpointListener,
Stateful, Serializable
@Override
default void restoreState(List<ActionSubtaskState> actionStateList) throws
Exception {}
+
+ /**
+ * Returns the metrics context of this task. The default implementation returns
+ * {@code null}, i.e. the task exposes no metrics context unless it overrides this method.
+ */
+ default MetricsContext getMetricsContext() {
+ return null;
+ }
+
+ /**
+ * Hook for publishing this task's metrics; the default implementation publishes nothing.
+ *
+ * @param tagger  descriptor to use for the published metrics
+ * @param context collection context the metrics are reported to
+ */
+ default void provideDynamicMetrics(MetricDescriptor tagger,
MetricsCollectionContext context) {
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 4d309be38..fffb3f369 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -17,11 +17,14 @@
package org.apache.seatunnel.engine.server.master;
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -36,6 +39,7 @@ import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -67,18 +71,22 @@ public class JobHistoryService {
//TODO need to limit the amount of storage
private final IMap<Long, JobStateData> finishedJobStateImap;
+ private final IMap<Long, JobMetrics> finishedJobMetricsImap;
+
private final ObjectMapper objectMapper;
+ /**
+ * Creates the service from the maps it reads and writes.
+ *
+ * @param runningJobStateIMap    map holding the state of running jobs
+ * @param logger                 logger used by this service
+ * @param runningJobMasterMap    job id to master of the currently running jobs
+ * @param finishedJobStateImap   storage for the state of finished jobs
+ * @param finishedJobMetricsImap storage for the metrics of finished jobs
+ */
public JobHistoryService(
IMap<Object, Object> runningJobStateIMap,
ILogger logger,
Map<Long, JobMaster> runningJobMasterMap,
- IMap<Long, JobStateData> finishedJobStateImap
+ IMap<Long, JobStateData> finishedJobStateImap,
+ IMap<Long, JobMetrics> finishedJobMetricsImap
) {
this.runningJobStateIMap = runningJobStateIMap;
this.logger = logger;
this.runningJobMasterMap = runningJobMasterMap;
this.finishedJobStateImap = finishedJobStateImap;
+ this.finishedJobMetricsImap = finishedJobMetricsImap;
this.objectMapper = new ObjectMapper();
+ // Do not fail serialization on beans without serializable properties.
this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,
false);
}
@@ -104,6 +112,10 @@ public class JobHistoryService {
finishedJobStateImap.getOrDefault(jobId, null);
}
+ public JobMetrics getJobMetrics(Long jobId){
+ return finishedJobMetricsImap.getOrDefault(jobId, null);
+ }
+
// Get detailed status of a single job as json
public String getJobStatusAsString(Long jobId) {
JobStateData jobStatus = getJobStatus(jobId);
@@ -128,6 +140,16 @@ public class JobHistoryService {
finishedJobStateImap.put(jobStateData.jobId, jobStateData, 14,
TimeUnit.DAYS);
}
+    /**
+     * Collects the current metrics of the given job, stores them under the job id with a
+     * time-to-live of 14 days and then asks the job master to clean its task group contexts.
+     *
+     * @param jobMaster master of the job whose metrics are persisted
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public void storeFinishedJobMetrics(JobMaster jobMaster) {
+        JobMetrics finalMetrics = JobMetricsUtil.toJobMetrics(jobMaster.getCurrJobMetrics());
+        Long finishedJobId = jobMaster.getJobImmutableInformation().getJobId();
+        finishedJobMetricsImap.put(finishedJobId, finalMetrics, 14, TimeUnit.DAYS);
+        // The metrics are persisted now, so the retained task group contexts can be released.
+        jobMaster.cleanTaskGroupContext();
+    }
+
private JobStateData toJobStateMapper(JobMaster jobMaster) {
Long jobId = jobMaster.getJobImmutableInformation().getJobId();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3a2a8ca48..701a550d9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.engine.server.master;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
@@ -32,6 +34,7 @@ import
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
@@ -40,6 +43,8 @@ import
org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import
org.apache.seatunnel.engine.server.operation.CleanTaskGroupContextOperation;
+import
org.apache.seatunnel.engine.server.operation.GetTaskGroupMetricsOperation;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -55,9 +60,12 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -275,6 +283,45 @@ public class JobMaster extends Thread {
return physicalPlan.getJobStatus();
}
+    /**
+     * Collects the raw metrics of every task group of this job.
+     *
+     * <p>Walks {@code ownedSlotProfilesIMap}, and for each task group whose job id matches
+     * this job sends a {@link GetTaskGroupMetricsOperation} to the worker that owns the slot,
+     * waiting for each answer in turn.
+     *
+     * @return one {@link RawJobMetrics} entry per task group of this job
+     * @throws SeaTunnelException if an invocation fails or the wait is interrupted; the
+     *                            original exception is attached as the cause
+     */
+    public List<RawJobMetrics> getCurrJobMetrics() {
+        List<RawJobMetrics> metrics = new ArrayList<>();
+        ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
+            taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
+                // the map is iterated in full, so skip task groups of other jobs
+                if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) {
+                    Address worker = slotProfile.getWorker();
+                    InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
+                        SeaTunnelServer.SERVICE_NAME,
+                        new GetTaskGroupMetricsOperation(taskGroupLocation),
+                        worker).invoke();
+                    try {
+                        RawJobMetrics rawJobMetrics = (RawJobMetrics) invoke.get();
+                        metrics.add(rawJobMetrics);
+                    } catch (InterruptedException e) {
+                        // restore the interrupt flag before translating the exception
+                        Thread.currentThread().interrupt();
+                        throw new SeaTunnelException(e.getMessage(), e);
+                    } catch (Exception e) {
+                        // keep the cause so the failing worker call can be diagnosed
+                        throw new SeaTunnelException(e.getMessage(), e);
+                    }
+                }
+            });
+        });
+        return metrics;
+    }
+
+    /**
+     * Asks the workers to release the retained contexts of this job's task groups.
+     *
+     * <p>Walks {@code ownedSlotProfilesIMap} and sends a
+     * {@link CleanTaskGroupContextOperation} for each task group whose job id matches this
+     * job, the same filter {@link #getCurrJobMetrics()} applies, so that task groups of
+     * other jobs found in the map are left untouched.
+     *
+     * @throws SeaTunnelException if an invocation fails or the wait is interrupted; the
+     *                            original exception is attached as the cause
+     */
+    public void cleanTaskGroupContext() {
+        ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
+            taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
+                // only clean task groups that belong to this job
+                if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) {
+                    Address worker = slotProfile.getWorker();
+                    InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
+                        SeaTunnelServer.SERVICE_NAME,
+                        new CleanTaskGroupContextOperation(taskGroupLocation),
+                        worker).invoke();
+                    try {
+                        invoke.get();
+                    } catch (InterruptedException e) {
+                        // restore the interrupt flag before translating the exception
+                        Thread.currentThread().interrupt();
+                        throw new SeaTunnelException(e.getMessage(), e);
+                    } catch (Exception e) {
+                        // keep the cause so the failing worker call can be diagnosed
+                        throw new SeaTunnelException(e.getMessage(), e);
+                    }
+                }
+            });
+        });
+    }
+
public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsCollector.java
new file mode 100644
index 000000000..d718c2e3b
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import com.hazelcast.cluster.Member;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.collectors.MetricsCollector;
+import com.hazelcast.internal.metrics.impl.MetricsCompressor;
+import com.hazelcast.logging.ILogger;
+
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+
+/**
+ * {@link MetricsCollector} that keeps only the metrics of one task group.
+ *
+ * <p>Long and double values whose descriptor carries the tracked task group location are
+ * prefixed via {@code JobMetricsUtil.addMemberPrefixFn(member)} and added to a
+ * {@link MetricsCompressor}; {@link #getMetrics()} hands out the compressed blob.
+ */
+public class JobMetricsCollector implements MetricsCollector {
+
+    private final ILogger logger;
+    private final TaskGroupLocation taskGroupLocation;
+    private final UnaryOperator<MetricDescriptor> addPrefixFn;
+    private final MetricsCompressor compressor;
+
+    /**
+     * @param taskGroupLocation the task group whose metrics are collected
+     * @param member            the member the prefix function is built from; must not be null
+     * @param logger            logger for collection errors; must not be null
+     */
+    public JobMetricsCollector(TaskGroupLocation taskGroupLocation, Member member, ILogger logger) {
+        Objects.requireNonNull(member, "member");
+        this.logger = Objects.requireNonNull(logger, "logger");
+
+        this.taskGroupLocation = taskGroupLocation;
+        this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member);
+        this.compressor = new MetricsCompressor();
+    }
+
+    /** Records the long value when the descriptor belongs to the tracked task group. */
+    @Override
+    public void collectLong(MetricDescriptor descriptor, long value) {
+        if (belongsToTrackedTaskGroup(descriptor)) {
+            compressor.addLong(addPrefixFn.apply(descriptor), value);
+        }
+    }
+
+    /** Records the double value when the descriptor belongs to the tracked task group. */
+    @Override
+    public void collectDouble(MetricDescriptor descriptor, double value) {
+        if (belongsToTrackedTaskGroup(descriptor)) {
+            compressor.addDouble(addPrefixFn.apply(descriptor), value);
+        }
+    }
+
+    /** Logs a warning when the failed metric belongs to the tracked task group. */
+    @Override
+    public void collectException(MetricDescriptor descriptor, Exception e) {
+        if (belongsToTrackedTaskGroup(descriptor)) {
+            logger.warning("Exception when rendering job metrics: " + e, e);
+        }
+    }
+
+    /** Metrics without a value are ignored. */
+    @Override
+    public void collectNoValue(MetricDescriptor descriptor) {
+
+    }
+
+    /** Returns the metrics collected so far as a compressed blob and resets the compressor. */
+    public RawJobMetrics getMetrics() {
+        return RawJobMetrics.of(compressor.getBlobAndReset());
+    }
+
+    /** Tells whether the descriptor's task group location equals the tracked one. */
+    private boolean belongsToTrackedTaskGroup(MetricDescriptor descriptor) {
+        String descriptorLocation = JobMetricsUtil.getTaskGroupLocationFromMetricsDescriptor(descriptor);
+        return taskGroupLocation.toString().equals(descriptorLocation);
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
new file mode 100644
index 000000000..f53f7fec6
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import static org.apache.seatunnel.api.common.metrics.MetricTags.ADDRESS;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.MEMBER;
+
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.common.metrics.Measurement;
+import org.apache.seatunnel.api.common.metrics.MetricTags;
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+
+import com.hazelcast.cluster.Member;
+import com.hazelcast.internal.metrics.MetricConsumer;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.impl.MetricsCompressor;
+import com.hazelcast.internal.util.MapUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+/**
+ * Static helpers for tagging metric descriptors and for turning compressed
+ * {@link RawJobMetrics} blobs into {@link JobMetrics}.
+ */
+public final class JobMetricsUtil {
+
+    private JobMetricsUtil() {
+    }
+
+    /**
+     * Looks up the value of the {@link MetricTags#TASK_GROUP_LOCATION} tag on a descriptor.
+     *
+     * @param descriptor descriptor whose tags are scanned
+     * @return the tag value, or {@code null} if the descriptor does not carry the tag
+     */
+    public static String getTaskGroupLocationFromMetricsDescriptor(MetricDescriptor descriptor) {
+        for (int i = 0; i < descriptor.tagCount(); i++) {
+            if (MetricTags.TASK_GROUP_LOCATION.equals(descriptor.tag(i))) {
+                return descriptor.tagValue(i);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Builds a function that copies a descriptor and adds the member's UUID and address to the copy
+     * as the {@code MEMBER} and {@code ADDRESS} tags.
+     *
+     * @param member member whose UUID and address are captured when this method is called
+     * @return function producing a tagged copy of the descriptor passed to it
+     */
+    public static UnaryOperator<MetricDescriptor> addMemberPrefixFn(Member member) {
+        String uuid = member.getUuid().toString();
+        String addr = member.getAddress().toString();
+        return d -> d.copy().withTag(MEMBER, uuid).withTag(ADDRESS, addr);
+    }
+
+    /**
+     * Decompresses the given raw metrics and groups the resulting measurements by metric name.
+     * Entries whose blob is {@code null} are skipped; every measurement carries the timestamp of
+     * the {@link RawJobMetrics} it was extracted from.
+     *
+     * @param rawJobMetrics raw metrics to decode
+     * @return the decoded metrics, or {@link JobMetrics#empty()} if no entry had a blob
+     */
+    public static JobMetrics toJobMetrics(List<RawJobMetrics> rawJobMetrics) {
+        JobMetricsConsumer consumer = null;
+        for (RawJobMetrics metrics : rawJobMetrics) {
+            if (metrics.getBlob() == null) {
+                continue;
+            }
+            if (consumer == null) {
+                // Created lazily so that an input without any blob yields JobMetrics.empty().
+                consumer = new JobMetricsConsumer();
+            }
+            consumer.timestamp = metrics.getTimestamp();
+            MetricsCompressor.extractMetrics(metrics.getBlob(), consumer);
+        }
+        return consumer == null ? JobMetrics.empty() : JobMetrics.of(consumer.metrics);
+    }
+
+    /**
+     * Accumulates extracted metrics as {@link Measurement}s keyed by metric name.
+     */
+    private static class JobMetricsConsumer implements MetricConsumer {
+
+        final Map<String, List<Measurement>> metrics = new HashMap<>();
+        // Timestamp stamped on every measurement; set by toJobMetrics before each extraction.
+        long timestamp;
+
+        @Override
+        public void consumeLong(MetricDescriptor descriptor, long value) {
+            metrics.computeIfAbsent(descriptor.metric(), k -> new ArrayList<>())
+                .add(measurement(descriptor, value));
+        }
+
+        @Override
+        public void consumeDouble(MetricDescriptor descriptor, double value) {
+            metrics.computeIfAbsent(descriptor.metric(), k -> new ArrayList<>())
+                .add(measurement(descriptor, value));
+        }
+
+        /**
+         * Converts a descriptor and its value into a {@link Measurement} whose tag map holds the
+         * descriptor's tags plus, when fully specified, its discriminator.
+         */
+        private Measurement measurement(MetricDescriptor descriptor, Object value) {
+            Map<String, String> tags = MapUtil.createHashMap(descriptor.tagCount());
+            for (int i = 0; i < descriptor.tagCount(); i++) {
+                tags.put(descriptor.tag(i), descriptor.tagValue(i));
+            }
+            // Require both parts of the discriminator: with "||" a half-set discriminator
+            // would insert a null key or a null value into the tag map.
+            if (descriptor.discriminator() != null && descriptor.discriminatorValue() != null) {
+                tags.put(descriptor.discriminator(), descriptor.discriminatorValue());
+            }
+            return Measurement.of(descriptor.metric(), value, timestamp, tags);
+        }
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
new file mode 100644
index 000000000..72a8b2e67
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+/**
+ * Static entry point for obtaining {@link Metric} instances; every call is forwarded to
+ * {@link MetricsImpl}.
+ */
+public final class Metrics {
+
+    private Metrics() {
+    }
+
+    /**
+     * Returns the metric with the given name, measured in {@link Unit#COUNT}.
+     */
+    public static Metric metric(String name) {
+        return metric(name, Unit.COUNT);
+    }
+
+    /**
+     * Variant of {@link #metric(String)} that lets the caller choose the
+     * measurement {@link Unit} of the metric.
+     */
+    public static Metric metric(String name, Unit unit) {
+        return MetricsImpl.metric(name, unit);
+    }
+
+    /**
+     * Returns the QPS metric with the given name and measurement {@link Unit}.
+     */
+    public static Metric qpsMetric(String name, Unit unit) {
+        return MetricsImpl.qpsMetric(name, unit);
+    }
+
+    /**
+     * Returns the thread-safe metric with the given name, measured in {@link Unit#COUNT}.
+     */
+    public static Metric threadSafeMetric(String name) {
+        return threadSafeMetric(name, Unit.COUNT);
+    }
+
+    /**
+     * Variant of {@link #threadSafeMetric(String)} that lets the caller choose
+     * the measurement {@link Unit} of the metric.
+     */
+    public static Metric threadSafeMetric(String name, Unit unit) {
+        return MetricsImpl.threadSafeMetric(name, unit);
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
new file mode 100644
index 000000000..f54e02f79
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
+import com.hazelcast.internal.metrics.ProbeLevel;
+import com.hazelcast.internal.metrics.ProbeUnit;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.BiFunction;
+
+/**
+ * Registry of the user-defined metrics of one execution context. Metrics are created on first
+ * request, looked up by name afterwards, and published through {@link DynamicMetricsProvider}.
+ */
+public class MetricsContext implements DynamicMetricsProvider {
+
+    private static final BiFunction<String, Unit, AbstractMetric> CREATE_SINGLE_WRITER_METRIC =
+        SingleWriterMetric::new;
+    private static final BiFunction<String, Unit, AbstractMetric> CREATE_THREAD_SAFE_METRICS =
+        ThreadSafeMetric::new;
+    private static final BiFunction<String, Unit, AbstractMetric> CREATE_SINGLE_WRITER_QPS_METRIC =
+        SingleWriterQPSMetric::new;
+
+    // Created eagerly: the previous lazy "if (metrics == null)" initialisation was a
+    // check-then-act race that could let two threads install two different maps.
+    private final Map<String, AbstractMetric> metrics = new ConcurrentHashMap<>();
+
+    /** Returns the metric registered under {@code name}, creating a single-writer metric if absent. */
+    Metric metric(String name, Unit unit) {
+        return metric(name, unit, CREATE_SINGLE_WRITER_METRIC);
+    }
+
+    /** Returns the metric registered under {@code name}, creating a single-writer QPS metric if absent. */
+    Metric qpsMetric(String name, Unit unit) {
+        return metric(name, unit, CREATE_SINGLE_WRITER_QPS_METRIC);
+    }
+
+    /** Returns the metric registered under {@code name}, creating a thread-safe metric if absent. */
+    Metric threadSafeMetric(String name, Unit unit) {
+        return metric(name, unit, CREATE_THREAD_SAFE_METRICS);
+    }
+
+    /**
+     * Looks up the metric registered under {@code name}, creating it with {@code metricSupplier}
+     * when none exists yet. An already registered metric is returned as is, whatever kind or unit
+     * was requested.
+     */
+    private Metric metric(String name, Unit unit, BiFunction<String, Unit, AbstractMetric> metricSupplier) {
+        // computeIfAbsent makes lookup-or-create atomic, so concurrent callers asking for the
+        // same name always receive the same instance.
+        return metrics.computeIfAbsent(name, key -> metricSupplier.apply(key, unit));
+    }
+
+    /**
+     * Publishes the current value of every registered metric at {@link ProbeLevel#INFO}.
+     *
+     * @throws SeaTunnelException if a metric yields a value that is neither {@code Long} nor {@code Double}
+     */
+    @Override
+    public void provideDynamicMetrics(MetricDescriptor tagger, MetricsCollectionContext context) {
+        metrics.forEach((name, metric) -> {
+            // Read once: the value may change between reads, so the type test and the
+            // reported value must come from the same read.
+            Object value = metric.get();
+            if (value instanceof Long) {
+                context.collect(tagger.copy(), name, ProbeLevel.INFO, toProbeUnit(metric.unit()),
+                    (Long) value);
+            } else if (value instanceof Double) {
+                context.collect(tagger.copy(), name, ProbeLevel.INFO, toProbeUnit(metric.unit()),
+                    (Double) value);
+            } else {
+                String type = value == null ? "null" : value.getClass().getSimpleName();
+                throw new SeaTunnelException("The value of Metric does not support " + type + " data type");
+            }
+        });
+    }
+
+    /** Maps a {@link Unit} to the {@link ProbeUnit} constant with the same name. */
+    private ProbeUnit toProbeUnit(Unit unit) {
+        return ProbeUnit.valueOf(unit.name());
+    }
+
+    /** Base class holding the immutable name and unit of a metric. */
+    private abstract static class AbstractMetric implements Metric {
+
+        private final String name;
+        private final Unit unit;
+
+        AbstractMetric(String name, Unit unit) {
+            this.name = name;
+            this.unit = unit;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public Unit unit() {
+            return unit;
+        }
+
+        /** Returns the current value; expected to be a {@code Long} or a {@code Double}. */
+        protected abstract Object get();
+
+    }
+
+    /**
+     * Counter reported as a rate: the accumulated value divided by the seconds elapsed since the
+     * first update. Updates are plain read-modify-write operations, so they are only correct when
+     * a single thread writes.
+     */
+    private static final class SingleWriterQPSMetric extends AbstractMetric {
+
+        private static final AtomicLongFieldUpdater<SingleWriterQPSMetric> VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(SingleWriterQPSMetric.class, "value");
+
+        private volatile long value;
+        // Wall-clock time (ms) of the first update; 0 until the metric is touched.
+        private volatile long timestamp;
+
+        SingleWriterQPSMetric(String name, Unit unit) {
+            super(name, unit);
+        }
+
+        @Override
+        public void set(long newValue) {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
+        }
+
+        @Override
+        public void increment() {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
+        }
+
+        @Override
+        public void increment(long increment) {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
+        }
+
+        @Override
+        public void decrement() {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
+        }
+
+        @Override
+        public void decrement(long decrement) {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
+        }
+
+        /**
+         * Returns the rate per second as a {@code Double}. Returns {@code 0.0} when the metric has
+         * never been updated or when no time has elapsed yet, instead of dividing by zero and
+         * reporting Infinity/NaN.
+         */
+        @SuppressWarnings("checkstyle:MagicNumber")
+        @Override
+        protected Object get() {
+            long startedAt = timestamp;
+            if (startedAt == 0) {
+                return 0.0d;
+            }
+            long cost = System.currentTimeMillis() - startedAt;
+            if (cost <= 0) {
+                return 0.0d;
+            }
+            return (double) value * 1000 / cost;
+        }
+
+        /** Records the time of the first update; later updates leave it untouched. */
+        private void checkAndSetStartTime() {
+            if (timestamp == 0) {
+                timestamp = System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Counter whose updates are plain read-modify-write operations published with
+     * {@code lazySet}; only correct when a single thread writes.
+     */
+    private static final class SingleWriterMetric extends AbstractMetric {
+
+        private static final AtomicLongFieldUpdater<SingleWriterMetric> VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(SingleWriterMetric.class, "value");
+
+        private volatile long value;
+
+        SingleWriterMetric(String name, Unit unit) {
+            super(name, unit);
+        }
+
+        @Override
+        public void set(long newValue) {
+            VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
+        }
+
+        @Override
+        public void increment() {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
+        }
+
+        @Override
+        public void increment(long increment) {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
+        }
+
+        @Override
+        public void decrement() {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
+        }
+
+        @Override
+        public void decrement(long decrement) {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
+        }
+
+        @Override
+        protected Object get() {
+            return value;
+        }
+    }
+
+    /** Counter whose updates use atomic operations of the field updater. */
+    private static final class ThreadSafeMetric extends AbstractMetric {
+
+        private static final AtomicLongFieldUpdater<ThreadSafeMetric> VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ThreadSafeMetric.class, "value");
+
+        private volatile long value;
+
+        ThreadSafeMetric(String name, Unit unit) {
+            super(name, unit);
+        }
+
+        @Override
+        public void increment() {
+            VOLATILE_VALUE_UPDATER.incrementAndGet(this);
+        }
+
+        @Override
+        public void increment(long amount) {
+            VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
+        }
+
+        @Override
+        public void decrement() {
+            VOLATILE_VALUE_UPDATER.decrementAndGet(this);
+        }
+
+        @Override
+        public void decrement(long amount) {
+            VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
+        }
+
+        @Override
+        public void set(long newValue) {
+            VOLATILE_VALUE_UPDATER.set(this, newValue);
+        }
+
+        @Override
+        protected Object get() {
+            return VOLATILE_VALUE_UPDATER.get(this);
+        }
+    }
+
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
new file mode 100644
index 000000000..6eea089a2
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+/**
+ * Resolves metrics against the {@link MetricsContext} bound to the calling thread through a
+ * thread-local {@link Container}.
+ */
+public final class MetricsImpl {
+
+    private static final ThreadLocal<Container> CONTEXT = ThreadLocal.withInitial(Container::new);
+
+    private MetricsImpl() {
+    }
+
+    /**
+     * Returns the calling thread's {@link Container}, creating an empty one on first access.
+     */
+    public static Container container() {
+        return CONTEXT.get();
+    }
+
+    /**
+     * Returns the single-writer metric with the given name from the calling thread's context.
+     *
+     * @throws RuntimeException if the calling thread has no metrics context set
+     */
+    public static Metric metric(String name, Unit unit) {
+        return getContext().metric(name, unit);
+    }
+
+    /**
+     * Returns the QPS metric with the given name from the calling thread's context.
+     *
+     * @throws RuntimeException if the calling thread has no metrics context set
+     */
+    public static Metric qpsMetric(String name, Unit unit) {
+        return getContext().qpsMetric(name, unit);
+    }
+
+    /**
+     * Returns the thread-safe metric with the given name from the calling thread's context.
+     *
+     * @throws RuntimeException if the calling thread has no metrics context set
+     */
+    public static Metric threadSafeMetric(String name, Unit unit) {
+        return getContext().threadSafeMetric(name, unit);
+    }
+
+    /** Returns the context bound to the calling thread, failing if none has been set. */
+    private static MetricsContext getContext() {
+        MetricsContext context = CONTEXT.get().getContext();
+        if (context == null) {
+            // The message used to be thrown with a literal, unformatted "%s".
+            throw new RuntimeException(String.format(
+                "Thread %s has no metrics context set, this method can "
+                    + "be called only on threads executing the job's processors",
+                Thread.currentThread()));
+        }
+        return context;
+    }
+
+    /** Mutable per-thread holder of the current {@link MetricsContext}. */
+    public static class Container {
+
+        private MetricsContext context;
+
+        Container() {
+        }
+
+        /** Returns the context held by this container, or {@code null} if none has been set. */
+        public MetricsContext getContext() {
+            return context;
+        }
+
+        /** Replaces the context held by this container. */
+        public void setContext(MetricsContext context) {
+            this.context = context;
+        }
+    }
+
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 56701e88b..cb2443047 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -1,11 +1,12 @@
/*
- * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CleanTaskGroupContextOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CleanTaskGroupContextOperation.java
new file mode 100644
index 000000000..ea4a7e4d7
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CleanTaskGroupContextOperation.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Operation that asks the {@code TaskExecutionService} of the executing member to clean the
+ * context it keeps for one task group.
+ */
+// NOTE(review): no writeInternal/readInternal or no-arg constructor is declared here - confirm
+// that taskGroupLocation survives when this operation is sent to a remote member.
+public class CleanTaskGroupContextOperation extends Operation {
+
+    private TaskGroupLocation taskGroupLocation;
+
+    public CleanTaskGroupContextOperation(TaskGroupLocation taskGroupLocation) {
+        this.taskGroupLocation = taskGroupLocation;
+    }
+
+    /**
+     * Notifies the local task execution service that the context of the task group can be cleaned.
+     */
+    @Override
+    public void run() {
+        SeaTunnelServer server = getService();
+        server.getTaskExecutionService().notifyCleanTaskGroupContext(taskGroupLocation);
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
new file mode 100644
index 000000000..b8ce44401
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import
org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Operation that fetches the metrics of a job from the coordinator service and returns them as
+ * a JSON string.
+ */
+public class GetJobMetricsOperation extends Operation
+    implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+    private long jobId;
+
+    private String response;
+
+    /** No-arg constructor used when the operation is instantiated for deserialization. */
+    public GetJobMetricsOperation() {
+    }
+
+    /**
+     * @param jobId id of the job whose metrics are requested
+     */
+    public GetJobMetricsOperation(long jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public final int getFactoryId() {
+        return OperationDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return OperationDataSerializerHook.GET_JOB_METRICS_OPERATOR;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeLong(jobId);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        jobId = in.readLong();
+    }
+
+    /**
+     * Computes the job metrics on another thread and waits for the result.
+     *
+     * @throws RuntimeException if the lookup fails or the waiting thread is interrupted
+     */
+    @Override
+    public void run() {
+        SeaTunnelServer service = getService();
+        // NOTE(review): the lookup is deliberately kept on a separate thread, presumably so that
+        // the coordinator call does not execute on the operation thread - confirm before inlining.
+        CompletableFuture<String> future = CompletableFuture.supplyAsync(
+            () -> service.getCoordinatorService().getJobMetrics(jobId).toJsonString());
+
+        try {
+            response = future.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so that code further up the stack can observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Returns the metrics JSON produced by {@link #run()}. */
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetTaskGroupMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetTaskGroupMetricsOperation.java
new file mode 100644
index 000000000..181cf5048
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetTaskGroupMetricsOperation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsCollector;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Operation that collects, on the executing member, the metrics of a single task group and
+ * returns them as {@link RawJobMetrics}. Only calls coming from the master address are served.
+ */
+// NOTE(review): no writeInternal/readInternal or no-arg constructor is declared here - confirm
+// that taskGroupLocation survives when this operation is sent to a remote member.
+public class GetTaskGroupMetricsOperation extends Operation {
+
+    private TaskGroupLocation taskGroupLocation;
+    private RawJobMetrics response;
+
+    /**
+     * @param taskGroupLocation task group whose metrics are collected
+     */
+    public GetTaskGroupMetricsOperation(TaskGroupLocation taskGroupLocation) {
+        this.taskGroupLocation = taskGroupLocation;
+    }
+
+    /**
+     * Runs a collection pass over the local metrics registry, keeping the metrics of the
+     * requested task group.
+     *
+     * @throws IllegalStateException if the caller's address is not the master address
+     */
+    @Override
+    public void run() {
+        ILogger logger = getLogger();
+
+        Address callerAddress = getCallerAddress();
+
+        NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
+        Address masterAddress = getNodeEngine().getMasterAddress();
+        if (!callerAddress.equals(masterAddress)) {
+            // A space now separates "metrics" from the task group location in the message.
+            throw new IllegalStateException("Caller " + callerAddress
+                + " cannot get taskGroupLocation metrics " + taskGroupLocation
+                + " because it is not master. Master is: " + masterAddress);
+        }
+
+        JobMetricsCollector metricsRenderer =
+            new JobMetricsCollector(taskGroupLocation, nodeEngine.getLocalMember(), logger);
+        nodeEngine.getMetricsRegistry().collect(metricsRenderer);
+        response = metricsRenderer.getMetrics();
+    }
+
+    /** Returns the metrics gathered by {@link #run()}. */
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
new file mode 100644
index 000000000..923234f5d
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
+import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Client message task that turns a "get job metrics" request, carrying a job id, into a
+ * {@link GetJobMetricsOperation} and encodes the resulting string as the response.
+ */
+public class GetJobMetricsTask extends AbstractSeaTunnelMessageTask<Long, String> {
+
+    protected GetJobMetricsTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(clientMessage, node, connection,
+            SeaTunnelGetJobMetricsCodec::decodeRequest,
+            SeaTunnelGetJobMetricsCodec::encodeResponse);
+    }
+
+    /** Creates the operation for the job id decoded from the request. */
+    @Override
+    protected Operation prepareOperation() {
+        return new GetJobMetricsOperation(parameters);
+    }
+
+    /** Returns the name of the method this task represents. */
+    @Override
+    public String getMethodName() {
+        // Previously returned "getJobStatus", copied from the job status task.
+        return "getJobMetrics";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 7910bcd76..130fe0f36 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.protocol.task;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
@@ -61,5 +62,7 @@ public class SeaTunnelMessageTaskFactoryProvider implements
MessageTaskFactoryPr
(clientMessage, connection) -> new GetJobStateTask(clientMessage,
node, connection));
factories.put(SeaTunnelListJobStatusCodec.REQUEST_MESSAGE_TYPE,
(clientMessage, connection) -> new
ListJobStatusTask(clientMessage, node, connection));
+ factories.put(SeaTunnelGetJobMetricsCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) -> new
GetJobMetricsTask(clientMessage, node, connection));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 42a240b0e..bfa6fef31 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
import
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
@@ -49,6 +50,8 @@ public final class OperationDataSerializerHook implements
DataSerializerHook {
public static final int GET_JOB_STATUS_OPERATOR = 5;
+ public static final int GET_JOB_METRICS_OPERATOR = 6;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
@@ -81,6 +84,8 @@ public final class OperationDataSerializerHook implements
DataSerializerHook {
return new CancelJobOperation();
case GET_JOB_STATUS_OPERATOR:
return new GetJobStatusOperation();
+ case GET_JOB_METRICS_OPERATOR:
+ return new GetJobMetricsOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 9e1280117..d1269266a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -17,8 +17,13 @@
package org.apache.seatunnel.engine.server.task;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+
+import org.apache.seatunnel.api.common.metrics.Unit;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.server.metrics.Metrics;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import java.io.IOException;
@@ -39,6 +44,8 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
+ Metrics.qpsMetric(SOURCE_RECEIVED_QPS, Unit.COUNT).increment();
+ Metrics.metric(SOURCE_RECEIVED_COUNT, Unit.COUNT).increment();
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 69123b334..a735307a9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -27,6 +27,7 @@ import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.STARTING;
import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.WAITING_RESTORE;
+import org.apache.seatunnel.api.common.metrics.MetricTags;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
@@ -49,6 +50,7 @@ import
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlo
import
org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
import
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
@@ -62,6 +64,8 @@ import
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,6 +106,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
private TaskGroup taskBelongGroup;
+ private MetricsContext metricsContext;
+
public SeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow
executionFlow) {
super(jobID, taskID);
this.indexID = indexID;
@@ -116,6 +122,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
flowFutures = new ArrayList<>();
allCycles = new ArrayList<>();
startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
+ metricsContext = new MetricsContext();
for (FlowLifeCycle cycle : allCycles) {
cycle.init();
}
@@ -319,4 +326,17 @@ public abstract class SeaTunnelTask extends AbstractTask {
});
restoreComplete = true;
}
+
+ @Override
+ public MetricsContext getMetricsContext() { // exposes the metrics context owned by this task
+ return metricsContext; // field has no initializer: null until the `new MetricsContext()` assignment has run
+ }
+
+ @Override // forwards dynamic-metric collection to this task's MetricsContext
+ public void provideDynamicMetrics(MetricDescriptor descriptor,
MetricsCollectionContext context) {
+ if (null != metricsContext) { // nothing to report while the context has not been assigned yet
+ metricsContext.provideDynamicMetrics( // works on a copy of the descriptor, tagged with the concrete task class's simple name
+ descriptor.copy().withTag(MetricTags.TASK_NAME,
this.getClass().getSimpleName()), context);
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 3a548105d..7458cd641 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.engine.server.task.flow;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
import static
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
+import org.apache.seatunnel.api.common.metrics.Unit;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
@@ -29,6 +32,7 @@ import
org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.Metrics;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -151,6 +155,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
return;
}
writer.write((T) record.getData());
+ Metrics.qpsMetric(SINK_WRITE_QPS, Unit.COUNT).increment();
+ Metrics.metric(SINK_WRITE_COUNT, Unit.COUNT).increment();
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
new file mode 100644
index 000000000..f9130e747
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import com.hazelcast.internal.serialization.Data;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+@DisabledOnOs(OS.WINDOWS)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class JobMetricsTest extends AbstractSeaTunnelServerTest {
+ // Checks CoordinatorService#getJobMetrics both while a job is still reporting and after it has finished.
+ private static final Long JOB_1 = 1L;
+ private static final Long JOB_2 = 2L;
+ private static final Long JOB_3 = 3L; // NOTE(review): never referenced in this class
+
+ @Test
+ public void testGetJobMetrics() throws Exception {
+ startJob(JOB_1, "fake_to_console_job_metrics.conf");
+ startJob(JOB_2, "fake_to_console_job_metrics.conf"); // a second job is submitted, but only JOB_1's metrics are asserted below
+ // Poll for up to 60 s: once JOB_1 reports a sink-write sample, both counters must already be positive.
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ JobMetrics jobMetrics =
server.getCoordinatorService().getJobMetrics(JOB_1);
+ if (jobMetrics.get(SINK_WRITE_COUNT).size() > 0) {
+ assertTrue((Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value() > 0);
+ assertTrue((Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value() > 0);
+ }
+ else {
+ fail(); // no sample yet: fail this attempt so that await() retries until the timeout
+ }
+ });
+
+ // Wait (up to 60 s) until the job history lists JOB_1 with status FINISHED.
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+
server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"FINISHED\"}",
JOB_1))));
+ // Final values: both counters must be exactly 30 (presumably row.num = 30 in the job config - TODO confirm); both QPS values must be positive.
+ JobMetrics jobMetrics =
server.getCoordinatorService().getJobMetrics(JOB_1);
+ assertEquals(30, (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
+ assertEquals(30, (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
+ assertTrue((Double) jobMetrics.get(SOURCE_RECEIVED_QPS).get(0).value()
> 0);
+ assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0);
+ }
+
+ private void startJob(Long jobid, String path){ // builds a logical DAG from the config at `path` and submits it under `jobid`
+ LogicalDag testLogicalDag =
+ TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
+
+ JobImmutableInformation jobImmutableInformation = new
JobImmutableInformation(jobid,
+ nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
+ Collections.emptyList());
+
+ Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+ // Block until the future returned by submitJob completes before returning to the test.
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ server.getCoordinatorService().submitJob(jobid, data);
+ voidPassiveCompletableFuture.join();
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
new file mode 100644
index 000000000..7edeb4ef7
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 1
+ split.num = 3
+ row.num = 30
+ split.read-interval=120
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name="fake"
+ }
+}
\ No newline at end of file