This is an automated email from the ASF dual-hosted git repository.
corgy-w pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new af9fe5be32 [Improve] [Zeta] Add observability metrics for engine state
stores (#10860)
af9fe5be32 is described below
commit af9fe5be3247e9fde6a2afaa6418b8790a695611
Author: JeremyXin <[email protected]>
AuthorDate: Sun May 17 22:41:29 2026 +0800
[Improve] [Zeta] Add observability metrics for engine state stores (#10860)
---
docs/en/engines/zeta/telemetry.md | 19 +++
docs/zh/engines/zeta/telemetry.md | 18 +++
.../metrics/ExportsInstanceInitializer.java | 3 +
.../exports/EngineStateStoreMetricExports.java | 110 +++++++++++++++
.../engine/server/metrics/MetricsApiTest.java | 3 +-
.../exports/EngineStateStoreMetricExportsTest.java | 153 +++++++++++++++++++++
6 files changed, 305 insertions(+), 1 deletion(-)
diff --git a/docs/en/engines/zeta/telemetry.md
b/docs/en/engines/zeta/telemetry.md
index 7b2f80678e..99a87735f7 100644
--- a/docs/en/engines/zeta/telemetry.md
+++ b/docs/en/engines/zeta/telemetry.md
@@ -49,6 +49,25 @@ Note: All metrics both have the same labelName `cluster`,
that's value is the co
| hazelcast_partition_isClusterSafe | Gauge | -
| Whether is cluster safe of partition
|
| hazelcast_partition_isLocalMemberSafe | Gauge | -
| Whether is local member safe of partition
|
+### Engine State Store Metrics
+
+These metrics expose the basic size and local resource usage of Zeta engine
state stores. The current backend is
+Hazelcast IMap, so the `backend` label value is `hazelcast`. Local metrics are
emitted by each node with the
+`address` label. To monitor the total entry count of an engine state store,
aggregate
+`engine_state_store_local_owned_entries` in Prometheus.
+
+| MetricName | Type | Labels
| DESCRIPTION
|
+|-------------------------------------------------|-------|-----------------------------------------------------------------------------|-----------------------------------------------------------------------------|
+| engine_state_store_local_owned_entries | Gauge | **address**,
server instance address. **store**, state store name. **backend**, state store
backend. | Local owned entries of an engine state store on this node.
|
+| engine_state_store_local_backup_entries | Gauge | **address**,
server instance address. **store**, state store name. **backend**, state store
backend. | Local backup entries of an engine state store on this node.
|
+| engine_state_store_local_heap_cost_bytes | Gauge | **address**,
server instance address. **store**, state store name. **backend**, state store
backend. | Local heap cost in bytes of an engine state store on this node, when the backend exposes it.
|
+
+Example PromQL:
+
+```promql
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+```
+
### Thread Pool Status
| MetricName | Type | Labels
| DESCRIPTION
|
diff --git a/docs/zh/engines/zeta/telemetry.md
b/docs/zh/engines/zeta/telemetry.md
index 5731a2315d..00d679845f 100644
--- a/docs/zh/engines/zeta/telemetry.md
+++ b/docs/zh/engines/zeta/telemetry.md
@@ -50,6 +50,24 @@ OpenMetrics 的指标文本可通过
`http://{instanceHost}:5801/hazelcast/rest/
| hazelcast_partition_isClusterSafe | Gauge | -
| 分区是否安全 |
| hazelcast_partition_isLocalMemberSafe | Gauge | -
| 本地成员是否安全 |
+### 引擎状态存储指标
+
+这些指标暴露 Zeta 引擎状态存储的基础大小和本地资源使用情况。当前后端是 Hazelcast IMap,因此 `backend`
+标签值为 `hazelcast`。本地指标由每个节点输出,并包含 `address` 标签。如需监控某个引擎状态存储的全局
+entry 总量,请在 Prometheus 中聚合 `engine_state_store_local_owned_entries`。
+
+| MetricName | Type | Labels
| 描述
|
+|-------------------------------------------------|-------|-----------------------------------------------------------|-----------------------------------------|
+| engine_state_store_local_owned_entries | Gauge |
**address**,服务器实例地址。**store**,状态存储名称。**backend**,状态存储后端。 | 当前节点上该引擎状态存储的本地
owned entry 数。 |
+| engine_state_store_local_backup_entries | Gauge |
**address**,服务器实例地址。**store**,状态存储名称。**backend**,状态存储后端。 | 当前节点上该引擎状态存储的本地
backup entry 数。 |
+| engine_state_store_local_heap_cost_bytes | Gauge |
**address**,服务器实例地址。**store**,状态存储名称。**backend**,状态存储后端。 |
后端支持时,当前节点上该引擎状态存储的本地堆内存成本,单位为字节。 |
+
+PromQL 示例:
+
+```promql
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+```
+
### 线程池状态
| MetricName | Type | Labels
| 描述 |
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
index f737fbf6c2..8d429536a2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.telemetry.metrics;
import
org.apache.seatunnel.engine.server.telemetry.metrics.exports.ClusterMetricExports;
+import
org.apache.seatunnel.engine.server.telemetry.metrics.exports.EngineStateStoreMetricExports;
import
org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobMetricExports;
import
org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobThreadPoolStatusExports;
import
org.apache.seatunnel.engine.server.telemetry.metrics.exports.NodeMetricExports;
@@ -45,6 +46,8 @@ public final class ExportsInstanceInitializer {
new JobThreadPoolStatusExports(node).register(collectorRegistry);
// Node metrics
new NodeMetricExports(node).register(collectorRegistry);
+ // Engine state store metrics
+ new
EngineStateStoreMetricExports(node).register(collectorRegistry);
// Cluster metrics
new ClusterMetricExports(node).register(collectorRegistry);
initialized = true;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java
new file mode 100644
index 0000000000..c8a8697039
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.telemetry.metrics.AbstractCollector;
+
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.LocalMapStats;
+import io.prometheus.client.GaugeMetricFamily;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class EngineStateStoreMetricExports extends AbstractCollector {
+ // Prometheus collector: exports this node's local map statistics for each engine state store.
+ private static final String BACKEND = "hazelcast"; // value of the backend label on every sample
+ // Names of the maps treated as engine state stores; collect() visits each of them once per call.
+ private static final List<String> ENGINE_STATE_STORES =
+ Arrays.asList(
+ Constant.IMAP_RUNNING_JOB_INFO,
+ Constant.IMAP_RUNNING_JOB_STATE,
+ Constant.IMAP_STATE_TIMESTAMPS,
+ Constant.IMAP_OWNED_SLOT_PROFILES,
+ Constant.IMAP_RUNNING_JOB_METRICS,
+ Constant.IMAP_FINISHED_JOB_STATE,
+ Constant.IMAP_FINISHED_JOB_METRICS,
+ Constant.IMAP_FINISHED_JOB_VERTEX_INFO,
+ Constant.IMAP_CHECKPOINT_MONITOR,
+ Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS,
+ Constant.IMAP_CHECKPOINT_ID,
+ Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+
+ public EngineStateStoreMetricExports(Node node) {
+ super(node);
+ }
+ /** Builds the owned-entry, backup-entry and heap-cost gauge families and fills them store by store. */
+ @Override
+ public List<MetricFamilySamples> collect() {
+ List<String> localLabelNames = clusterLabelNames(ADDRESS, "store",
"backend");
+ GaugeMetricFamily localOwnedEntries =
+ new GaugeMetricFamily(
+ "engine_state_store_local_owned_entries",
+ "Local owned entries of an engine state store on this
node",
+ localLabelNames);
+ GaugeMetricFamily localBackupEntries =
+ new GaugeMetricFamily(
+ "engine_state_store_local_backup_entries",
+ "Local backup entries of an engine state store on this
node",
+ localLabelNames);
+ GaugeMetricFamily localHeapCostBytes =
+ new GaugeMetricFamily(
+ "engine_state_store_local_heap_cost_bytes",
+ "Local heap cost in bytes of an engine state store on
this node",
+ localLabelNames);
+ // Exceptions raised for one store are caught inside collectStoreMetrics, so the loop continues with the next store.
+ for (String storeName : ENGINE_STATE_STORES) {
+ collectStoreMetrics(
+ storeName, localOwnedEntries, localBackupEntries,
localHeapCostBytes);
+ }
+
+ return Arrays.asList(localOwnedEntries, localBackupEntries,
localHeapCostBytes);
+ }
+ /** Adds this node's LocalMapStats values for storeName to each family; exceptions are logged (fine if Hazelcast is not active, otherwise warning) and not rethrown. */
+ private void collectStoreMetrics(
+ String storeName,
+ GaugeMetricFamily localOwnedEntries,
+ GaugeMetricFamily localBackupEntries,
+ GaugeMetricFamily localHeapCostBytes) {
+ try {
+ IMap<?, ?> map = getNode().hazelcastInstance.getMap(storeName);
+ LocalMapStats localMapStats = map.getLocalMapStats();
+ List<String> localLabelValues = labelValues(localAddress(),
storeName, BACKEND);
+ localOwnedEntries.addMetric(localLabelValues,
localMapStats.getOwnedEntryCount());
+ localBackupEntries.addMetric(localLabelValues,
localMapStats.getBackupEntryCount());
+ localHeapCostBytes.addMetric(localLabelValues,
localMapStats.getHeapCost());
+ } catch (HazelcastInstanceNotActiveException e) {
+ getLogger(getClass())
+ .fine(
+ String.format(
+ "Skip state store metrics because
Hazelcast is not active, store=%s, backend=%s",
+ storeName, BACKEND),
+ e);
+ } catch (Exception e) {
+ getLogger(getClass())
+ .warning(
+ String.format(
+ "Failed to collect state store metrics,
store=%s, backend=%s",
+ storeName, BACKEND),
+ e);
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
index 27a9743cfb..dcfe8989ce 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
@@ -54,7 +54,8 @@ public class MetricsApiTest {
given().get("http://localhost:8080" + RestConstant.REST_URL_METRICS)
.then()
.statusCode(200)
- .body(containsString("process_start_time_seconds"));
+ .body(containsString("process_start_time_seconds"))
+
.body(containsString("engine_state_store_local_owned_entries"));
}
@AfterAll
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExportsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExportsTest.java
new file mode 100644
index 0000000000..71c070031a
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExportsTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.awaitility.Awaitility.await;
+
+@DisabledOnOs(OS.WINDOWS)
+class EngineStateStoreMetricExportsTest {
+ // Each test creates a Hazelcast instance and inspects the samples returned by EngineStateStoreMetricExports.collect().
+ private HazelcastInstanceImpl instance;
+ // Shuts down the instance created by the test, if one was assigned.
+ @AfterEach
+ void afterEach() {
+ if (instance != null) {
+ instance.shutdown();
+ }
+ }
+ // Puts one entry into the running-job-info map, then checks the label names, the backend and address labels, and that all three metrics have a sample for that store.
+ @Test
+ void collectShouldExportLocalStateStoreMetrics() {
+ instance =
+ SeaTunnelServerStarter.createHazelcastInstance(
+
TestUtils.getClusterName("EngineStateStoreMetricExportsTest_localMetrics"));
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() ->
Assertions.assertTrue(instance.node.isMaster()));
+ instance.getMap(Constant.IMAP_RUNNING_JOB_INFO).put(1L, "job-info");
+
+ List<MetricFamilySamples> metrics =
+ new EngineStateStoreMetricExports(instance.node).collect();
+
+ Sample ownedEntries =
+ findSample(
+ metrics,
+ "engine_state_store_local_owned_entries",
+ Constant.IMAP_RUNNING_JOB_INFO);
+ Assertions.assertEquals(
+ Arrays.asList("cluster", "address", "store", "backend"),
ownedEntries.labelNames);
+ Assertions.assertEquals("hazelcast", labelValue(ownedEntries,
"backend"));
+ Assertions.assertFalse(labelValue(ownedEntries, "address").isEmpty());
+
+ Assertions.assertNotNull(
+ findSample(
+ metrics,
+ "engine_state_store_local_backup_entries",
+ Constant.IMAP_RUNNING_JOB_INFO));
+ Assertions.assertNotNull(
+ findSample(
+ metrics,
+ "engine_state_store_local_heap_cost_bytes",
+ Constant.IMAP_RUNNING_JOB_INFO));
+ }
+ // Checks that the store labels in the owned-entries family form exactly the expected set of engine state store names.
+ @Test
+ void collectShouldCoverAllEngineStateStores() {
+ instance =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(
+
"EngineStateStoreMetricExportsTest_allStateStores"));
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() ->
Assertions.assertTrue(instance.node.isMaster()));
+
+ List<MetricFamilySamples> metrics =
+ new EngineStateStoreMetricExports(instance.node).collect();
+
+ Set<String> exportedStores =
+ findMetricFamily(metrics,
"engine_state_store_local_owned_entries").samples.stream()
+ .map(sample -> labelValue(sample, "store"))
+ .collect(Collectors.toSet());
+
+ Assertions.assertEquals(
+ new HashSet<>(
+ Arrays.asList(
+ Constant.IMAP_RUNNING_JOB_INFO,
+ Constant.IMAP_RUNNING_JOB_STATE,
+ Constant.IMAP_STATE_TIMESTAMPS,
+ Constant.IMAP_OWNED_SLOT_PROFILES,
+ Constant.IMAP_RUNNING_JOB_METRICS,
+ Constant.IMAP_FINISHED_JOB_STATE,
+ Constant.IMAP_FINISHED_JOB_METRICS,
+ Constant.IMAP_FINISHED_JOB_VERTEX_INFO,
+ Constant.IMAP_CHECKPOINT_MONITOR,
+ Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS,
+ Constant.IMAP_CHECKPOINT_ID,
+ Constant.IMAP_PENDING_PIPELINE_CLEANUP)),
+ exportedStores);
+ }
+ // Returns the first metric family with the given name, or fails the test with an AssertionError.
+ private static MetricFamilySamples findMetricFamily(
+ List<MetricFamilySamples> metrics, String name) {
+ return metrics.stream()
+ .filter(metricFamilySamples ->
name.equals(metricFamilySamples.name))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Missing metric family:
" + name));
+ }
+ // Returns the first sample of the named family whose store label equals storeName, or fails with an AssertionError.
+ private static Sample findSample(
+ List<MetricFamilySamples> metrics, String metricName, String
storeName) {
+ return findMetricFamily(metrics, metricName).samples.stream()
+ .filter(sample -> storeName.equals(labelValue(sample,
"store")))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new AssertionError(
+ "Missing metric sample: "
+ + metricName
+ + ", store="
+ + storeName));
+ }
+ // Reads a label value by locating labelName in sample.labelNames and taking the same index in sample.labelValues; fails if the label is absent.
+ private static String labelValue(Sample sample, String labelName) {
+ int index = sample.labelNames.indexOf(labelName);
+ if (index < 0) {
+ throw new AssertionError("Missing label: " + labelName);
+ }
+ return sample.labelValues.get(index);
+ }
+}