This is an automated email from the ASF dual-hosted git repository.

corgy-w pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new af9fe5be32 [Improve] [Zeta] Add observability metrics for engine state stores (#10860)
af9fe5be32 is described below

commit af9fe5be3247e9fde6a2afaa6418b8790a695611
Author: JeremyXin <[email protected]>
AuthorDate: Sun May 17 22:41:29 2026 +0800

    [Improve] [Zeta] Add observability metrics for engine state stores (#10860)
---
 docs/en/engines/zeta/telemetry.md                  |  19 +++
 docs/zh/engines/zeta/telemetry.md                  |  18 +++
 .../metrics/ExportsInstanceInitializer.java        |   3 +
 .../exports/EngineStateStoreMetricExports.java     | 110 +++++++++++++++
 .../engine/server/metrics/MetricsApiTest.java      |   3 +-
 .../exports/EngineStateStoreMetricExportsTest.java | 153 +++++++++++++++++++++
 6 files changed, 305 insertions(+), 1 deletion(-)

diff --git a/docs/en/engines/zeta/telemetry.md b/docs/en/engines/zeta/telemetry.md
index 7b2f80678e..99a87735f7 100644
--- a/docs/en/engines/zeta/telemetry.md
+++ b/docs/en/engines/zeta/telemetry.md
@@ -49,6 +49,25 @@ Note: All metrics both have the same labelName `cluster`, that's value is the co
 | hazelcast_partition_isClusterSafe         | Gauge | -                                                                                                                                  | Whether is cluster safe of partition                                    |
 | hazelcast_partition_isLocalMemberSafe     | Gauge | -                                                                                                                                  | Whether is local member safe of partition                               |
 
+### Engine State Store Metrics
+
+These metrics expose the basic size and local resource usage of Zeta engine state stores. The current backend is
+Hazelcast IMap, so the `backend` label value is `hazelcast`. Local metrics are emitted by each node with the
+`address` label. To monitor the total entry count of an engine state store, aggregate
+`engine_state_store_local_owned_entries` in Prometheus.
+
+| MetricName                                      | Type  | Labels                                                                      | DESCRIPTION                                                                 |
+|-------------------------------------------------|-------|-----------------------------------------------------------------------------|-----------------------------------------------------------------------------|
+| engine_state_store_local_owned_entries          | Gauge | **address**, server instance address. **store**, state store name. **backend**, state store backend. | Local owned entries of an engine state store on this node.                  |
+| engine_state_store_local_backup_entries         | Gauge | **address**, server instance address. **store**, state store name. **backend**, state store backend. | Local backup entries of an engine state store on this node.                 |
+| engine_state_store_local_heap_cost_bytes        | Gauge | **address**, server instance address. **store**, state store name. **backend**, state store backend. | Local heap cost in bytes when the backend exposes it.                       |
       |
+
+Example PromQL:
+
+```promql
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+```
+
 ### Thread Pool Status
 
 | MetricName                          | Type    | Labels                                                             | DESCRIPTION                                                                    |
diff --git a/docs/zh/engines/zeta/telemetry.md b/docs/zh/engines/zeta/telemetry.md
index 5731a2315d..00d679845f 100644
--- a/docs/zh/engines/zeta/telemetry.md
+++ b/docs/zh/engines/zeta/telemetry.md
@@ -50,6 +50,24 @@ OpenMetrics 的指标文本可通过 `http://{instanceHost}:5801/hazelcast/rest/
 | hazelcast_partition_isClusterSafe         | Gauge | -                                                                                                          | 分区是否安全                              |
 | hazelcast_partition_isLocalMemberSafe     | Gauge | -                                                                                                          | 本地成员是否安全                            |
 
+### 引擎状态存储指标
+
+这些指标暴露 Zeta 引擎状态存储的基础大小和本地资源使用情况。当前后端是 Hazelcast IMap,因此 `backend`
+标签值为 `hazelcast`。本地指标由每个节点输出,并包含 `address` 标签。如需监控某个引擎状态存储的全局
+entry 总量,请在 Prometheus 中聚合 `engine_state_store_local_owned_entries`。
+
+| MetricName                                      | Type  | Labels                                                    | 描述                                      |
+|-------------------------------------------------|-------|-----------------------------------------------------------|-----------------------------------------|
+| engine_state_store_local_owned_entries          | Gauge | **address**,服务器实例地址。**store**,状态存储名称。**backend**,状态存储后端。 | 当前节点上该引擎状态存储的本地 owned entry 数。       |
+| engine_state_store_local_backup_entries         | Gauge | **address**,服务器实例地址。**store**,状态存储名称。**backend**,状态存储后端。 | 当前节点上该引擎状态存储的本地 backup entry 数。      |
+| engine_state_store_local_heap_cost_bytes        | Gauge | **address**,服务器实例地址。**store**,状态存储名称。**backend**,状态存储后端。 | 后端支持时,当前节点上该引擎状态存储的本地堆内存成本,单位为字节。 |
+
+PromQL 示例:
+
+```promql
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+```
+
 ### 线程池状态
 
 | MetricName                          | Type    | Labels                                  | 描述                             |
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
index f737fbf6c2..8d429536a2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.telemetry.metrics;
 
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.ClusterMetricExports;
+import org.apache.seatunnel.engine.server.telemetry.metrics.exports.EngineStateStoreMetricExports;
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobMetricExports;
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobThreadPoolStatusExports;
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.NodeMetricExports;
@@ -45,6 +46,8 @@ public final class ExportsInstanceInitializer {
             new JobThreadPoolStatusExports(node).register(collectorRegistry);
             // Node metrics
             new NodeMetricExports(node).register(collectorRegistry);
+            // Engine state store metrics
+            new EngineStateStoreMetricExports(node).register(collectorRegistry);
             // Cluster metrics
             new ClusterMetricExports(node).register(collectorRegistry);
             initialized = true;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java
new file mode 100644
index 0000000000..c8a8697039
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.telemetry.metrics.AbstractCollector;
+
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.LocalMapStats;
+import io.prometheus.client.GaugeMetricFamily;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class EngineStateStoreMetricExports extends AbstractCollector {
+
+    private static final String BACKEND = "hazelcast";
+
+    private static final List<String> ENGINE_STATE_STORES =
+            Arrays.asList(
+                    Constant.IMAP_RUNNING_JOB_INFO,
+                    Constant.IMAP_RUNNING_JOB_STATE,
+                    Constant.IMAP_STATE_TIMESTAMPS,
+                    Constant.IMAP_OWNED_SLOT_PROFILES,
+                    Constant.IMAP_RUNNING_JOB_METRICS,
+                    Constant.IMAP_FINISHED_JOB_STATE,
+                    Constant.IMAP_FINISHED_JOB_METRICS,
+                    Constant.IMAP_FINISHED_JOB_VERTEX_INFO,
+                    Constant.IMAP_CHECKPOINT_MONITOR,
+                    Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS,
+                    Constant.IMAP_CHECKPOINT_ID,
+                    Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+
+    public EngineStateStoreMetricExports(Node node) {
+        super(node);
+    }
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<String> localLabelNames = clusterLabelNames(ADDRESS, "store", "backend");
+        GaugeMetricFamily localOwnedEntries =
+                new GaugeMetricFamily(
+                        "engine_state_store_local_owned_entries",
+                        "Local owned entries of an engine state store on this node",
+                        localLabelNames);
+        GaugeMetricFamily localBackupEntries =
+                new GaugeMetricFamily(
+                        "engine_state_store_local_backup_entries",
+                        "Local backup entries of an engine state store on this node",
+                        localLabelNames);
+        GaugeMetricFamily localHeapCostBytes =
+                new GaugeMetricFamily(
+                        "engine_state_store_local_heap_cost_bytes",
+                        "Local heap cost in bytes of an engine state store on this node",
+                        localLabelNames);
+
+        for (String storeName : ENGINE_STATE_STORES) {
+            collectStoreMetrics(
+                    storeName, localOwnedEntries, localBackupEntries, localHeapCostBytes);
+        }
+
+        return Arrays.asList(localOwnedEntries, localBackupEntries, localHeapCostBytes);
+    }
+
+    private void collectStoreMetrics(
+            String storeName,
+            GaugeMetricFamily localOwnedEntries,
+            GaugeMetricFamily localBackupEntries,
+            GaugeMetricFamily localHeapCostBytes) {
+        try {
+            IMap<?, ?> map = getNode().hazelcastInstance.getMap(storeName);
+            LocalMapStats localMapStats = map.getLocalMapStats();
+            List<String> localLabelValues = labelValues(localAddress(), storeName, BACKEND);
+            localOwnedEntries.addMetric(localLabelValues, localMapStats.getOwnedEntryCount());
+            localBackupEntries.addMetric(localLabelValues, localMapStats.getBackupEntryCount());
+            localHeapCostBytes.addMetric(localLabelValues, localMapStats.getHeapCost());
+        } catch (HazelcastInstanceNotActiveException e) {
+            getLogger(getClass())
+                    .fine(
+                            String.format(
+                                    "Skip state store metrics because Hazelcast is not active, store=%s, backend=%s",
+                                    storeName, BACKEND),
+                            e);
+        } catch (Exception e) {
+            getLogger(getClass())
+                    .warning(
+                            String.format(
+                                    "Failed to collect state store metrics, store=%s, backend=%s",
+                                    storeName, BACKEND),
+                            e);
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
index 27a9743cfb..dcfe8989ce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
@@ -54,7 +54,8 @@ public class MetricsApiTest {
         given().get("http://localhost:8080" + RestConstant.REST_URL_METRICS)
                 .then()
                 .statusCode(200)
-                .body(containsString("process_start_time_seconds"));
+                .body(containsString("process_start_time_seconds"))
+                .body(containsString("engine_state_store_local_owned_entries"));
     }
 
     @AfterAll
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExportsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExportsTest.java
new file mode 100644
index 0000000000..71c070031a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExportsTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.awaitility.Awaitility.await;
+
+@DisabledOnOs(OS.WINDOWS)
+class EngineStateStoreMetricExportsTest {
+
+    private HazelcastInstanceImpl instance;
+
+    @AfterEach
+    void afterEach() {
+        if (instance != null) {
+            instance.shutdown();
+        }
+    }
+
+    @Test
+    void collectShouldExportLocalStateStoreMetrics() {
+        instance =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName("EngineStateStoreMetricExportsTest_localMetrics"));
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(instance.node.isMaster()));
+        instance.getMap(Constant.IMAP_RUNNING_JOB_INFO).put(1L, "job-info");
+
+        List<MetricFamilySamples> metrics =
+                new EngineStateStoreMetricExports(instance.node).collect();
+
+        Sample ownedEntries =
+                findSample(
+                        metrics,
+                        "engine_state_store_local_owned_entries",
+                        Constant.IMAP_RUNNING_JOB_INFO);
+        Assertions.assertEquals(
+                Arrays.asList("cluster", "address", "store", "backend"), ownedEntries.labelNames);
+        Assertions.assertEquals("hazelcast", labelValue(ownedEntries, "backend"));
+        Assertions.assertFalse(labelValue(ownedEntries, "address").isEmpty());
+
+        Assertions.assertNotNull(
+                findSample(
+                        metrics,
+                        "engine_state_store_local_backup_entries",
+                        Constant.IMAP_RUNNING_JOB_INFO));
+        Assertions.assertNotNull(
+                findSample(
+                        metrics,
+                        "engine_state_store_local_heap_cost_bytes",
+                        Constant.IMAP_RUNNING_JOB_INFO));
+    }
+
+    @Test
+    void collectShouldCoverAllEngineStateStores() {
+        instance =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "EngineStateStoreMetricExportsTest_allStateStores"));
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(instance.node.isMaster()));
+
+        List<MetricFamilySamples> metrics =
+                new EngineStateStoreMetricExports(instance.node).collect();
+
+        Set<String> exportedStores =
+                findMetricFamily(metrics, "engine_state_store_local_owned_entries").samples.stream()
+                        .map(sample -> labelValue(sample, "store"))
+                        .collect(Collectors.toSet());
+
+        Assertions.assertEquals(
+                new HashSet<>(
+                        Arrays.asList(
+                                Constant.IMAP_RUNNING_JOB_INFO,
+                                Constant.IMAP_RUNNING_JOB_STATE,
+                                Constant.IMAP_STATE_TIMESTAMPS,
+                                Constant.IMAP_OWNED_SLOT_PROFILES,
+                                Constant.IMAP_RUNNING_JOB_METRICS,
+                                Constant.IMAP_FINISHED_JOB_STATE,
+                                Constant.IMAP_FINISHED_JOB_METRICS,
+                                Constant.IMAP_FINISHED_JOB_VERTEX_INFO,
+                                Constant.IMAP_CHECKPOINT_MONITOR,
+                                Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS,
+                                Constant.IMAP_CHECKPOINT_ID,
+                                Constant.IMAP_PENDING_PIPELINE_CLEANUP)),
+                exportedStores);
+    }
+
+    private static MetricFamilySamples findMetricFamily(
+            List<MetricFamilySamples> metrics, String name) {
+        return metrics.stream()
+                .filter(metricFamilySamples -> name.equals(metricFamilySamples.name))
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Missing metric family: " + name));
+    }
+
+    private static Sample findSample(
+            List<MetricFamilySamples> metrics, String metricName, String storeName) {
+        return findMetricFamily(metrics, metricName).samples.stream()
+                .filter(sample -> storeName.equals(labelValue(sample, "store")))
+                .findFirst()
+                .orElseThrow(
+                        () ->
+                                new AssertionError(
+                                        "Missing metric sample: "
+                                                + metricName
+                                                + ", store="
+                                                + storeName));
+    }
+
+    private static String labelValue(Sample sample, String labelName) {
+        int index = sample.labelNames.indexOf(labelName);
+        if (index < 0) {
+            throw new AssertionError("Missing label: " + labelName);
+        }
+        return sample.labelValues.get(index);
+    }
+}
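
The docs added in this commit recommend aggregating `engine_state_store_local_owned_entries` across nodes to get a store's total entry count, using `sum by (cluster, store, backend)`. As a rough illustration of what that aggregation computes, here is a minimal Java sketch that is not part of the commit. The `StateStoreEntryTotals` and `NodeSample` names, the store names, the addresses, and the sample values are all hypothetical. It uses only the JDK and assumes each node reports exactly one sample per store.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StateStoreEntryTotals {

    /** Hypothetical holder for one scraped sample; not a SeaTunnel or Prometheus type. */
    static final class NodeSample {
        final String cluster;
        final String address;
        final String store;
        final String backend;
        final double value;

        NodeSample(String cluster, String address, String store, String backend, double value) {
            this.cluster = cluster;
            this.address = address;
            this.store = store;
            this.backend = backend;
            this.value = value;
        }
    }

    /**
     * Sums per-node values while dropping the address label, which mirrors
     * grouping by (cluster, store, backend). The map key is "cluster/store/backend".
     */
    static Map<String, Double> sumByClusterStoreBackend(List<NodeSample> samples) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (NodeSample sample : samples) {
            String key = sample.cluster + "/" + sample.store + "/" + sample.backend;
            totals.merge(key, sample.value, Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up samples: two nodes, each owning part of two stores.
        List<NodeSample> samples =
                Arrays.asList(
                        new NodeSample("demo", "10.0.0.1:5801", "store-a", "hazelcast", 3),
                        new NodeSample("demo", "10.0.0.2:5801", "store-a", "hazelcast", 4),
                        new NodeSample("demo", "10.0.0.1:5801", "store-b", "hazelcast", 1),
                        new NodeSample("demo", "10.0.0.2:5801", "store-b", "hazelcast", 0));
        sumByClusterStoreBackend(samples)
                .forEach((key, total) -> System.out.println(key + " -> " + total));
    }
}
```

Only the owned-entry metric is summed here. Adding `engine_state_store_local_backup_entries` to the same total would count replicated entries more than once.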
