This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cb407be995 [Improve][Zeta] Add logical state store observability metrics (#11024)
cb407be995 is described below

commit cb407be99546102a8fd5a3879d21b89fd6caa276
Author: JeremyXin <[email protected]>
AuthorDate: Wed Jun 10 21:26:02 2026 +0800

    [Improve][Zeta] Add logical state store observability metrics (#11024)
---
 docs/en/engines/zeta/telemetry.md                  |  67 ++++-
 docs/zh/engines/zeta/telemetry.md                  |  66 +++-
 .../engine/server/CoordinatorService.java          | 159 ++++++++++
 .../monitor/CheckpointMonitorService.java          | 183 ++++++++++++
 .../engine/server/master/JobHistoryService.java    |  78 ++++-
 .../service/jar/ConnectorPackageService.java       |  26 ++
 .../metrics/ExportsInstanceInitializer.java        |   3 +
 .../EngineStateStoreLogicalMetricExports.java      | 251 ++++++++++++++++
 .../engine/server/metrics/MetricsApiTest.java      |   3 +-
 .../EngineStateStoreLogicalMetricExportsTest.java  | 332 +++++++++++++++++++++
 10 files changed, 1162 insertions(+), 6 deletions(-)

diff --git a/docs/en/engines/zeta/telemetry.md b/docs/en/engines/zeta/telemetry.md
index 99a87735f7..a749785a18 100644
--- a/docs/en/engines/zeta/telemetry.md
+++ b/docs/en/engines/zeta/telemetry.md
@@ -68,6 +68,71 @@ Example PromQL:
 sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
 ```
 
+### Engine State Store Logical Metrics
+
+These metrics expose business-aware logical counts for specific engine state stores. They are exported by the active
+master only. The metric names remain backend-neutral, while the current implementation still labels them with
+`backend="hazelcast"`.
+| MetricName                                               | Type    | Labels                                      | DESCRIPTION                                                           |
+|----------------------------------------------------------|---------|---------------------------------------------|-----------------------------------------------------------------------|
+| engine_state_store_running_job_metrics_task_contexts     | Gauge   | **backend**, state store backend.           | Total number of task metric contexts currently stored in `engine_runningJobMetrics`. |
+| engine_state_store_running_job_metrics_active_partition_keys | Gauge | **backend**, state store backend.           | Number of non-empty top-level partition keys currently stored in `engine_runningJobMetrics`. |
+| engine_state_store_checkpoint_monitor_jobs               | Gauge   | **backend**, state store backend.           | Number of jobs currently tracked in `engine_checkpoint_monitor`. |
+| engine_state_store_checkpoint_monitor_in_progress_checkpoints | Gauge | **backend**, state store backend.       | Number of in-progress checkpoints currently tracked in `engine_checkpoint_monitor`, summed across all pipelines. |
+| engine_state_store_checkpoint_monitor_retained_history_entries | Gauge | **backend**, state store backend.     | Number of checkpoint history entries currently retained in `engine_checkpoint_monitor`, summed across all pipelines. |
+| engine_state_store_finished_job_records                  | Gauge   | **store**, finished job store name. **backend**, state store backend. | Current record count of each finished job store. |
+| engine_state_store_finished_job_cleanup_total            | Counter | **store**, finished job store name. **backend**, state store backend. | Total number of cleanup events observed from entry expiration in each finished job store. |
+| engine_state_store_connector_jar_tracked_jars            | Gauge   | **backend**, state store backend.           | Number of connector jars currently tracked in `engine_connectorJarRefCounters`. |
+| engine_state_store_connector_jar_total_references        | Gauge   | **backend**, state store backend.           | Sum of connector jar reference counts in `engine_connectorJarRefCounters`. |
+
+Logical metrics complement the local Hazelcast store metrics above:
+
+- Use `engine_state_store_local_*` metrics when you want to understand data distribution and memory usage on each node.
+- Use `engine_state_store_*` logical metrics when you want to understand engine semantics such as checkpoint backlog, finished-job retention, or connector jar reuse.
+
+Example PromQL:
+
+```promql
+# Total state store entries by store
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+
+# Current checkpoint backlog
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+
+# Finished-job cleanup growth over the last 15 minutes
+increase(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[15m])
+
+# Connector jar reference pressure
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
+Example Grafana panels:
+
+- `State Store Total Entries`
+
+```promql
+sum by (store) (engine_state_store_local_owned_entries{backend="hazelcast"})
+```
+
+- `Checkpoint In-Progress Count`
+
+```promql
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+```
+
+- `Finished Job Cleanup Rate`
+
+```promql
+sum by (store) (rate(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[5m]))
+```
+
+- `Connector Jar Reference Count`
+
+```promql
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
 ### Thread Pool Status
 
 | MetricName                          | Type    | Labels                                                             | DESCRIPTION                                                                    |
@@ -166,4 +231,4 @@ the [Installation](https://grafana.com/docs/grafana/latest/setup-grafana/install
 - Add Prometheus DataSource on Grafana.
   - Import the `Seatunnel Cluster` monitoring dashboard JSON into Grafana.
 
-The [effect image](../../../images/grafana.png) of the dashboard
+The [effect image](../../../images/grafana.png) of the dashboard
\ No newline at end of file
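The `engine_state_store_finished_job_cleanup_total` counter documented above is a per-store monotonic count. The body of `incrementFinishedJobCleanupTotal` is cut off in this email, so the following is only a minimal sketch, assuming a `ConcurrentHashMap<String, AtomicLong>` keyed by store name and updated with `computeIfAbsent`. `CleanupCounterSketch` and its methods are hypothetical. It reports every known store and defaults to 0, which matches how `getFinishedJobCleanupTotals` fills in stores that have not expired anything yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a per-store cleanup counter. Assumption: the real
// incrementFinishedJobCleanupTotal (truncated in this email) behaves similarly.
final class CleanupCounterSketch {
    // One monotonic counter per store name, created lazily on first expiration.
    private final Map<String, AtomicLong> totals = new ConcurrentHashMap<>();
    // Stores that are always exported, even before their first expiration.
    private final List<String> knownStores;

    CleanupCounterSketch(List<String> knownStores) {
        this.knownStores = new ArrayList<>(knownStores);
    }

    // Called from an entry-expired listener; safe under concurrent callbacks.
    void onExpired(String storeName) {
        totals.computeIfAbsent(storeName, name -> new AtomicLong()).incrementAndGet();
    }

    // Point-in-time view for the exporter; stores with no expirations report 0.
    Map<String, Long> snapshot() {
        Map<String, Long> result = new HashMap<>();
        for (String store : knownStores) {
            AtomicLong total = totals.get(store);
            result.put(store, total == null ? 0L : total.get());
        }
        return result;
    }
}
```

Because these totals live in memory on the active master, a master failover presumably restarts them at 0; Prometheus `rate()` and `increase()` treat such a drop as a counter reset rather than as negative growth.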
diff --git a/docs/zh/engines/zeta/telemetry.md b/docs/zh/engines/zeta/telemetry.md
index 00d679845f..0da7c4e1b8 100644
--- a/docs/zh/engines/zeta/telemetry.md
+++ b/docs/zh/engines/zeta/telemetry.md
@@ -68,6 +68,70 @@ Example PromQL:
 sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
 ```
 
+### Engine State Store Logical Metrics
+
+These metrics expose business-semantic counts for specific engine state stores. They are exported only by the current active master. The metric names remain backend-neutral,
+but the current implementation still attaches the `backend="hazelcast"` label.
+
+| MetricName                                               | Type    | Labels                                      | Description |
+|----------------------------------------------------------|---------|---------------------------------------------|------|
+| engine_state_store_running_job_metrics_task_contexts     | Gauge   | **backend**, state store backend.           | Total number of task metric contexts currently stored in `engine_runningJobMetrics`. |
+| engine_state_store_running_job_metrics_active_partition_keys | Gauge | **backend**, state store backend.         | Number of currently non-empty top-level bucket keys in `engine_runningJobMetrics`. |
+| engine_state_store_checkpoint_monitor_jobs               | Gauge   | **backend**, state store backend.           | Number of jobs currently tracked in `engine_checkpoint_monitor`. |
+| engine_state_store_checkpoint_monitor_in_progress_checkpoints | Gauge | **backend**, state store backend.       | Number of in-progress checkpoints currently in `engine_checkpoint_monitor`. |
+| engine_state_store_checkpoint_monitor_retained_history_entries | Gauge | **backend**, state store backend.     | Number of checkpoint history entries currently retained in `engine_checkpoint_monitor`. |
+| engine_state_store_finished_job_records                  | Gauge   | **store**, finished job store name. **backend**, state store backend. | Current record count in the finished job state stores. |
+| engine_state_store_finished_job_cleanup_total            | Counter | **store**, finished job store name. **backend**, state store backend. | Total number of cleanups in the finished job state stores caused by expiration. |
+| engine_state_store_connector_jar_tracked_jars            | Gauge   | **backend**, state store backend.           | Number of connector jars currently tracked in `engine_connectorJarRefCounters`. |
+| engine_state_store_connector_jar_total_references        | Gauge   | **backend**, state store backend.           | Sum of the reference counts of all connector jars currently in `engine_connectorJarRefCounters`. |
+
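+These logical metrics complement the local Hazelcast store metrics above:
+
+- When you want to see data distribution and memory usage on each node, use the `engine_state_store_local_*` metrics.
+- When you want to see engine semantics such as checkpoint backlog, finished job retention, or connector jar reuse, use the `engine_state_store_*` logical metrics.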
+
+Example PromQL:
+
+```promql
+# Total state store entries aggregated by store
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+
+# Current checkpoint backlog
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+
+# Finished job cleanup growth over the last 15 minutes
+increase(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[15m])
+
+# Connector jar reference pressure
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
+Example Grafana panels:
+
+- `State Store Total Entries`
+
+```promql
+sum by (store) (engine_state_store_local_owned_entries{backend="hazelcast"})
+```
+
+- `Checkpoint In-Progress Count`
+
+```promql
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+```
+
+- `Finished Job Cleanup Rate`
+
+```promql
+sum by (store) (rate(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[5m]))
+```
+
+- `Connector Jar Reference Count`
+
+```promql
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
 ### Thread Pool Status
 
 | MetricName                          | Type    | Labels                                  | Description                      |
@@ -166,4 +230,4 @@ scrape_configs:
 - Add the Prometheus data source in Grafana.
 - Import the `Seatunnel Cluster` monitoring dashboard JSON into Grafana.
 
-The monitoring [effect image](../../../images/grafana.png)
+The monitoring [effect image](../../../images/grafana.png)
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 71b232d82d..720c94028b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -88,11 +88,15 @@ import org.apache.seatunnel.engine.server.utils.PeekBlockingQueue;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.config.Config;
+import com.hazelcast.core.EntryEvent;
 import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryAddedListener;
+import com.hazelcast.map.listener.EntryRemovedListener;
+import com.hazelcast.map.listener.EntryUpdatedListener;
 import com.hazelcast.ringbuffer.Ringbuffer;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import lombok.NonNull;
@@ -109,6 +113,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -221,6 +226,20 @@ public class CoordinatorService {
 
     private final AtomicBoolean coordinatorServiceCleared = new AtomicBoolean(false);
 
+    private final AtomicLong runningJobMetricsPartitionKeyCount = new AtomicLong();
+
+    private final AtomicLong runningJobMetricsTaskContextCount = new AtomicLong();
+
+    private final Object runningJobMetricsStatsLock = new Object();
+
+    private final Set<Long> runningJobMetricsDirtyKeys = ConcurrentHashMap.newKeySet();
+
+    private final Map<Long, RunningJobMetricsStats> runningJobMetricsStatsByJobId = new HashMap<>();
+
+    private final AtomicBoolean runningJobMetricsInitializing = new AtomicBoolean(false);
+
+    private volatile UUID runningJobMetricsListenerId;
+
     public CoordinatorService(
             @NonNull NodeEngineImpl nodeEngine,
             @NonNull SeaTunnelServer seaTunnelServer,
@@ -507,6 +526,7 @@ public class CoordinatorService {
         ownedSlotProfilesIMap =
                 nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
         metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+        initRunningJobMetricsStoreStats();
         pendingPipelineCleanupIMap =
                 nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
         pendingJobCleanupIMap =
@@ -1088,6 +1108,7 @@ public class CoordinatorService {
         if (!coordinatorServiceCleared.compareAndSet(false, true)) {
             return;
         }
+        removeRunningJobMetricsListener();
         // interrupt all JobMaster
         runningJobMasterMap.values().forEach(JobMaster::interrupt);
         if (isWaitStrategy) {
@@ -1615,6 +1636,7 @@ public class CoordinatorService {
         if (pipelineCleanupScheduler != null) {
             pipelineCleanupScheduler.shutdown();
         }
+        removeRunningJobMetricsListener();
         clearCoordinatorService();
         awaitSchedulerTermination("master active listener", masterActiveListener);
         awaitSchedulerTermination("pipeline cleanup scheduler", pipelineCleanupScheduler);
@@ -1876,6 +1898,14 @@ public class CoordinatorService {
         return pendingJobQueue.getJobIdMap().size();
     }
 
+    public long getRunningJobMetricsPartitionKeyCount() {
+        return runningJobMetricsPartitionKeyCount.get();
+    }
+
+    public long getRunningJobMetricsTaskContextCount() {
+        return runningJobMetricsTaskContextCount.get();
+    }
+
     public EngineConfig getEngineConfig() {
         return engineConfig;
     }
@@ -1900,6 +1930,135 @@ public class CoordinatorService {
         }
     }
 
+    private void initRunningJobMetricsStoreStats() {
+        removeRunningJobMetricsListener();
+        runningJobMetricsListenerId =
+                metricsImap.addEntryListener(new RunningJobMetricsEntryListener(), true);
+        runningJobMetricsInitializing.set(true);
+        runningJobMetricsDirtyKeys.clear();
+
+        Map<Long, RunningJobMetricsStats> snapshotStats = new HashMap<>();
+        metricsImap.forEach(
+                (partitionKey, metrics) ->
+                        snapshotStats.put(partitionKey, toRunningJobMetricsStats(metrics)));
+
+        Set<Long> dirtyKeys = new HashSet<>(runningJobMetricsDirtyKeys);
+        for (Long partitionKey : dirtyKeys) {
+            snapshotStats.put(
+                    partitionKey, toRunningJobMetricsStats(metricsImap.get(partitionKey)));
+        }
+
+        synchronized (runningJobMetricsStatsLock) {
+            runningJobMetricsStatsByJobId.clear();
+            runningJobMetricsPartitionKeyCount.set(0L);
+            runningJobMetricsTaskContextCount.set(0L);
+            snapshotStats.forEach(this::replaceRunningJobMetricsStatsLocked);
+            runningJobMetricsInitializing.set(false);
+
+            Set<Long> postSnapshotDirtyKeys = new HashSet<>(runningJobMetricsDirtyKeys);
+            runningJobMetricsDirtyKeys.clear();
+            for (Long partitionKey : postSnapshotDirtyKeys) {
+                replaceRunningJobMetricsStatsLocked(
+                        partitionKey, toRunningJobMetricsStats(metricsImap.get(partitionKey)));
+            }
+        }
+    }
+
+    private RunningJobMetricsStats toRunningJobMetricsStats(
+            Map<TaskLocation, SeaTunnelMetricsContext> metrics) {
+        if (metrics == null || metrics.isEmpty()) {
+            return RunningJobMetricsStats.EMPTY;
+        }
+        return new RunningJobMetricsStats(1L, metrics.size());
+    }
+
+    private void replaceRunningJobMetricsStatsLocked(
+            Long partitionKey, RunningJobMetricsStats stats) {
+        RunningJobMetricsStats currentStats = runningJobMetricsStatsByJobId.get(partitionKey);
+        if (currentStats != null) {
+            runningJobMetricsPartitionKeyCount.addAndGet(-currentStats.partitionKeyCount);
+            runningJobMetricsTaskContextCount.addAndGet(-currentStats.taskContextCount);
+        }
+
+        if (stats.isEmpty()) {
+            runningJobMetricsStatsByJobId.remove(partitionKey);
+            return;
+        }
+
+        runningJobMetricsStatsByJobId.put(partitionKey, stats);
+        runningJobMetricsPartitionKeyCount.addAndGet(stats.partitionKeyCount);
+        runningJobMetricsTaskContextCount.addAndGet(stats.taskContextCount);
+    }
+
+    private void removeRunningJobMetricsListener() {
+        if (metricsImap != null && runningJobMetricsListenerId != null) {
+            metricsImap.removeEntryListener(runningJobMetricsListenerId);
+            runningJobMetricsListenerId = null;
+        }
+        runningJobMetricsInitializing.set(false);
+        runningJobMetricsDirtyKeys.clear();
+        synchronized (runningJobMetricsStatsLock) {
+            runningJobMetricsStatsByJobId.clear();
+        }
+        runningJobMetricsPartitionKeyCount.set(0L);
+        runningJobMetricsTaskContextCount.set(0L);
+    }
+
+    private final class RunningJobMetricsEntryListener
+            implements EntryAddedListener<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>,
+                    EntryUpdatedListener<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>,
+                    EntryRemovedListener<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> {
+
+        @Override
+        public void entryAdded(
+                EntryEvent<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> event) {
+            replaceRunningJobMetricsStats(event.getKey(), event.getValue());
+        }
+
+        @Override
+        public void entryUpdated(
+                EntryEvent<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> event) {
+            replaceRunningJobMetricsStats(event.getKey(), event.getValue());
+        }
+
+        @Override
+        public void entryRemoved(
+                EntryEvent<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> event) {
+            replaceRunningJobMetricsStats(event.getKey(), null);
+        }
+    }
+
+    private void replaceRunningJobMetricsStats(
+            Long partitionKey, Map<TaskLocation, SeaTunnelMetricsContext> metrics) {
+        if (runningJobMetricsInitializing.get()) {
+            runningJobMetricsDirtyKeys.add(partitionKey);
+            return;
+        }
+        synchronized (runningJobMetricsStatsLock) {
+            if (runningJobMetricsInitializing.get()) {
+                runningJobMetricsDirtyKeys.add(partitionKey);
+                return;
+            }
+            replaceRunningJobMetricsStatsLocked(partitionKey, toRunningJobMetricsStats(metrics));
+        }
+    }
+
+    private static final class RunningJobMetricsStats {
+        private static final RunningJobMetricsStats EMPTY = new RunningJobMetricsStats(0L, 0L);
+
+        private final long partitionKeyCount;
+        private final long taskContextCount;
+
+        private RunningJobMetricsStats(long partitionKeyCount, long taskContextCount) {
+            this.partitionKeyCount = partitionKeyCount;
+            this.taskContextCount = taskContextCount;
+        }
+
+        private boolean isEmpty() {
+            return partitionKeyCount == 0L && taskContextCount == 0L;
+        }
+    }
+
     @VisibleForTesting
     public PeekBlockingQueue<PendingJobInfo> getPendingJobQueue() {
         return pendingJobQueue;
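`replaceRunningJobMetricsStatsLocked` keeps the last stats per partition key, so each listener event can subtract the old contribution before adding the new one, and the two gauges stay O(1) to read. A minimal sketch of that delta accounting, assuming each map value is reduced to its task-context count (a hypothetical simplification of `HashMap<TaskLocation, SeaTunnelMetricsContext>`); `KeyedCountAggregator` is an illustrative name, not part of SeaTunnel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of per-key delta accounting for two gauges:
// the number of non-empty keys and the total number of task contexts.
final class KeyedCountAggregator {
    private final Object lock = new Object();
    // Last known task-context count per non-empty key; guarded by lock.
    private final Map<Long, Long> contextsByKey = new HashMap<>();
    // Read without the lock by the metrics exporter.
    private final AtomicLong activeKeys = new AtomicLong();
    private final AtomicLong totalContexts = new AtomicLong();

    // Entry added or updated; a count of 0 means the value is empty.
    void replace(long key, int contexts) {
        synchronized (lock) {
            // Remove the previous contribution of this key, if any.
            Long previous = contextsByKey.remove(key);
            if (previous != null) {
                activeKeys.decrementAndGet();
                totalContexts.addAndGet(-previous);
            }
            // Only non-empty values count as active keys.
            if (contexts > 0) {
                contextsByKey.put(key, (long) contexts);
                activeKeys.incrementAndGet();
                totalContexts.addAndGet(contexts);
            }
        }
    }

    // Entry removed: equivalent to replacing it with an empty value.
    void remove(long key) {
        replace(key, 0);
    }

    long activeKeys() {
        return activeKeys.get();
    }

    long totalContexts() {
        return totalContexts.get();
    }
}
```

Writers serialize on one lock while readers only touch the `AtomicLong`s, similar to how `getRunningJobMetricsPartitionKeyCount()` and `getRunningJobMetricsTaskContextCount()` stay off the lock on the export path. The trade-off is that a reader may briefly observe a value between the subtract and the add.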
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java
index 1f890e9a84..44ec00b857 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java
@@ -32,15 +32,28 @@ import 
org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
 import org.apache.seatunnel.engine.server.checkpoint.SubtaskStatistics;
 import org.apache.seatunnel.engine.server.checkpoint.TaskStatistics;
 
+import com.hazelcast.core.EntryEvent;
 import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryAddedListener;
+import com.hazelcast.map.listener.EntryExpiredListener;
+import com.hazelcast.map.listener.EntryRemovedListener;
+import com.hazelcast.map.listener.EntryUpdatedListener;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -50,6 +63,14 @@ public class CheckpointMonitorService {
     private final NodeEngine nodeEngine;
     private volatile IMap<Long, CheckpointOverview> overviewMap;
     private final int maxHistorySize;
+    private final AtomicLong overviewJobCount = new AtomicLong();
+    private final AtomicLong inProgressCheckpointCount = new AtomicLong();
+    private final AtomicLong retainedHistoryCount = new AtomicLong();
+    private final Object overviewStatsLock = new Object();
+    private final Set<Long> overviewDirtyJobIds = ConcurrentHashMap.newKeySet();
+    private final Map<Long, CheckpointOverviewStats> overviewStatsByJobId = new HashMap<>();
+    private final AtomicBoolean overviewStatsInitializing = new AtomicBoolean(false);
+    private volatile UUID overviewListenerId;
 
     public CheckpointMonitorService(NodeEngine nodeEngine, int maxHistorySize) {
         this.nodeEngine = nodeEngine;
@@ -64,6 +85,7 @@ public class CheckpointMonitorService {
                             nodeEngine
                                     .getHazelcastInstance()
                                     .getMap(Constant.IMAP_CHECKPOINT_MONITOR);
+                    initOverviewStats();
                 }
             }
         }
@@ -236,6 +258,18 @@ public class CheckpointMonitorService {
                 });
     }
 
+    public long getOverviewJobCount() {
+        return overviewJobCount.get();
+    }
+
+    public long getInProgressCheckpointCount() {
+        return inProgressCheckpointCount.get();
+    }
+
+    public long getRetainedHistoryCount() {
+        return retainedHistoryCount.get();
+    }
+
     private void updateOverview(
             long jobId, int pipelineId, Consumer<PipelineCheckpointOverview> consumer) {
         getOverviewMap()
@@ -256,6 +290,155 @@ public class CheckpointMonitorService {
         pipeline.getInProgress().removeIf(cp -> cp.getCheckpointId() == checkpointId);
     }
 
+    private void initOverviewStats() {
+        removeOverviewListener();
+        overviewListenerId =
+                overviewMap.addEntryListener(new CheckpointOverviewEntryListener(), true);
+        overviewStatsInitializing.set(true);
+        overviewDirtyJobIds.clear();
+
+        Map<Long, CheckpointOverviewStats> snapshotStats = new HashMap<>();
+        overviewMap.forEach(
+                (jobId, overview) -> snapshotStats.put(jobId, toOverviewStats(overview)));
+
+        Set<Long> dirtyJobIds = new HashSet<>(overviewDirtyJobIds);
+        for (Long jobId : dirtyJobIds) {
+            snapshotStats.put(jobId, toOverviewStats(overviewMap.get(jobId)));
+        }
+
+        synchronized (overviewStatsLock) {
+            overviewStatsByJobId.clear();
+            overviewJobCount.set(0L);
+            inProgressCheckpointCount.set(0L);
+            retainedHistoryCount.set(0L);
+            snapshotStats.forEach(this::replaceOverviewStatsLocked);
+            overviewStatsInitializing.set(false);
+
+            Set<Long> postSnapshotDirtyJobIds = new HashSet<>(overviewDirtyJobIds);
+            overviewDirtyJobIds.clear();
+            for (Long jobId : postSnapshotDirtyJobIds) {
+                replaceOverviewStatsLocked(jobId, toOverviewStats(overviewMap.get(jobId)));
+            }
+        }
+    }
+
+    private void removeOverviewListener() {
+        if (overviewMap != null && overviewListenerId != null) {
+            overviewMap.removeEntryListener(overviewListenerId);
+            overviewListenerId = null;
+        }
+        overviewStatsInitializing.set(false);
+        overviewDirtyJobIds.clear();
+        synchronized (overviewStatsLock) {
+            overviewStatsByJobId.clear();
+        }
+        overviewJobCount.set(0L);
+        inProgressCheckpointCount.set(0L);
+        retainedHistoryCount.set(0L);
+    }
+
+    private CheckpointOverviewStats toOverviewStats(CheckpointOverview overview) {
+        if (overview == null) {
+            return CheckpointOverviewStats.EMPTY;
+        }
+        return new CheckpointOverviewStats(
+                1L, getInProgressCount(overview), getHistoryCount(overview));
+    }
+
+    private void replaceOverviewStatsLocked(Long jobId, CheckpointOverviewStats stats) {
+        CheckpointOverviewStats currentStats = overviewStatsByJobId.get(jobId);
+        if (currentStats != null) {
+            overviewJobCount.addAndGet(-currentStats.jobCount);
+            inProgressCheckpointCount.addAndGet(-currentStats.inProgressCheckpointCount);
+            retainedHistoryCount.addAndGet(-currentStats.retainedHistoryCount);
+        }
+
+        if (stats.isEmpty()) {
+            overviewStatsByJobId.remove(jobId);
+            return;
+        }
+
+        overviewStatsByJobId.put(jobId, stats);
+        overviewJobCount.addAndGet(stats.jobCount);
+        inProgressCheckpointCount.addAndGet(stats.inProgressCheckpointCount);
+        retainedHistoryCount.addAndGet(stats.retainedHistoryCount);
+    }
+
+    private long getInProgressCount(CheckpointOverview overview) {
+        return overview.getPipelines().values().stream()
+                .filter(Objects::nonNull)
+                .mapToLong(pipelineOverview -> pipelineOverview.getInProgress().size())
+                .sum();
+    }
+
+    private long getHistoryCount(CheckpointOverview overview) {
+        return overview.getPipelines().values().stream()
+                .filter(Objects::nonNull)
+                .mapToLong(pipelineOverview -> pipelineOverview.getHistory().size())
+                .sum();
+    }
+
+    private final class CheckpointOverviewEntryListener
+            implements EntryAddedListener<Long, CheckpointOverview>,
+                    EntryUpdatedListener<Long, CheckpointOverview>,
+                    EntryRemovedListener<Long, CheckpointOverview>,
+                    EntryExpiredListener<Long, CheckpointOverview> {
+
+        @Override
+        public void entryAdded(EntryEvent<Long, CheckpointOverview> event) {
+            replaceOverviewStats(event.getKey(), event.getValue());
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<Long, CheckpointOverview> event) {
+            replaceOverviewStats(event.getKey(), event.getValue());
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<Long, CheckpointOverview> event) {
+            replaceOverviewStats(event.getKey(), null);
+        }
+
+        @Override
+        public void entryExpired(EntryEvent<Long, CheckpointOverview> event) {
+            replaceOverviewStats(event.getKey(), null);
+        }
+    }
+
+    private void replaceOverviewStats(Long jobId, CheckpointOverview overview) {
+        if (overviewStatsInitializing.get()) {
+            overviewDirtyJobIds.add(jobId);
+            return;
+        }
+        synchronized (overviewStatsLock) {
+            if (overviewStatsInitializing.get()) {
+                overviewDirtyJobIds.add(jobId);
+                return;
+            }
+            replaceOverviewStatsLocked(jobId, toOverviewStats(overview));
+        }
+    }
+
+    private static final class CheckpointOverviewStats {
+        private static final CheckpointOverviewStats EMPTY =
+                new CheckpointOverviewStats(0L, 0L, 0L);
+
+        private final long jobCount;
+        private final long inProgressCheckpointCount;
+        private final long retainedHistoryCount;
+
+        private CheckpointOverviewStats(
+                long jobCount, long inProgressCheckpointCount, long retainedHistoryCount) {
+            this.jobCount = jobCount;
+            this.inProgressCheckpointCount = inProgressCheckpointCount;
+            this.retainedHistoryCount = retainedHistoryCount;
+        }
+
+        private boolean isEmpty() {
+            return jobCount == 0L && inProgressCheckpointCount == 0L && retainedHistoryCount == 0L;
+        }
+    }
+
     public static long calculateStateSize(CompletedCheckpoint checkpoint) {
         return checkpoint.getTaskStatistics().values().stream()
                 .map(TaskStatistics::getSubtaskStats)
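`initOverviewStats()` (like `initRunningJobMetricsStoreStats()` in `CoordinatorService`) scans the map while listener events only mark their key dirty, then replays those keys after the scan so updates that race with the initial snapshot are not lost. A minimal sketch of that pattern, assuming a `ConcurrentHashMap<Long, Integer>` stands in for the Hazelcast `IMap` and each value is a per-job count; `SnapshotReconciler` is hypothetical. One deliberate difference: the sketch checks the initializing flag only while holding the lock. The listeners in this commit also check it once without the lock, which appears to leave a narrow window where a key is marked dirty after the replay set was copied, so it is not reconciled until the next event for that key or the next initialization.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: snapshot-then-replay initialization of listener-maintained stats.
// Assumption: a ConcurrentHashMap stands in for the Hazelcast IMap.
final class SnapshotReconciler {
    private final Map<Long, Integer> source;
    private final Object lock = new Object();
    private final Map<Long, Integer> statsByKey = new HashMap<>(); // guarded by lock
    private final Set<Long> dirtyKeys = ConcurrentHashMap.newKeySet();
    private final AtomicBoolean initializing = new AtomicBoolean(false);
    private long total; // guarded by lock

    SnapshotReconciler(Map<Long, Integer> source) {
        this.source = source;
    }

    void init() {
        initializing.set(true);
        dirtyKeys.clear();
        // Initial scan runs without the lock, so listener callbacks are not blocked by it.
        Map<Long, Integer> snapshot = new HashMap<>(source);
        synchronized (lock) {
            statsByKey.clear();
            total = 0L;
            snapshot.forEach(this::applyLocked);
            initializing.set(false);
            // Re-read every key that changed during the scan so late updates are not lost.
            Set<Long> replay = new HashSet<>(dirtyKeys);
            dirtyKeys.clear();
            for (Long key : replay) {
                applyLocked(key, source.get(key));
            }
        }
    }

    // Listener callback; value == null means the entry was removed or expired.
    void onEvent(Long key, Integer value) {
        synchronized (lock) {
            if (initializing.get()) {
                dirtyKeys.add(key); // replayed by init() after the scan
                return;
            }
            applyLocked(key, value);
        }
    }

    private void applyLocked(Long key, Integer value) {
        Integer previous = statsByKey.remove(key);
        if (previous != null) {
            total -= previous;
        }
        if (value != null) {
            statsByKey.put(key, value);
            total += value;
        }
    }

    long total() {
        synchronized (lock) {
            return total;
        }
    }
}
```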
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 027704f58e..4afeabeb9e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.SerializationFe
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.job.JobStatusData;
@@ -50,12 +51,16 @@ import lombok.Getter;
 import java.io.Serializable;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -105,6 +110,8 @@ public class JobHistoryService {
 
     private final int finishedJobExpireTime;
 
+    private final Map<String, AtomicLong> finishedJobCleanupTotals = new ConcurrentHashMap<>();
+
     public JobHistoryService(
             NodeEngine nodeEngine,
             IMap<Object, Object> runningJobStateIMap,
@@ -123,7 +130,12 @@ public class JobHistoryService {
         this.finishedJobStateImap = finishedJobStateImap;
         this.finishedJobMetricsImap = finishedJobMetricsImap;
         this.finishedJobDAGInfoImap = finishedJobVertexInfoImap;
-        this.finishedJobDAGInfoImap.addEntryListener(new JobInfoExpiredListener(), true);
+        this.finishedJobStateImap.addEntryListener(
+                new FinishedJobExpiredListener<>(Constant.IMAP_FINISHED_JOB_STATE), true);
+        this.finishedJobMetricsImap.addEntryListener(
+                new FinishedJobExpiredListener<>(Constant.IMAP_FINISHED_JOB_METRICS), true);
+        this.finishedJobDAGInfoImap.addEntryListener(
+                new JobInfoExpiredListener(Constant.IMAP_FINISHED_JOB_VERTEX_INFO), true);
         this.objectMapper = new ObjectMapper();
         this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         this.finishedJobExpireTime = finishedJobExpireTime;
@@ -332,6 +344,39 @@ public class JobHistoryService {
         finishedJobDAGInfoImap.put(jobId, jobInfo, finishedJobExpireTime, TimeUnit.MINUTES);
     }
 
+    public Map<String, Long> getFinishedJobRecordCounts() {
+        Map<String, Long> counts = new HashMap<>();
+        counts.put(Constant.IMAP_FINISHED_JOB_STATE, (long) finishedJobStateImap.size());
+        counts.put(Constant.IMAP_FINISHED_JOB_METRICS, (long) finishedJobMetricsImap.size());
+        counts.put(Constant.IMAP_FINISHED_JOB_VERTEX_INFO, (long) finishedJobDAGInfoImap.size());
+        return counts;
+    }
+
+    public Map<String, Long> getFinishedJobCleanupTotals() {
+        Map<String, Long> counts = new HashMap<>();
+        counts.put(
+                Constant.IMAP_FINISHED_JOB_STATE,
+                getFinishedJobCleanupTotal(Constant.IMAP_FINISHED_JOB_STATE));
+        counts.put(
+                Constant.IMAP_FINISHED_JOB_METRICS,
+                getFinishedJobCleanupTotal(Constant.IMAP_FINISHED_JOB_METRICS));
+        counts.put(
+                Constant.IMAP_FINISHED_JOB_VERTEX_INFO,
+                getFinishedJobCleanupTotal(Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+        return counts;
+    }
+
+    private long getFinishedJobCleanupTotal(String storeName) {
+        AtomicLong total = finishedJobCleanupTotals.get(storeName);
+        return total == null ? 0L : total.get();
+    }
+
+    private void incrementFinishedJobCleanupTotal(String storeName) {
+        finishedJobCleanupTotals
+                .computeIfAbsent(storeName, key -> new AtomicLong())
+                .incrementAndGet();
+    }
+
     @AllArgsConstructor
     @Data
     public static final class JobState implements Serializable {
@@ -354,14 +399,41 @@ public class JobHistoryService {
         private Map<TaskGroupLocation, ExecutionState> executionStateMap;
     }
 
+    private class FinishedJobExpiredListener<T> implements EntryExpiredListener<Long, T> {
+        private final String storeName;
+
+        private FinishedJobExpiredListener(String storeName) {
+            this.storeName = storeName;
+        }
+
+        @Override
+        public void entryExpired(EntryEvent<Long, T> event) {
+            incrementFinishedJobCleanupTotal(storeName);
+        }
+    }
+
     private class JobInfoExpiredListener implements EntryExpiredListener<Long, JobDAGInfo> {
+        private final String storeName;
+
+        private JobInfoExpiredListener(String storeName) {
+            this.storeName = storeName;
+        }
+
         @Override
         public void entryExpired(EntryEvent<Long, JobDAGInfo> event) {
+            incrementFinishedJobCleanupTotal(storeName);
             Long jobId = event.getKey();
             JobDAGInfo jobDagInfo = event.getOldValue();
+            if (jobDagInfo == null) {
+                return;
+            }
             try {
-                Set<ExecutionAddress> historyExecutionPlan = jobDagInfo.getHistoryExecutionPlan();
-                Stream.concat(historyExecutionPlan.stream(), Stream.of(jobDagInfo.getMaster()))
+                Set<ExecutionAddress> historyExecutionPlan =
+                        Optional.ofNullable(jobDagInfo.getHistoryExecutionPlan())
+                                .orElseGet(Collections::emptySet);
+                Stream.concat(
+                                historyExecutionPlan.stream(),
+                                Stream.of(jobDagInfo.getMaster()).filter(Objects::nonNull))
                         .forEach(
                                 address -> {
                                     logger.info(
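The cleanup counter added to `JobHistoryService` keeps one `AtomicLong` per store name in a `ConcurrentHashMap`, creates it lazily with `computeIfAbsent` on the first expiry event, and reports stores that never expired anything as `0`. A minimal Hazelcast-free sketch of that pattern, assuming plain method calls in place of `EntryExpiredListener` callbacks; the class name `CleanupCounterSketch` and the store names `state`, `metrics`, and `vertexInfo` are hypothetical placeholders, not the values of the `Constant.IMAP_FINISHED_JOB_*` constants:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a per-store expiry counter (no Hazelcast involved). */
public class CleanupCounterSketch {
    private final Map<String, AtomicLong> totals = new ConcurrentHashMap<>();

    /** Would be called from an expiry callback; safe to call from many threads. */
    public void onExpired(String storeName) {
        totals.computeIfAbsent(storeName, key -> new AtomicLong()).incrementAndGet();
    }

    /** Snapshot for export; a store with no expiry events yet reports 0. */
    public Map<String, Long> snapshot(String... storeNames) {
        Map<String, Long> counts = new HashMap<>();
        for (String name : storeNames) {
            AtomicLong total = totals.get(name);
            counts.put(name, total == null ? 0L : total.get());
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        CleanupCounterSketch counter = new CleanupCounterSketch();
        // Two threads race on the first event for the same store.
        Runnable burst = () -> {
            for (int i = 0; i < 1000; i++) {
                counter.onExpired("state");
            }
        };
        Thread a = new Thread(burst);
        Thread b = new Thread(burst);
        a.start();
        b.start();
        a.join();
        b.join();
        Map<String, Long> snapshot = counter.snapshot("state", "vertexInfo");
        System.out.println(snapshot.get("state") + " " + snapshot.get("vertexInfo")); // 2000 0
    }
}
```

Because `ConcurrentHashMap.computeIfAbsent` is atomic per key, racing callbacks still share a single counter. Any in-memory counter like this starts again from zero when its owning object is recreated; Prometheus `rate()` and `increase()` are designed to tolerate such counter resets.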
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
index 4e1ec89593..a6bd7dd6b4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
@@ -17,11 +17,13 @@
 
 package org.apache.seatunnel.engine.server.service.jar;
 
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
 import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
 import org.apache.seatunnel.engine.core.job.ConnectorJar;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.RefCount;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.task.operation.SendConnectorJarToMemberNodeOperation;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
@@ -34,6 +36,7 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Collection;
 import java.util.List;
 
 @Slf4j
@@ -120,4 +123,27 @@ public class ConnectorPackageService {
             long jobId, List<ConnectorJarIdentifier> connectorJarIdentifierList) {
         connectorJarStorageStrategy.cleanUpWhenJobFinished(jobId, connectorJarIdentifierList);
     }
+
+    public int getTrackedConnectorJarCount() {
+        return nodeEngine
+                .getHazelcastInstance()
+                .<ConnectorJarIdentifier, RefCount>getMap(Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS)
+                .size();
+    }
+
+    public long getTotalConnectorJarReferences() {
+        Collection<RefCount> refCounts =
+                nodeEngine
+                        .getHazelcastInstance()
+                        .<ConnectorJarIdentifier, RefCount>getMap(
+                                Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS)
+                        .values();
+        long total = 0L;
+        for (RefCount refCount : refCounts) {
+            if (refCount != null && refCount.getReferences() != null) {
+                total += refCount.getReferences();
+            }
+        }
+        return total;
+    }
 }
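`getTotalConnectorJarReferences()` walks every value of the connector-jar ref-counter map and skips both null entries and null reference counts. The sketch below isolates that summation and shows an equivalent stream form. `RefCount` here is a hypothetical stand-in that models only a nullable `Long getReferences()`, which is what the null check in the patch implies; it is not the real class.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;

public class RefCountSumSketch {

    /** Hypothetical stand-in for RefCount: only a nullable reference count is modeled. */
    static final class RefCount {
        private final Long references;

        RefCount(Long references) {
            this.references = references;
        }

        Long getReferences() {
            return references;
        }
    }

    /** Loop form, mirroring the patch: skip null entries and null counts. */
    static long totalReferencesLoop(Collection<RefCount> refCounts) {
        long total = 0L;
        for (RefCount refCount : refCounts) {
            if (refCount != null && refCount.getReferences() != null) {
                total += refCount.getReferences();
            }
        }
        return total;
    }

    /** Equivalent stream form. */
    static long totalReferencesStream(Collection<RefCount> refCounts) {
        return refCounts.stream()
                .filter(Objects::nonNull)
                .map(RefCount::getReferences)
                .filter(Objects::nonNull)
                .mapToLong(Long::longValue)
                .sum();
    }

    public static void main(String[] args) {
        Collection<RefCount> values =
                Arrays.asList(new RefCount(2L), null, new RefCount(null), new RefCount(3L));
        System.out.println(totalReferencesLoop(values) + " " + totalReferencesStream(values)); // 5 5
    }
}
```

Both forms return the same total; the loop used in the patch avoids building a stream on each scrape. In either case the cost grows with the number of tracked jars, because Hazelcast's `IMap.values()` returns a copy of the values rather than a live view.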
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
index 8d429536a2..d4b6cba6ec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.telemetry.metrics;
 
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.ClusterMetricExports;
+import org.apache.seatunnel.engine.server.telemetry.metrics.exports.EngineStateStoreLogicalMetricExports;
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.EngineStateStoreMetricExports;
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobMetricExports;
 import org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobThreadPoolStatusExports;
@@ -48,6 +49,8 @@ public final class ExportsInstanceInitializer {
             new NodeMetricExports(node).register(collectorRegistry);
             // Engine state store metrics
             new EngineStateStoreMetricExports(node).register(collectorRegistry);
+            // Engine state store logical metrics
+            new EngineStateStoreLogicalMetricExports(node).register(collectorRegistry);
             // Cluster metrics
             new ClusterMetricExports(node).register(collectorRegistry);
             initialized = true;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExports.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExports.java
new file mode 100644
index 0000000000..49ce9252b6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExports.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.master.JobHistoryService;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
+import org.apache.seatunnel.engine.server.telemetry.metrics.AbstractCollector;
+
+import com.hazelcast.instance.impl.Node;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class EngineStateStoreLogicalMetricExports extends AbstractCollector {
+
+    private static final String BACKEND = "hazelcast";
+
+    public EngineStateStoreLogicalMetricExports(Node node) {
+        super(node);
+    }
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<String> commonLabelNames = clusterLabelNames("backend");
+        List<String> storeLabelNames = clusterLabelNames("store", "backend");
+
+        GaugeMetricFamily runningJobMetricsTaskContexts =
+                new GaugeMetricFamily(
+                        "engine_state_store_running_job_metrics_task_contexts",
+                        "Logical task metrics contexts stored in engine_runningJobMetrics",
+                        commonLabelNames);
+        GaugeMetricFamily runningJobMetricsActivePartitionKeys =
+                new GaugeMetricFamily(
+                        "engine_state_store_running_job_metrics_active_partition_keys",
+                        "Active partition-key count stored in engine_runningJobMetrics",
+                        commonLabelNames);
+        GaugeMetricFamily checkpointMonitorJobs =
+                new GaugeMetricFamily(
+                        "engine_state_store_checkpoint_monitor_jobs",
+                        "Job count stored in engine_checkpoint_monitor",
+                        commonLabelNames);
+        GaugeMetricFamily checkpointMonitorInProgressCheckpoints =
+                new GaugeMetricFamily(
+                        "engine_state_store_checkpoint_monitor_in_progress_checkpoints",
+                        "In-progress checkpoint count stored in engine_checkpoint_monitor",
+                        commonLabelNames);
+        GaugeMetricFamily checkpointMonitorRetainedHistoryEntries =
+                new GaugeMetricFamily(
+                        "engine_state_store_checkpoint_monitor_retained_history_entries",
+                        "Retained checkpoint history entry count stored in engine_checkpoint_monitor",
+                        commonLabelNames);
+        GaugeMetricFamily finishedJobRecords =
+                new GaugeMetricFamily(
+                        "engine_state_store_finished_job_records",
+                        "Finished job records stored in finished job state stores",
+                        storeLabelNames);
+        CounterMetricFamily finishedJobCleanupTotal =
+                new CounterMetricFamily(
+                        "engine_state_store_finished_job_cleanup_total",
+                        "Cleanup total for finished job state stores",
+                        storeLabelNames);
+        GaugeMetricFamily connectorJarTrackedJars =
+                new GaugeMetricFamily(
+                        "engine_state_store_connector_jar_tracked_jars",
+                        "Tracked connector jar count stored in engine_connectorJarRefCounters",
+                        commonLabelNames);
+        GaugeMetricFamily connectorJarTotalReferences =
+                new GaugeMetricFamily(
+                        "engine_state_store_connector_jar_total_references",
+                        "Total connector jar references stored in engine_connectorJarRefCounters",
+                        commonLabelNames);
+
+        if (!isMaster() || !isCoordinatorReady()) {
+            return metricFamilies(
+                    runningJobMetricsTaskContexts,
+                    runningJobMetricsActivePartitionKeys,
+                    checkpointMonitorJobs,
+                    checkpointMonitorInProgressCheckpoints,
+                    checkpointMonitorRetainedHistoryEntries,
+                    finishedJobRecords,
+                    finishedJobCleanupTotal,
+                    connectorJarTrackedJars,
+                    connectorJarTotalReferences);
+        }
+
+        runSafely(
+                "engine_state_store_running_job_metrics",
+                () -> {
+                    addMetric(
+                            runningJobMetricsTaskContexts,
+                            getCoordinatorService().getRunningJobMetricsTaskContextCount(),
+                            labelValues(BACKEND));
+                    addMetric(
+                            runningJobMetricsActivePartitionKeys,
+                            getCoordinatorService().getRunningJobMetricsPartitionKeyCount(),
+                            labelValues(BACKEND));
+                });
+
+        SeaTunnelServer server = getServer();
+        runSafely(
+                "engine_state_store_checkpoint_monitor",
+                () -> {
+                    addMetric(
+                            checkpointMonitorJobs,
+                            server.getCheckpointMonitorService().getOverviewJobCount(),
+                            labelValues(BACKEND));
+                    addMetric(
+                            checkpointMonitorInProgressCheckpoints,
+                            server.getCheckpointMonitorService().getInProgressCheckpointCount(),
+                            labelValues(BACKEND));
+                    addMetric(
+                            checkpointMonitorRetainedHistoryEntries,
+                            server.getCheckpointMonitorService().getRetainedHistoryCount(),
+                            labelValues(BACKEND));
+                });
+
+        runSafely(
+                "engine_state_store_finished_job",
+                () -> {
+                    JobHistoryService jobHistoryService =
+                            getCoordinatorService().getJobHistoryService();
+                    addStoreMetrics(
+                            finishedJobRecords, jobHistoryService.getFinishedJobRecordCounts());
+                    addStoreMetrics(
+                            finishedJobCleanupTotal,
+                            jobHistoryService.getFinishedJobCleanupTotals());
+                });
+
+        runSafely(
+                "engine_state_store_connector_jar",
+                () -> {
+                    ConnectorPackageService connectorPackageService = null;
+                    try {
+                        connectorPackageService = server.getConnectorPackageService();
+                    } catch (Exception e) {
+                        getLogger(getClass())
+                                .fine(
+                                        "Connector package service is not enabled; exporting fallback metrics");
+                    }
+                    addMetric(
+                            connectorJarTrackedJars,
+                            getTrackedConnectorJarCount(connectorPackageService),
+                            labelValues(BACKEND));
+                    addMetric(
+                            connectorJarTotalReferences,
+                            getTotalConnectorJarReferences(connectorPackageService),
+                            labelValues(BACKEND));
+                });
+
+        return metricFamilies(
+                runningJobMetricsTaskContexts,
+                runningJobMetricsActivePartitionKeys,
+                checkpointMonitorJobs,
+                checkpointMonitorInProgressCheckpoints,
+                checkpointMonitorRetainedHistoryEntries,
+                finishedJobRecords,
+                finishedJobCleanupTotal,
+                connectorJarTrackedJars,
+                connectorJarTotalReferences);
+    }
+
+    private List<MetricFamilySamples> metricFamilies(MetricFamilySamples... samples) {
+        List<MetricFamilySamples> metrics = new ArrayList<>();
+        for (MetricFamilySamples sample : samples) {
+            metrics.add(sample);
+        }
+        return metrics;
+    }
+
+    private void addStoreMetrics(GaugeMetricFamily metricFamily, Map<String, Long> storeMetrics) {
+        storeMetrics.forEach(
+                (storeName, value) ->
+                        addMetric(metricFamily, value, labelValues(storeName, BACKEND)));
+    }
+
+    private void addStoreMetrics(CounterMetricFamily metricFamily, Map<String, Long> storeMetrics) {
+        storeMetrics.forEach(
+                (storeName, value) ->
+                        addMetric(metricFamily, value, labelValues(storeName, BACKEND)));
+    }
+
+    private void addMetric(GaugeMetricFamily metricFamily, long value, List<String> labels) {
+        metricFamily.addMetric(labels, value);
+    }
+
+    private void addMetric(CounterMetricFamily metricFamily, long value, List<String> labels) {
+        metricFamily.addMetric(labels, value);
+    }
+
+    private int getTrackedConnectorJarCount(ConnectorPackageService connectorPackageService) {
+        if (connectorPackageService != null) {
+            return connectorPackageService.getTrackedConnectorJarCount();
+        }
+        return getConnectorJarRefCounters().size();
+    }
+
+    private long getTotalConnectorJarReferences(ConnectorPackageService connectorPackageService) {
+        if (connectorPackageService != null) {
+            return connectorPackageService.getTotalConnectorJarReferences();
+        }
+        long total = 0L;
+        for (org.apache.seatunnel.engine.core.job.RefCount refCount :
+                getConnectorJarRefCounters().values()) {
+            if (refCount != null && refCount.getReferences() != null) {
+                total += refCount.getReferences();
+            }
+        }
+        return total;
+    }
+
+    private com.hazelcast.map.IMap<?, org.apache.seatunnel.engine.core.job.RefCount>
+            getConnectorJarRefCounters() {
+        return getNode()
+                .hazelcastInstance
+                .getMap(
+                        org.apache.seatunnel.engine.common.Constant
+                                .IMAP_CONNECTOR_JAR_REF_COUNTERS);
+    }
+
+    private void runSafely(String metricGroup, Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Exception e) {
+            getLogger(getClass())
+                    .warning(
+                            String.format(
+                                    "Failed to collect logical state store metrics, group=%s, backend=%s",
+                                    metricGroup, BACKEND),
+                            e);
+        }
+    }
+}
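In `collect()`, every metric group runs inside `runSafely`, so an exception in one group is logged at warning level and the remaining groups are still exported; a node that is not the active master, or whose coordinator is not ready yet, returns the same metric families without samples. The reduction below keeps only that control flow. The `LongSupplier` groups, the `activeMaster` flag, and printing to `System.err` in place of the Hazelcast logger are assumptions of this sketch, not the exporter's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class RunSafelySketch {

    /** Hypothetical reduction of collect(): one value per group, failures isolated per group. */
    static Map<String, Long> collect(boolean activeMaster, Map<String, LongSupplier> groups) {
        Map<String, Long> samples = new LinkedHashMap<>();
        if (!activeMaster) {
            // Non-master nodes export the families with no samples.
            return samples;
        }
        for (Map.Entry<String, LongSupplier> group : groups.entrySet()) {
            runSafely(group.getKey(), () -> samples.put(group.getKey(), group.getValue().getAsLong()));
        }
        return samples;
    }

    /** Logs and swallows a failure so later groups still run. */
    static void runSafely(String group, Runnable body) {
        try {
            body.run();
        } catch (Exception e) {
            System.err.println("Failed to collect group=" + group + ": " + e);
        }
    }

    public static void main(String[] args) {
        Map<String, LongSupplier> groups = new LinkedHashMap<>();
        groups.put("checkpoint_monitor_jobs", () -> 1L);
        groups.put("connector_jar_tracked_jars", () -> {
            throw new IllegalStateException("service not ready");
        });
        groups.put("finished_job_records", () -> 3L);
        System.out.println(collect(true, groups)); // {checkpoint_monitor_jobs=1, finished_job_records=3}
        System.out.println(collect(false, groups)); // {}
    }
}
```

One consequence visible in the patch: each real group issues several `addMetric` calls, so an exception partway through a group keeps the samples that group had already added before the failure.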
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
index dcfe8989ce..968e8c8c02 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
@@ -55,7 +55,8 @@ public class MetricsApiTest {
                 .then()
                 .statusCode(200)
                 .body(containsString("process_start_time_seconds"))
-                .body(containsString("engine_state_store_local_owned_entries"));
+                .body(containsString("engine_state_store_local_owned_entries"))
+                .body(containsString("engine_state_store_checkpoint_monitor_jobs"));
     }
 
     @AfterAll
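The new test file seeds `engine_runningJobMetrics` with two partition keys holding two and one task contexts, and expects `engine_state_store_running_job_metrics_task_contexts` to be 3 and `engine_state_store_running_job_metrics_active_partition_keys` to be 2. The `CoordinatorService` methods that compute these values are not part of this excerpt. The sketch below assumes task contexts are the summed sizes of the per-key maps and that a key counts as active when its map is non-empty, which reproduces the expected numbers; `String` task ids stand in for `TaskLocation`.

```java
import java.util.HashMap;
import java.util.Map;

public class RunningJobMetricsCountSketch {

    /** Total task metrics contexts across all partition keys (assumed definition). */
    static long taskContextCount(Map<?, ? extends Map<?, ?>> metricsMap) {
        long total = 0L;
        for (Map<?, ?> perKey : metricsMap.values()) {
            if (perKey != null) {
                total += perKey.size();
            }
        }
        return total;
    }

    /** Partition keys whose per-key map is non-empty (assumed meaning of "active"). */
    static long activePartitionKeyCount(Map<?, ? extends Map<?, ?>> metricsMap) {
        return metricsMap.values().stream().filter(m -> m != null && !m.isEmpty()).count();
    }

    public static void main(String[] args) {
        // Same shape as seedRunningJobMetrics(): key 0 -> two contexts, key 1 -> one context.
        Map<String, Object> partitionZero = new HashMap<>();
        partitionZero.put("task-1-1", new Object());
        partitionZero.put("task-1-2", new Object());
        Map<String, Object> partitionOne = new HashMap<>();
        partitionOne.put("task-2-1", new Object());
        Map<Long, Map<String, Object>> metricsMap = new HashMap<>();
        metricsMap.put(0L, partitionZero);
        metricsMap.put(1L, partitionOne);
        System.out.println(
                taskContextCount(metricsMap) + " " + activePartitionKeyCount(metricsMap)); // 3 2
    }
}
```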
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExportsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExportsTest.java
new file mode 100644
index 0000000000..dbbfa1dcd5
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExportsTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
+import org.apache.seatunnel.engine.core.job.RefCount;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
+import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.map.IMap;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@DisabledOnOs(OS.WINDOWS)
+class EngineStateStoreLogicalMetricExportsTest {
+
+    private HazelcastInstanceImpl instance;
+
+    @AfterEach
+    void afterEach() {
+        if (instance != null) {
+            instance.shutdown();
+        }
+    }
+
+    @Test
+    void collectShouldExportLogicalMetricsForSpecialStateStores() {
+        instance = SeaTunnelServerStarter.createHazelcastInstance(createTestConfig());
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(instance.node.isMaster()));
+
+        SeaTunnelServer server =
+                instance.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(server.isCoordinatorActive()));
+
+        seedRunningJobMetrics();
+        seedCheckpointMonitor(server);
+        seedFinishedJobStores();
+        seedConnectorJarRefCounters();
+
+        List<MetricFamilySamples> metrics =
+                new EngineStateStoreLogicalMetricExports(instance.node).collect();
+
+        Assertions.assertEquals(
+                3d,
+                findSampleValue(
+                        metrics, "engine_state_store_running_job_metrics_task_contexts", null));
+        Assertions.assertEquals(
+                2d,
+                findSampleValue(
+                        metrics,
+                        "engine_state_store_running_job_metrics_active_partition_keys",
+                        null));
+        Assertions.assertEquals(
+                1d, findSampleValue(metrics, "engine_state_store_checkpoint_monitor_jobs", null));
+        Assertions.assertEquals(
+                1d,
+                findSampleValue(
+                        metrics,
+                        "engine_state_store_checkpoint_monitor_in_progress_checkpoints",
+                        null));
+        Assertions.assertEquals(
+                2d,
+                findSampleValue(
+                        metrics,
+                        "engine_state_store_checkpoint_monitor_retained_history_entries",
+                        null));
+        Assertions.assertEquals(
+                1d,
+                findSampleValue(
+                        metrics,
+                        "engine_state_store_finished_job_records",
+                        Constant.IMAP_FINISHED_JOB_STATE));
+        Assertions.assertEquals(
+                1d,
+                findSampleValue(
+                        metrics,
+                        "engine_state_store_finished_job_records",
+                        Constant.IMAP_FINISHED_JOB_METRICS));
+        Assertions.assertEquals(
+                1d,
+                findSampleValue(
+                        metrics,
+                        "engine_state_store_finished_job_records",
+                        Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+        Assertions.assertEquals(
+                1d,
+                findSampleValue(metrics, "engine_state_store_connector_jar_tracked_jars", null));
+        Assertions.assertEquals(
+                2d,
+                findSampleValue(
+                        metrics, "engine_state_store_connector_jar_total_references", null));
+    }
+
+    @Test
+    void collectShouldTrackFinishedJobCleanupTotals() {
+        instance = SeaTunnelServerStarter.createHazelcastInstance(createTestConfig());
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(instance.node.isMaster()));
+        SeaTunnelServer server =
+                instance.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(server.isCoordinatorActive()));
+
+        IMap<Long, JobState> finishedJobStateMap =
+                instance.getMap(Constant.IMAP_FINISHED_JOB_STATE);
+        IMap<Long, JobMetrics> finishedJobMetricsMap =
+                instance.getMap(Constant.IMAP_FINISHED_JOB_METRICS);
+        IMap<Long, JobDAGInfo> finishedJobVertexInfoMap =
+                instance.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);
+        finishedJobStateMap.put(101L, createJobState(101L), 1, TimeUnit.MILLISECONDS);
+        finishedJobMetricsMap.put(101L, JobMetrics.of(new HashMap<>()), 1, TimeUnit.MILLISECONDS);
+        finishedJobVertexInfoMap.put(101L, createJobDagInfo(101L), 1, TimeUnit.MILLISECONDS);
+
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            List<MetricFamilySamples> metrics =
+                                    new EngineStateStoreLogicalMetricExports(instance.node)
+                                            .collect();
+                            Assertions.assertEquals(
+                                    1d,
+                                    findSampleValue(
+                                            metrics,
+                                            "engine_state_store_finished_job_cleanup_total",
+                                            Constant.IMAP_FINISHED_JOB_STATE));
+                            Assertions.assertEquals(
+                                    1d,
+                                    findSampleValue(
+                                            metrics,
+                                            "engine_state_store_finished_job_cleanup_total",
+                                            Constant.IMAP_FINISHED_JOB_METRICS));
+                            Assertions.assertEquals(
+                                    1d,
+                                    findSampleValue(
+                                            metrics,
+                                            "engine_state_store_finished_job_cleanup_total",
+                                            Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+                        });
+    }
+
+    private void seedRunningJobMetrics() {
+        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsMap =
+                instance.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+        HashMap<TaskLocation, SeaTunnelMetricsContext> partitionZero = new HashMap<>();
+        partitionZero.put(new TaskLocation(new TaskGroupLocation(1L, 1, 1L), 0, 0), metricCtx());
+        partitionZero.put(new TaskLocation(new TaskGroupLocation(1L, 1, 2L), 0, 0), metricCtx());
+        HashMap<TaskLocation, SeaTunnelMetricsContext> partitionOne = new HashMap<>();
+        partitionOne.put(new TaskLocation(new TaskGroupLocation(2L, 1, 1L), 0, 0), metricCtx());
+        metricsMap.put(0L, partitionZero);
+        metricsMap.put(1L, partitionOne);
+    }
+
+    private void seedCheckpointMonitor(SeaTunnelServer server) {
+        server.getCheckpointMonitorService()
+                .onCheckpointTriggered(7L, 1, 1001L, CheckpointType.CHECKPOINT_TYPE, 100L, 2);
+        server.getCheckpointMonitorService()
+                .onCheckpointCompleted(
+                        new CompletedCheckpoint(
+                                7L,
+                                1,
+                                1000L,
+                                10L,
+                                CheckpointType.CHECKPOINT_TYPE,
+                                20L,
+                                Collections.emptyMap(),
+                                Collections.emptyMap()),
+                        128L);
+        server.getCheckpointMonitorService()
+                .onCheckpointFailed(
+                        7L,
+                        1,
+                        1002L,
+                        CheckpointType.CHECKPOINT_TYPE,
+                        CheckpointCloseReason.CHECKPOINT_EXPIRED,
+                        null,
+                        30L);
+    }
+
+    private void seedFinishedJobStores() {
+        instance.getMap(Constant.IMAP_FINISHED_JOB_STATE).put(7L, createJobState(7L));
+        instance.getMap(Constant.IMAP_FINISHED_JOB_METRICS).put(7L, JobMetrics.of(new HashMap<>()));
+        instance.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO).put(7L, createJobDagInfo(7L));
+    }
+
+    private void seedConnectorJarRefCounters() {
+        IMap<ConnectorJarIdentifier, RefCount> connectorJarRefCounters =
+                instance.getMap(Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS);
+        connectorJarRefCounters.put(
+                ConnectorJarIdentifier.of(
+                        ConnectorJarType.CONNECTOR_PLUGIN_JAR, "a.jar", "/tmp/a.jar"),
+                createRefCount(2L));
+    }
+
+    private static SeaTunnelMetricsContext metricCtx() {
+        return new SeaTunnelMetricsContext();
+    }
+
+    private static RefCount createRefCount(long references) {
+        RefCount refCount = new RefCount();
+        refCount.setReferences(references);
+        return refCount;
+    }
+
+    private static JobState createJobState(long jobId) {
+        return new JobState(
+                jobId, "job-" + jobId, JobStatus.FINISHED, 1L, 2L, 3L, new HashMap<>(), null);
+    }
+
+    private static JobDAGInfo createJobDagInfo(long jobId) {
+        return new JobDAGInfo(
+                jobId,
+                new HashMap<>(),
+                new HashMap<>(),
+                new HashMap<>(),
+                null,
+                Collections.emptySet());
+    }
+
+    private static double findSampleValue(
+            List<MetricFamilySamples> metrics, String metricName, String storeName) {
+        Sample sample =
+                findMetricFamily(metrics, metricName).samples.stream()
+                        .filter(
+                                metricSample ->
+                                        storeName == null
+                                                || storeName.equals(
+                                                        labelValue(metricSample, "store")))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Missing metric sample: "
+                                                        + metricName
+                                                        + ", store="
+                                                        + storeName));
+        Assertions.assertEquals("hazelcast", labelValue(sample, "backend"));
+        return sample.value;
+    }
+
+    private static MetricFamilySamples findMetricFamily(
+            List<MetricFamilySamples> metrics, String name) {
+        return metrics.stream()
+                .filter(
+                        metricFamilySamples ->
+                                name.equals(metricFamilySamples.name)
+                                        || metricFamilySamples.samples.stream()
+                                                .anyMatch(sample -> name.equals(sample.name)))
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Missing metric family: " + name));
+    }
+
+    private static String labelValue(Sample sample, String labelName) {
+        int index = sample.labelNames.indexOf(labelName);
+        if (index < 0) {
+            throw new AssertionError(
+                    "Missing label: "
+                            + labelName
+                            + ", labels="
+                            + Arrays.toString(sample.labelNames.toArray()));
+        }
+        return sample.labelValues.get(index);
+    }
+
+    private static SeaTunnelConfig createTestConfig() {
+        String yaml =
+                "seatunnel:\n"
+                        + "  engine:\n"
+                        + "    telemetry:\n"
+                        + "      metric:\n"
+                        + "        enabled: true\n"
+                        + "    history-job-expire-minutes: 1\n"
+                        + "    jar-storage:\n"
+                        + "      enable: true\n"
+                        + "      connector-jar-storage-mode: SHARED\n"
+                        + "      connector-jar-storage-path: \"\"\n"
+                        + "      connector-jar-cleanup-task-interval: 3600\n"
+                        + "      connector-jar-expiry-time: 600\n";
+        SeaTunnelConfig config = ConfigProvider.locateAndGetSeaTunnelConfigFromString(yaml);
+        config.getHazelcastConfig()
+                .setClusterName(
+                        TestUtils.getClusterName(
+                                "EngineStateStoreLogicalMetricExportsTest_" + System.nanoTime()));
+        return config;
+    }
+}
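The sample lookup done by the `findSampleValue` and `labelValue` test helpers above can be reduced to a small standalone sketch. It makes several assumptions:

- `SimpleSample` is a hypothetical stand-in for the Prometheus simpleclient `Collector.MetricFamilySamples.Sample`. That class exposes `name`, `labelNames`, `labelValues` and `value` fields.
- The sketch filters samples directly by name instead of first locating the metric family.
- The store names are made up. The real test uses the `Constant.IMAP_*` map names.

This illustrates the technique only. It is not code from this commit.

```java
import java.util.Arrays;
import java.util.List;

/** Standalone sketch of the label-based sample lookup (hypothetical types, not SeaTunnel code). */
public class LabelLookupSketch {

    /** Hypothetical stand-in for a Prometheus sample: parallel label-name and label-value lists. */
    static final class SimpleSample {
        final String name;
        final List<String> labelNames;
        final List<String> labelValues;
        final double value;

        SimpleSample(String name, List<String> labelNames, List<String> labelValues, double value) {
            if (labelNames.size() != labelValues.size()) {
                throw new IllegalArgumentException("label names and values must align");
            }
            this.name = name;
            this.labelNames = labelNames;
            this.labelValues = labelValues;
            this.value = value;
        }
    }

    /** Finds the label's position in labelNames and reads the value at that index. */
    static String labelValue(SimpleSample sample, String labelName) {
        int index = sample.labelNames.indexOf(labelName);
        if (index < 0) {
            throw new AssertionError("Missing label: " + labelName + ", labels=" + sample.labelNames);
        }
        return sample.labelValues.get(index);
    }

    /**
     * Returns the value of the first sample named metricName whose "store" label equals
     * storeName (any store when storeName is null), after checking its "backend" label.
     */
    static double findSampleValue(List<SimpleSample> samples, String metricName, String storeName) {
        SimpleSample sample =
                samples.stream()
                        .filter(s -> metricName.equals(s.name))
                        .filter(s -> storeName == null || storeName.equals(labelValue(s, "store")))
                        .findFirst()
                        .orElseThrow(
                                () ->
                                        new AssertionError(
                                                "Missing metric sample: "
                                                        + metricName
                                                        + ", store="
                                                        + storeName));
        if (!"hazelcast".equals(labelValue(sample, "backend"))) {
            throw new AssertionError("Unexpected backend for " + metricName);
        }
        return sample.value;
    }

    public static void main(String[] args) {
        // Made-up store names for illustration only.
        String metric = "engine_state_store_finished_job_cleanup_total";
        List<String> labels = Arrays.asList("store", "backend");
        List<SimpleSample> samples =
                Arrays.asList(
                        new SimpleSample(metric, labels, Arrays.asList("store-a", "hazelcast"), 1d),
                        new SimpleSample(metric, labels, Arrays.asList("store-b", "hazelcast"), 3d));
        System.out.println(findSampleValue(samples, metric, "store-b")); // prints 3.0
    }
}
```

Labels are stored as two parallel lists, so the index of the name has to be found before the value can be read. A missing label, a missing sample, or an unexpected backend each raises an `AssertionError`. As a result, a mislabeled export fails the check rather than silently matching the wrong sample.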
