This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8266544d03 [Improve][Zeta] Add an interface for batch retrieval of
JobMetrics (#4576)
8266544d03 is described below
commit 8266544d035b6c8f1550ac91393a86fec88293eb
Author: ic4y <[email protected]>
AuthorDate: Tue Jun 20 11:03:06 2023 +0800
[Improve][Zeta] Add an interface for batch retrieval of JobMetrics (#4576)
---
.../starter/seatunnel/args/ClientCommandArgs.java | 5 ++
.../seatunnel/command/ClientExecuteCommand.java | 3 +
.../seatunnel/engine/client/job/JobClient.java | 7 ++
.../engine/client/SeaTunnelClientTest.java | 54 ++++++++++++
.../src/test/resources/batch_fake_to_console.conf | 48 +++++++++++
.../codec/SeaTunnelGetRunningJobMetricsCodec.java | 84 +++++++++++++++++++
.../SeaTunnelEngine.yaml | 16 ++++
.../engine/server/CoordinatorService.java | 78 +++++++++++++++++
.../engine/server/metrics/JobMetricsUtil.java | 80 ++++++++++++++++++
.../server/metrics/ZetaMetricsCollector.java | 76 +++++++++++++++++
.../server/operation/GetJobMetricsOperation.java | 9 +-
...ion.java => GetRunningJobMetricsOperation.java} | 22 ++---
.../server/protocol/task/GetJobMetricsTask.java | 2 +-
...ricsTask.java => GetRunningJobMetricsTask.java} | 17 ++--
.../task/SeaTunnelMessageTaskFactoryProvider.java | 5 ++
.../ClientToServerOperationDataSerializerHook.java | 5 ++
.../serializable/TaskDataSerializerHook.java | 5 ++
.../server/task/operation/GetMetricsOperation.java | 97 ++++++++++++++++++++++
18 files changed, 587 insertions(+), 26 deletions(-)
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 840266bf45..9504c9cb2c 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
description = "Get job metrics by JobId")
private String metricsJobId;
+ @Parameter(
+ names = {"--get_running_job_metrics"},
+ description = "Gets metrics for running jobs")
+ private boolean getRunningJobMetrics = false;
+
@Parameter(
names = {"-l", "--list"},
description = "list job status")
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 50112cb2a6..14b00540f2 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -97,6 +97,9 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
if (clientCommandArgs.isListJob()) {
String jobStatus =
engineClient.getJobClient().listJobStatus(true);
System.out.println(jobStatus);
+ } else if (clientCommandArgs.isGetRunningJobMetrics()) {
+ String runningJobMetrics =
engineClient.getJobClient().getRunningJobMetrics();
+ System.out.println(runningJobMetrics);
} else if (null != clientCommandArgs.getJobId()) {
String jobState =
engineClient
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 8405f409ee..c4a4f68a69 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -35,6 +35,7 @@ import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStat
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
@@ -117,6 +118,12 @@ public class JobClient {
SeaTunnelGetJobMetricsCodec::decodeResponse);
}
+ public String getRunningJobMetrics() {
+ return hazelcastClient.requestOnMasterAndDecodeResponse(
+ SeaTunnelGetRunningJobMetricsCodec.encodeRequest(),
+ SeaTunnelGetRunningJobMetricsCodec::decodeResponse);
+ }
+
public void savePointJob(Long jobId) {
PassiveCompletableFuture<Void> cancelFuture =
hazelcastClient.requestOnMasterAndGetCompletableFuture(
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 369120673a..85aec59c27 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -202,6 +202,8 @@ public class SeaTunnelClientTest {
String jobMetrics = jobClient.getJobMetrics(jobId);
+ System.out.println(jobMetrics);
+
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT));
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT));
@@ -214,6 +216,58 @@ public class SeaTunnelClientTest {
}
}
+ @Test
+ public void testGetRunningJobMetrics() throws ExecutionException,
InterruptedException {
+ Common.setDeployMode(DeployMode.CLUSTER);
+ String filePath = TestUtils.getResource("/batch_fake_to_console.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("fake_to_console1");
+
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
+
+ ClientJobProxy execute1 =
+ seaTunnelClient.createExecutionContext(filePath,
jobConfig).execute();
+ long jobId1 = execute1.getJobId();
+
+ execute1.waitForJobComplete();
+
+ filePath = TestUtils.getResource("streaming_fake_to_console.conf");
+ jobConfig = new JobConfig();
+ jobConfig.setName("fake_to_console2");
+ ClientJobProxy execute2 =
+ seaTunnelClient.createExecutionContext(filePath,
jobConfig).execute();
+ ClientJobProxy execute3 =
+ seaTunnelClient.createExecutionContext(filePath,
jobConfig).execute();
+
+ long jobId2 = execute2.getJobId();
+ long jobId3 = execute3.getJobId();
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+
jobClient.getJobStatus(jobId1).equals("FINISHED")
+ &&
jobClient.getJobStatus(jobId2).equals("RUNNING")
+ && jobClient
+ .getJobStatus(jobId3)
+ .equals("RUNNING")));
+
+ System.out.println(jobClient.getRunningJobMetrics());
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ String runningJobMetrics =
jobClient.getRunningJobMetrics();
+ Assertions.assertTrue(
+ runningJobMetrics.contains(jobId2 + "")
+ &&
runningJobMetrics.contains(jobId3 + ""));
+ });
+
+ jobClient.cancelJob(jobId2);
+ jobClient.cancelJob(jobId3);
+ }
+
@Test
public void testCancelJob() throws ExecutionException,
InterruptedException {
Common.setDeployMode(DeployMode.CLIENT);
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf
new file mode 100644
index 0000000000..e8d74830fa
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name="fake"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java
new file mode 100644
index 0000000000..44b0f89222
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+
+import static
com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static
com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static
com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static
com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+/** */
+@Generated("2a54110c40297eed90df5f79bde1171d")
+public final class SeaTunnelGetRunningJobMetricsCodec {
+ // hex: 0xDE0C00
+ public static final int REQUEST_MESSAGE_TYPE = 14552064;
+ // hex: 0xDE0C01
+ public static final int RESPONSE_MESSAGE_TYPE = 14552065;
+ private static final int REQUEST_INITIAL_FRAME_SIZE =
+ PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE =
+ RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+ private SeaTunnelGetRunningJobMetricsCodec() {}
+
+ public static ClientMessage encodeRequest() {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ clientMessage.setRetryable(true);
+ clientMessage.setOperationName("SeaTunnel.GetRunningJobMetrics");
+ ClientMessage.Frame initialFrame =
+ new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE],
UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET,
REQUEST_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ clientMessage.add(initialFrame);
+ return clientMessage;
+ }
+
+ public static ClientMessage encodeResponse(java.lang.String response) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ ClientMessage.Frame initialFrame =
+ new ClientMessage.Frame(
+ new byte[RESPONSE_INITIAL_FRAME_SIZE],
UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET,
RESPONSE_MESSAGE_TYPE);
+ clientMessage.add(initialFrame);
+
+ StringCodec.encode(clientMessage, response);
+ return clientMessage;
+ }
+
+ /** */
+ public static java.lang.String decodeResponse(ClientMessage clientMessage)
{
+ ClientMessage.ForwardFrameIterator iterator =
clientMessage.frameIterator();
+ // empty initial frame
+ iterator.next();
+ return StringCodec.decode(iterator);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 78317d8250..f14f86d389 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -217,6 +217,22 @@ methods:
retryable: true
partitionIdentifier: -1
params: []
+ response:
+ params:
+ - name: response
+ type: String
+ nullable: false
+ since: 2.0
+ doc: ''
+
+ - id: 12
+ name: getRunningJobMetrics
+ since: 2.0
+ doc: ''
+ request:
+ retryable: true
+ partitionIdentifier: -1
+ params: [ ]
response:
params:
- name: response
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index bf913a4245..d2931d0c37 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
@@ -45,9 +47,12 @@ import
org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.cluster.Address;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
@@ -55,9 +60,12 @@ import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -68,6 +76,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
+import static
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;
+
public class CoordinatorService {
private final NodeEngineImpl nodeEngine;
private final ILogger logger;
@@ -530,6 +541,73 @@ public class CoordinatorService {
return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) :
jobMetrics;
}
+ public Map<Long, JobMetrics> getRunningJobMetrics() {
+ final Set<Long> runningJobIds = runningJobMasterMap.keySet();
+
+ Set<Address> addresses = new HashSet<>();
+ ownedSlotProfilesIMap.forEach(
+ (pipelineLocation, ownedSlotProfilesIMap) -> {
+ if (runningJobIds.contains(pipelineLocation.getJobId())) {
+ ownedSlotProfilesIMap
+ .values()
+ .forEach(
+ ownedSlotProfile -> {
+
addresses.add(ownedSlotProfile.getWorker());
+ });
+ }
+ });
+
+ List<RawJobMetrics> metrics = new ArrayList<>();
+
+ addresses.forEach(
+ address -> {
+ try {
+ if (nodeEngine.getClusterService().getMember(address)
!= null) {
+ RawJobMetrics rawJobMetrics =
+ (RawJobMetrics)
+
NodeEngineUtil.sendOperationToMemberNode(
+ nodeEngine,
+ new
GetMetricsOperation(
+ dis ->
+
(dis.tagValue(JOB_ID)
+
!= null
+
&& runningJobIds
+
.contains(
+
Long
+
.parseLong(
+
dis
+
.tagValue(
+
JOB_ID))))),
+ address)
+ .get();
+ metrics.add(rawJobMetrics);
+ }
+ }
+ // HazelcastInstanceNotActiveException. It means that the
node is
+ // offline, so waiting for the taskGroup to restore can be
successful
+ catch (HazelcastInstanceNotActiveException e) {
+ logger.warning(
+ String.format(
+ "get metrics with exception: %s.",
+ ExceptionUtils.getMessage(e)));
+ } catch (Exception e) {
+ throw new SeaTunnelException(e.getMessage());
+ }
+ });
+
+ Map<Long, JobMetrics> longJobMetricsMap = toJobMetricsMap(metrics);
+
+ longJobMetricsMap.forEach(
+ (jobId, jobMetrics) -> {
+ JobMetrics jobMetricsImap =
jobHistoryService.getJobMetrics(jobId);
+ if (jobMetricsImap != null) {
+ longJobMetricsMap.put(jobId,
jobMetricsImap.merge(jobMetrics));
+ }
+ });
+
+ return longJobMetricsMap;
+ }
+
public JobDAGInfo getJobInfo(long jobId) {
JobDAGInfo jobInfo = jobHistoryService.getJobDAGInfo(jobId);
if (jobInfo != null) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
index 722e6a5ddb..057b0dbeb5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.engine.server.metrics;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.SerializationFeature;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.Measurement;
import org.apache.seatunnel.api.common.metrics.MetricTags;
@@ -35,10 +40,13 @@ import java.util.Map;
import java.util.function.UnaryOperator;
import static org.apache.seatunnel.api.common.metrics.MetricTags.ADDRESS;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
import static org.apache.seatunnel.api.common.metrics.MetricTags.MEMBER;
public final class JobMetricsUtil {
+ private static ObjectMapper OBJECTMAPPER = new ObjectMapper();
+
private JobMetricsUtil() {}
public static String
getTaskGroupLocationFromMetricsDescriptor(MetricDescriptor descriptor) {
@@ -68,6 +76,78 @@ public final class JobMetricsUtil {
return JobMetrics.of(consumer.metrics);
}
+ public static String toJsonString(Object o) {
+ OBJECTMAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,
false);
+ try {
+ return
OBJECTMAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
+ } catch (JsonProcessingException e) {
+ ObjectNode objectNode = OBJECTMAPPER.createObjectNode();
+ objectNode.put("err", "serialize JobMetrics err");
+ return objectNode.toString();
+ }
+ }
+
+ public static Map<Long, JobMetrics> toJobMetricsMap(List<RawJobMetrics>
rawJobMetrics) {
+ metricsConsumer consumer = new metricsConsumer();
+ for (RawJobMetrics metrics : rawJobMetrics) {
+ if (metrics.getBlob() == null) {
+ continue;
+ }
+ consumer.timestamp = metrics.getTimestamp();
+ MetricsCompressor.extractMetrics(metrics.getBlob(), consumer);
+ }
+
+ Map<Long, JobMetrics> jobMetricsMap =
MapUtil.createHashMap(consumer.metrics.size());
+ consumer.metrics.forEach(
+ (jobId, metrics) -> {
+ jobMetricsMap.put(jobId, JobMetrics.of(metrics));
+ });
+
+ return jobMetricsMap;
+ }
+
+ private static class metricsConsumer implements MetricConsumer {
+
+ final Map<Long, Map<String, List<Measurement>>> metrics = new
HashMap<>();
+ long timestamp;
+
+ @Override
+ public void consumeLong(MetricDescriptor descriptor, long value) {
+
+ String jobId = descriptor.tagValue(JOB_ID);
+ if (jobId == null) {
+ return;
+ }
+ long jobIdLong = Long.parseLong(jobId);
+ metrics.computeIfAbsent(jobIdLong, k -> new HashMap<>())
+ .computeIfAbsent(descriptor.metric(), k -> new
ArrayList<>())
+ .add(measurement(descriptor, value));
+ }
+
+ @Override
+ public void consumeDouble(MetricDescriptor descriptor, double value) {
+ String jobId = descriptor.tagValue(JOB_ID);
+ if (jobId == null) {
+ return;
+ }
+ long jobIdLong = Long.parseLong(jobId);
+ metrics.computeIfAbsent(jobIdLong, k -> new HashMap<>())
+ .computeIfAbsent(descriptor.metric(), k -> new
ArrayList<>())
+ .add(measurement(descriptor, value));
+ }
+
+ private Measurement measurement(MetricDescriptor descriptor, Object
value) {
+ Map<String, String> tags =
MapUtil.createHashMap(descriptor.tagCount());
+ for (int i = 0; i < descriptor.tagCount(); i++) {
+ tags.put(descriptor.tag(i), descriptor.tagValue(i));
+ }
+ if (descriptor.discriminator() != null ||
descriptor.discriminatorValue() != null) {
+ tags.put(descriptor.discriminator(),
descriptor.discriminatorValue());
+ }
+ return Measurement.of(descriptor.metric(), value, timestamp, tags);
+ }
+ }
+
private static class JobMetricsConsumer implements MetricConsumer {
final Map<String, List<Measurement>> metrics = new HashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java
new file mode 100644
index 0000000000..8a4f5401d1
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+
+import com.hazelcast.cluster.Member;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.collectors.MetricsCollector;
+import com.hazelcast.internal.metrics.impl.MetricsCompressor;
+import com.hazelcast.logging.ILogger;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+
+public class ZetaMetricsCollector implements MetricsCollector {
+
+ private final Predicate<MetricDescriptor> metricDescriptorPredicate;
+ private final MetricsCompressor compressor;
+ private final ILogger logger;
+ private final UnaryOperator<MetricDescriptor> addPrefixFn;
+
+ public ZetaMetricsCollector(
+ Predicate<MetricDescriptor> metricDescriptorPredicate, Member
member, ILogger logger) {
+ Objects.requireNonNull(member, "member");
+ this.logger = Objects.requireNonNull(logger, "logger");
+
+ this.metricDescriptorPredicate = metricDescriptorPredicate;
+ this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member);
+ this.compressor = new MetricsCompressor();
+ }
+
+ @Override
+ public void collectLong(MetricDescriptor descriptor, long value) {
+ if (metricDescriptorPredicate.test(descriptor)) {
+ compressor.addLong(addPrefixFn.apply(descriptor), value);
+ }
+ }
+
+ @Override
+ public void collectDouble(MetricDescriptor descriptor, double value) {
+ if (metricDescriptorPredicate.test(descriptor)) {
+ compressor.addDouble(addPrefixFn.apply(descriptor), value);
+ }
+ }
+
+ @Override
+ public void collectException(MetricDescriptor descriptor, Exception e) {
+ if (metricDescriptorPredicate.test(descriptor)) {
+ logger.warning("Exception when rendering job metrics: " + e, e);
+ }
+ }
+
+ @Override
+ public void collectNoValue(MetricDescriptor descriptor) {}
+
+ public RawJobMetrics getMetrics() {
+ return RawJobMetrics.of(compressor.getBlobAndReset());
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
index 965816cd13..0668647381 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import static
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
+
public class GetJobMetricsOperation extends Operation
implements IdentifiedDataSerializable, AllowedDuringPassiveState {
private long jobId;
@@ -70,9 +72,10 @@ public class GetJobMetricsOperation extends Operation
CompletableFuture<String> future =
CompletableFuture.supplyAsync(
() -> {
- return service.getCoordinatorService()
- .getJobMetrics(jobId)
- .toJsonString();
+ return toJsonString(
+ service.getCoordinatorService()
+ .getJobMetrics(jobId)
+ .getMetrics());
},
getNodeEngine()
.getExecutionService()
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
similarity index 80%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
index 965816cd13..9277c324f3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
@@ -30,17 +30,14 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-public class GetJobMetricsOperation extends Operation
+import static
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
+
+public class GetRunningJobMetricsOperation extends Operation
implements IdentifiedDataSerializable, AllowedDuringPassiveState {
- private long jobId;
private String response;
- public GetJobMetricsOperation() {}
-
- public GetJobMetricsOperation(long jobId) {
- this.jobId = jobId;
- }
+ public GetRunningJobMetricsOperation() {}
@Override
public final int getFactoryId() {
@@ -49,19 +46,17 @@ public class GetJobMetricsOperation extends Operation
@Override
public int getClassId() {
- return
ClientToServerOperationDataSerializerHook.GET_JOB_METRICS_OPERATOR;
+ return
ClientToServerOperationDataSerializerHook.GET_RUNNING_JOB_METRICS_OPERATOR;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- out.writeLong(jobId);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- jobId = in.readLong();
}
@Override
@@ -70,13 +65,12 @@ public class GetJobMetricsOperation extends Operation
CompletableFuture<String> future =
CompletableFuture.supplyAsync(
() -> {
- return service.getCoordinatorService()
- .getJobMetrics(jobId)
- .toJsonString();
+ return toJsonString(
+
service.getCoordinatorService().getRunningJobMetrics());
},
getNodeEngine()
.getExecutionService()
- .getExecutor("get_job_metrics_operation"));
+
.getExecutor("get_running_job_metrics_operation"));
try {
response = future.get();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
index d083a6a3aa..ea3e834f66 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
@@ -43,7 +43,7 @@ public class GetJobMetricsTask extends
AbstractSeaTunnelMessageTask<Long, String
@Override
public String getMethodName() {
- return "getJobStatus";
+ return "getJobMetrics";
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
similarity index 74%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
index d083a6a3aa..2c13e253eb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
@@ -17,33 +17,34 @@
package org.apache.seatunnel.engine.server.protocol.task;
-import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
-import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
+import
org.apache.seatunnel.engine.server.operation.GetRunningJobMetricsOperation;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.spi.impl.operationservice.Operation;
-public class GetJobMetricsTask extends AbstractSeaTunnelMessageTask<Long,
String> {
+public class GetRunningJobMetricsTask extends
AbstractSeaTunnelMessageTask<Void, String> {
- protected GetJobMetricsTask(ClientMessage clientMessage, Node node,
Connection connection) {
+ protected GetRunningJobMetricsTask(
+ ClientMessage clientMessage, Node node, Connection connection) {
super(
clientMessage,
node,
connection,
- SeaTunnelGetJobMetricsCodec::decodeRequest,
- SeaTunnelGetJobMetricsCodec::encodeResponse);
+ m -> null,
+ SeaTunnelGetRunningJobMetricsCodec::encodeResponse);
}
@Override
protected Operation prepareOperation() {
- return new GetJobMetricsOperation(parameters);
+ return new GetRunningJobMetricsOperation();
}
@Override
public String getMethodName() {
- return "getJobStatus";
+ return "getRunningJobMetrics";
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 6d19ada03f..f3c88b9ebc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStat
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
@@ -92,5 +93,9 @@ public class SeaTunnelMessageTaskFactoryProvider implements
MessageTaskFactoryPr
SeaTunnelGetClusterHealthMetricsCodec.REQUEST_MESSAGE_TYPE,
(clientMessage, connection) ->
new GetClusterHealthMetricsTask(clientMessage, node,
connection));
+ factories.put(
+ SeaTunnelGetRunningJobMetricsCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) ->
+ new GetRunningJobMetricsTask(clientMessage, node,
connection));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index 4cfef1cd59..188e4fe065 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.engine.server.operation.GetJobDetailStatusOperation;
import org.apache.seatunnel.engine.server.operation.GetJobInfoOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
+import
org.apache.seatunnel.engine.server.operation.GetRunningJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
@@ -60,6 +61,8 @@ public final class ClientToServerOperationDataSerializerHook
implements DataSeri
public static final int GET_CLUSTER_HEALTH_METRICS = 9;
+ public static final int GET_RUNNING_JOB_METRICS_OPERATOR = 10;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -100,6 +103,8 @@ public final class
ClientToServerOperationDataSerializerHook implements DataSeri
return new SavePointJobOperation();
case GET_CLUSTER_HEALTH_METRICS:
return new GetClusterHealthMetricsOperation();
+ case GET_RUNNING_JOB_METRICS_OPERATOR:
+ return new GetRunningJobMetricsOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 90d543cee3..c5b9baeb0d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -25,6 +25,7 @@ import
org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import
org.apache.seatunnel.engine.server.task.operation.CheckTaskGroupIsExecutingOperation;
import
org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
import
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
@@ -89,6 +90,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
public static final int CHECK_TASKGROUP_IS_EXECUTING = 21;
+ public static final int GET_METRICS_OPERATION = 22;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
@@ -151,6 +154,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
return new SourceReaderEventOperation();
case CHECK_TASKGROUP_IS_EXECUTING:
return new CheckTaskGroupIsExecutingOperation();
+ case GET_METRICS_OPERATION:
+ return new GetMetricsOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
new file mode 100644
index 0000000000..15003a641d
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.engine.server.metrics.ZetaMetricsCollector;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+public class GetMetricsOperation extends Operation implements
IdentifiedDataSerializable {
+
+ private Predicate<MetricDescriptor> metricDescriptorPredicate;
+ private RawJobMetrics response;
+
+ public GetMetricsOperation() {}
+
+ public GetMetricsOperation(Predicate<MetricDescriptor>
metricDescriptorPredicate) {
+ this.metricDescriptorPredicate = metricDescriptorPredicate;
+ }
+
+ @Override
+ public void run() {
+ ILogger logger = getLogger();
+
+ Address callerAddress = getCallerAddress();
+
+ NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
+ Address masterAddress = getNodeEngine().getMasterAddress();
+ if (!callerAddress.equals(masterAddress)) {
+ throw new IllegalStateException(
+ "Caller "
+ + callerAddress
+ + " cannot get metrics"
+ + " because it is not master. Master is: "
+ + masterAddress);
+ }
+
+ ZetaMetricsCollector metricsRenderer =
+ new ZetaMetricsCollector(
+ metricDescriptorPredicate,
nodeEngine.getLocalMember(), logger);
+ nodeEngine.getMetricsRegistry().collect(metricsRenderer);
+ response = metricsRenderer.getMetrics();
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeObject(metricDescriptorPredicate);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ this.metricDescriptorPredicate = in.readObject();
+ }
+
+ @Override
+ public Object getResponse() {
+ return response;
+ }
+
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.GET_METRICS_OPERATION;
+ }
+}