This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cb407be995 [Improve][Zeta] Add logical state store observability metrics (#11024)
cb407be995 is described below
commit cb407be99546102a8fd5a3879d21b89fd6caa276
Author: JeremyXin <[email protected]>
AuthorDate: Wed Jun 10 21:26:02 2026 +0800
[Improve][Zeta] Add logical state store observability metrics (#11024)
---
docs/en/engines/zeta/telemetry.md | 67 ++++-
docs/zh/engines/zeta/telemetry.md | 66 +++-
.../engine/server/CoordinatorService.java | 159 ++++++++++
.../monitor/CheckpointMonitorService.java | 183 ++++++++++++
.../engine/server/master/JobHistoryService.java | 78 ++++-
.../service/jar/ConnectorPackageService.java | 26 ++
.../metrics/ExportsInstanceInitializer.java | 3 +
.../EngineStateStoreLogicalMetricExports.java | 251 ++++++++++++++++
.../engine/server/metrics/MetricsApiTest.java | 3 +-
.../EngineStateStoreLogicalMetricExportsTest.java | 332 +++++++++++++++++++++
10 files changed, 1162 insertions(+), 6 deletions(-)
diff --git a/docs/en/engines/zeta/telemetry.md b/docs/en/engines/zeta/telemetry.md
index 99a87735f7..a749785a18 100644
--- a/docs/en/engines/zeta/telemetry.md
+++ b/docs/en/engines/zeta/telemetry.md
@@ -68,6 +68,71 @@ Example PromQL:
sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
```
+### Engine State Store Logical Metrics
+
+These metrics expose logical counts with engine-level meaning for selected engine state stores. Only the active
+master exports them. The metric names are backend-neutral, but the current implementation still labels them with
+`backend="hazelcast"`.
+
+| MetricName | Type | Labels | DESCRIPTION |
+|----------------------------------------------------------|---------|---------------------------------------------|-----------------------------------------------------------------------|
+| engine_state_store_running_job_metrics_task_contexts | Gauge | **backend**, state store backend. | Total number of task metric contexts currently stored in `engine_runningJobMetrics`. |
+| engine_state_store_running_job_metrics_active_partition_keys | Gauge | **backend**, state store backend. | Number of non-empty top-level partition keys currently stored in `engine_runningJobMetrics`. |
+| engine_state_store_checkpoint_monitor_jobs | Gauge | **backend**, state store backend. | Number of jobs currently tracked in `engine_checkpoint_monitor`. |
+| engine_state_store_checkpoint_monitor_in_progress_checkpoints | Gauge | **backend**, state store backend. | Number of in-progress checkpoints currently tracked in `engine_checkpoint_monitor`. |
+| engine_state_store_checkpoint_monitor_retained_history_entries | Gauge | **backend**, state store backend. | Number of retained checkpoint history entries currently tracked in `engine_checkpoint_monitor`. |
+| engine_state_store_finished_job_records | Gauge | **store**, finished job store name. **backend**, state store backend. | Current number of records in each finished job store. |
+| engine_state_store_finished_job_cleanup_total | Counter | **store**, finished job store name. **backend**, state store backend. | Total number of expiration (cleanup) events observed for each finished job store. |
+| engine_state_store_connector_jar_tracked_jars | Gauge | **backend**, state store backend. | Number of connector jars currently tracked in `engine_connectorJarRefCounters`. |
+| engine_state_store_connector_jar_total_references | Gauge | **backend**, state store backend. | Sum of connector jar reference counts in `engine_connectorJarRefCounters`. |
+
+Logical metrics complement the local Hazelcast store metrics above:
+
+- Use `engine_state_store_local_*` metrics when you want to understand data distribution and memory usage on each node.
+- Use `engine_state_store_*` logical metrics when you want to understand engine semantics such as checkpoint backlog, finished-job retention, or connector jar reuse.
+
+Example PromQL:
+
+```promql
+# Total state store entries by store
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+
+# Current checkpoint backlog
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+
+# Finished-job cleanup growth over the last 15 minutes
+increase(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[15m])
+
+# Connector jar reference pressure
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
+Example Grafana panels:
+
+- `State Store Total Entries`
+
+```promql
+sum by (store) (engine_state_store_local_owned_entries{backend="hazelcast"})
+```
+
+- `Checkpoint In-Progress Count`
+
+```promql
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+```
+
+- `Finished Job Cleanup Rate`
+
+```promql
+sum by (store) (rate(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[5m]))
+```
+
+- `Connector Jar Reference Count`
+
+```promql
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
### Thread Pool Status
| MetricName | Type | Labels | DESCRIPTION |
@@ -166,4 +231,4 @@ the [Installation](https://grafana.com/docs/grafana/latest/setup-grafana/install
- Add Prometheus DataSource on Grafana.
- Import the `Seatunnel Cluster` monitoring dashboard JSON into Grafana.
-The [effect image](../../../images/grafana.png) of the dashboard
+The [effect image](../../../images/grafana.png) of the dashboard
\ No newline at end of file
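The `increase(...)` and `rate(...)` queries above are applied to `engine_state_store_finished_job_cleanup_total`. In this commit that counter is held in an in-process `AtomicLong` map inside `JobHistoryService`, so its value starts from zero again whenever that process restarts, and Prometheus adjusts for such drops in monotonicity. The class below is a simplified sketch of that reset adjustment, assuming time-ordered raw counter samples; `CounterIncrease` is a hypothetical name, and unlike PromQL it does not extrapolate to the edges of the range window.

```java
/**
 * Hypothetical, simplified sketch of a reset-aware counter increase. Unlike PromQL's
 * increase(), it does not extrapolate to the boundaries of the range window.
 */
final class CounterIncrease {
    /** samples: raw counter values in time order. */
    static double resetAwareIncrease(double[] samples) {
        double total = 0.0;
        for (int i = 1; i < samples.length; i++) {
            double delta = samples[i] - samples[i - 1];
            // A drop is treated as a restart from zero, so the new value counts in full.
            total += delta >= 0 ? delta : samples[i];
        }
        return total;
    }
}
```

For samples `3, 5, 1, 4` the sketch returns `6`: `+2`, then a reset where the new value `1` counts in full, then `+3`.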
diff --git a/docs/zh/engines/zeta/telemetry.md b/docs/zh/engines/zeta/telemetry.md
index 00d679845f..0da7c4e1b8 100644
--- a/docs/zh/engines/zeta/telemetry.md
+++ b/docs/zh/engines/zeta/telemetry.md
@@ -68,6 +68,70 @@ PromQL 示例:
sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
```
+### 引擎状态存储逻辑指标
+
+这些指标暴露特定引擎状态存储的业务语义计数。它们仅由当前 active master 输出。指标名保持 backend-neutral,
+但当前实现仍会带上 `backend="hazelcast"` 标签。
+
+| MetricName | Type | Labels | 描述 |
+|----------------------------------------------------------|---------|---------------------------------------------|------|
+| engine_state_store_running_job_metrics_task_contexts | Gauge | **backend**,状态存储后端。 | `engine_runningJobMetrics` 中当前保存的 task metric context 总数。 |
+| engine_state_store_running_job_metrics_active_partition_keys | Gauge | **backend**,状态存储后端。 | `engine_runningJobMetrics` 中当前非空的顶层分桶 key 数。 |
+| engine_state_store_checkpoint_monitor_jobs | Gauge | **backend**,状态存储后端。 | `engine_checkpoint_monitor` 中当前跟踪的 job 数。 |
+| engine_state_store_checkpoint_monitor_in_progress_checkpoints | Gauge | **backend**,状态存储后端。 | `engine_checkpoint_monitor` 中当前 in-progress checkpoint 数。 |
+| engine_state_store_checkpoint_monitor_retained_history_entries | Gauge | **backend**,状态存储后端。 | `engine_checkpoint_monitor` 中当前保留的 checkpoint history 条目数。 |
+| engine_state_store_finished_job_records | Gauge | **store**,finished job store 名称。**backend**,状态存储后端。 | finished job 相关状态存储中的当前记录数。 |
+| engine_state_store_finished_job_cleanup_total | Counter | **store**,finished job store 名称。**backend**,状态存储后端。 | finished job 状态存储因过期而发生的清理总次数。 |
+| engine_state_store_connector_jar_tracked_jars | Gauge | **backend**,状态存储后端。 | `engine_connectorJarRefCounters` 中当前被跟踪的 connector jar 数。 |
+| engine_state_store_connector_jar_total_references | Gauge | **backend**,状态存储后端。 | `engine_connectorJarRefCounters` 中当前所有 connector jar 引用计数之和。 |
+
+这些逻辑指标与上面的本地 Hazelcast store 指标互为补充:
+
+- 当你想看每个节点上的数据分布和内存占用时,使用 `engine_state_store_local_*` 指标。
+- 当你想看 checkpoint 积压、finished job 保留、connector jar 复用等引擎语义时,使用 `engine_state_store_*` 逻辑指标。
+
+PromQL 示例:
+
+```promql
+# 按 store 聚合后的状态存储总 entry 数
+sum by (cluster, store, backend) (engine_state_store_local_owned_entries)
+
+# 当前 checkpoint 积压
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+
+# 最近 15 分钟 finished job cleanup 增长量
+increase(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[15m])
+
+# connector jar 引用压力
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
+Grafana 面板示例:
+
+- `State Store Total Entries`
+
+```promql
+sum by (store) (engine_state_store_local_owned_entries{backend="hazelcast"})
+```
+
+- `Checkpoint In-Progress Count`
+
+```promql
+engine_state_store_checkpoint_monitor_in_progress_checkpoints{backend="hazelcast"}
+```
+
+- `Finished Job Cleanup Rate`
+
+```promql
+sum by (store) (rate(engine_state_store_finished_job_cleanup_total{backend="hazelcast"}[5m]))
+```
+
+- `Connector Jar Reference Count`
+
+```promql
+engine_state_store_connector_jar_total_references{backend="hazelcast"}
+```
+
### 线程池状态
| MetricName | Type | Labels | 描述 |
@@ -166,4 +230,4 @@ scrape_configs:
- 在 Grafana 中添加 Prometheus 数据源。
- 将 `Seatunnel Cluster` 监控仪表板 JSON 导入到 Grafana 中。
-监控[效果图](../../../images/grafana.png)
+监控[效果图](../../../images/grafana.png)
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 71b232d82d..720c94028b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -88,11 +88,15 @@ import org.apache.seatunnel.engine.server.utils.PeekBlockingQueue;
import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
+import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryAddedListener;
+import com.hazelcast.map.listener.EntryRemovedListener;
+import com.hazelcast.map.listener.EntryUpdatedListener;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;
@@ -109,6 +113,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -221,6 +226,20 @@ public class CoordinatorService {
private final AtomicBoolean coordinatorServiceCleared = new AtomicBoolean(false);
+ private final AtomicLong runningJobMetricsPartitionKeyCount = new AtomicLong();
+
+ private final AtomicLong runningJobMetricsTaskContextCount = new AtomicLong();
+
+ private final Object runningJobMetricsStatsLock = new Object();
+
+ private final Set<Long> runningJobMetricsDirtyKeys = ConcurrentHashMap.newKeySet();
+
+ private final Map<Long, RunningJobMetricsStats> runningJobMetricsStatsByJobId = new HashMap<>();
+
+ private final AtomicBoolean runningJobMetricsInitializing = new AtomicBoolean(false);
+
+ private volatile UUID runningJobMetricsListenerId;
+
public CoordinatorService(
@NonNull NodeEngineImpl nodeEngine,
@NonNull SeaTunnelServer seaTunnelServer,
@@ -507,6 +526,7 @@ public class CoordinatorService {
ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
metricsImap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ initRunningJobMetricsStoreStats();
pendingPipelineCleanupIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
pendingJobCleanupIMap =
@@ -1088,6 +1108,7 @@ public class CoordinatorService {
if (!coordinatorServiceCleared.compareAndSet(false, true)) {
return;
}
+ removeRunningJobMetricsListener();
// interrupt all JobMaster
runningJobMasterMap.values().forEach(JobMaster::interrupt);
if (isWaitStrategy) {
@@ -1615,6 +1636,7 @@ public class CoordinatorService {
if (pipelineCleanupScheduler != null) {
pipelineCleanupScheduler.shutdown();
}
+ removeRunningJobMetricsListener();
clearCoordinatorService();
awaitSchedulerTermination("master active listener",
masterActiveListener);
awaitSchedulerTermination("pipeline cleanup scheduler",
pipelineCleanupScheduler);
@@ -1876,6 +1898,14 @@ public class CoordinatorService {
return pendingJobQueue.getJobIdMap().size();
}
+ public long getRunningJobMetricsPartitionKeyCount() {
+ return runningJobMetricsPartitionKeyCount.get();
+ }
+
+ public long getRunningJobMetricsTaskContextCount() {
+ return runningJobMetricsTaskContextCount.get();
+ }
+
public EngineConfig getEngineConfig() {
return engineConfig;
}
@@ -1900,6 +1930,135 @@ public class CoordinatorService {
}
}
+ private void initRunningJobMetricsStoreStats() {
+ removeRunningJobMetricsListener();
+ runningJobMetricsListenerId =
+ metricsImap.addEntryListener(new RunningJobMetricsEntryListener(), true);
+ runningJobMetricsInitializing.set(true);
+ runningJobMetricsDirtyKeys.clear();
+
+ Map<Long, RunningJobMetricsStats> snapshotStats = new HashMap<>();
+ metricsImap.forEach(
+ (partitionKey, metrics) ->
+ snapshotStats.put(partitionKey, toRunningJobMetricsStats(metrics)));
+
+ Set<Long> dirtyKeys = new HashSet<>(runningJobMetricsDirtyKeys);
+ for (Long partitionKey : dirtyKeys) {
+ snapshotStats.put(
+ partitionKey, toRunningJobMetricsStats(metricsImap.get(partitionKey)));
+ }
+
+ synchronized (runningJobMetricsStatsLock) {
+ runningJobMetricsStatsByJobId.clear();
+ runningJobMetricsPartitionKeyCount.set(0L);
+ runningJobMetricsTaskContextCount.set(0L);
+ snapshotStats.forEach(this::replaceRunningJobMetricsStatsLocked);
+ runningJobMetricsInitializing.set(false);
+
+ Set<Long> postSnapshotDirtyKeys = new HashSet<>(runningJobMetricsDirtyKeys);
+ runningJobMetricsDirtyKeys.clear();
+ for (Long partitionKey : postSnapshotDirtyKeys) {
+ replaceRunningJobMetricsStatsLocked(
+ partitionKey, toRunningJobMetricsStats(metricsImap.get(partitionKey)));
+ }
+ }
+ }
+
+ private RunningJobMetricsStats toRunningJobMetricsStats(Map<TaskLocation, SeaTunnelMetricsContext> metrics) {
+ if (metrics == null || metrics.isEmpty()) {
+ return RunningJobMetricsStats.EMPTY;
+ }
+ return new RunningJobMetricsStats(1L, metrics.size());
+ }
+
+ private void replaceRunningJobMetricsStatsLocked(
+ Long partitionKey, RunningJobMetricsStats stats) {
+ RunningJobMetricsStats currentStats = runningJobMetricsStatsByJobId.get(partitionKey);
+ if (currentStats != null) {
+ runningJobMetricsPartitionKeyCount.addAndGet(-currentStats.partitionKeyCount);
+ runningJobMetricsTaskContextCount.addAndGet(-currentStats.taskContextCount);
+ }
+
+ if (stats.isEmpty()) {
+ runningJobMetricsStatsByJobId.remove(partitionKey);
+ return;
+ }
+
+ runningJobMetricsStatsByJobId.put(partitionKey, stats);
+ runningJobMetricsPartitionKeyCount.addAndGet(stats.partitionKeyCount);
+ runningJobMetricsTaskContextCount.addAndGet(stats.taskContextCount);
+ }
+
+ private void removeRunningJobMetricsListener() {
+ if (metricsImap != null && runningJobMetricsListenerId != null) {
+ metricsImap.removeEntryListener(runningJobMetricsListenerId);
+ runningJobMetricsListenerId = null;
+ }
+ runningJobMetricsInitializing.set(false);
+ runningJobMetricsDirtyKeys.clear();
+ synchronized (runningJobMetricsStatsLock) {
+ runningJobMetricsStatsByJobId.clear();
+ }
+ runningJobMetricsPartitionKeyCount.set(0L);
+ runningJobMetricsTaskContextCount.set(0L);
+ }
+
+ private final class RunningJobMetricsEntryListener
+ implements EntryAddedListener<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>,
+ EntryUpdatedListener<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>,
+ EntryRemovedListener<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> {
+
+ @Override
+ public void entryAdded(
+ EntryEvent<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> event) {
+ replaceRunningJobMetricsStats(event.getKey(), event.getValue());
+ }
+
+ @Override
+ public void entryUpdated(
+ EntryEvent<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> event) {
+ replaceRunningJobMetricsStats(event.getKey(), event.getValue());
+ }
+
+ @Override
+ public void entryRemoved(
+ EntryEvent<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> event) {
+ replaceRunningJobMetricsStats(event.getKey(), null);
+ }
+ }
+
+ private void replaceRunningJobMetricsStats(
+ Long partitionKey, Map<TaskLocation, SeaTunnelMetricsContext> metrics) {
+ if (runningJobMetricsInitializing.get()) {
+ runningJobMetricsDirtyKeys.add(partitionKey);
+ return;
+ }
+ synchronized (runningJobMetricsStatsLock) {
+ if (runningJobMetricsInitializing.get()) {
+ runningJobMetricsDirtyKeys.add(partitionKey);
+ return;
+ }
+ replaceRunningJobMetricsStatsLocked(partitionKey, toRunningJobMetricsStats(metrics));
+ }
+ }
+
+ private static final class RunningJobMetricsStats {
+ private static final RunningJobMetricsStats EMPTY = new RunningJobMetricsStats(0L, 0L);
+
+ private final long partitionKeyCount;
+ private final long taskContextCount;
+
+ private RunningJobMetricsStats(long partitionKeyCount, long taskContextCount) {
+ this.partitionKeyCount = partitionKeyCount;
+ this.taskContextCount = taskContextCount;
+ }
+
+ private boolean isEmpty() {
+ return partitionKeyCount == 0L && taskContextCount == 0L;
+ }
+ }
+
@VisibleForTesting
public PeekBlockingQueue<PendingJobInfo> getPendingJobQueue() {
return pendingJobQueue;
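The running-job-metrics bookkeeping above registers a listener, builds the counters from a snapshot, and re-reads keys whose events arrived while the snapshot was being taken. The following is a minimal, Hazelcast-free sketch of that idea, assuming a plain `Map` as the store and a hypothetical `onChange` method standing in for the added/updated/removed callbacks; `IncrementalStoreStats` is not a SeaTunnel class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch, not SeaTunnel or Hazelcast code. Maintains two aggregates over a
 * map of maps: the number of keys with a non-empty value, and the total nested entries.
 * onChange(...) stands in for an entry listener's added/updated/removed callbacks.
 */
final class IncrementalStoreStats {
    // Stands in for the distributed map; use a ConcurrentHashMap if other threads mutate it.
    private final Map<Long, Map<String, Integer>> source;
    private final Object lock = new Object();
    // key -> {keyContribution, entryContribution}; all fields below are guarded by lock.
    private final Map<Long, long[]> statsByKey = new HashMap<>();
    private final Set<Long> dirtyKeys = new HashSet<>();
    private boolean initializing;
    private long keyCount;
    private long entryCount;

    IncrementalStoreStats(Map<Long, Map<String, Integer>> source) {
        this.source = source;
    }

    /** Listener callback; a null or empty value is treated as "nothing to count". */
    void onChange(Long key, Map<String, Integer> newValue) {
        synchronized (lock) {
            if (initializing) {
                dirtyKeys.add(key); // re-read from the source once the snapshot is applied
                return;
            }
            replaceLocked(key, newValue);
        }
    }

    /** Rebuilds the aggregates; the listener is assumed to be registered already. */
    void initialize() {
        synchronized (lock) {
            initializing = true;
            dirtyKeys.clear();
        }
        // Copy the store without holding the lock so callbacks only wait for the rebuild.
        Map<Long, Map<String, Integer>> snapshot = new HashMap<>(source);
        synchronized (lock) {
            statsByKey.clear();
            keyCount = 0L;
            entryCount = 0L;
            snapshot.forEach(this::replaceLocked);
            for (Long key : dirtyKeys) {
                replaceLocked(key, source.get(key)); // latest value wins over the snapshot
            }
            dirtyKeys.clear();
            initializing = false;
        }
    }

    private void replaceLocked(Long key, Map<String, Integer> value) {
        long[] previous = statsByKey.remove(key);
        if (previous != null) { // remove the key's old contribution first
            keyCount -= previous[0];
            entryCount -= previous[1];
        }
        if (value == null || value.isEmpty()) {
            return; // empty values are not tracked at all
        }
        statsByKey.put(key, new long[] {1L, value.size()});
        keyCount += 1L;
        entryCount += value.size();
    }

    long getKeyCount() {
        synchronized (lock) {
            return keyCount;
        }
    }

    long getEntryCount() {
        synchronized (lock) {
            return entryCount;
        }
    }
}
```

One design difference is intentional: this sketch marks keys dirty only while holding the lock, so a key cannot be recorded after the dirty set has already been drained. The cost is that listener callbacks briefly wait while the snapshot is applied.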
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java
index 1f890e9a84..44ec00b857 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/monitor/CheckpointMonitorService.java
@@ -32,15 +32,28 @@ import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
import org.apache.seatunnel.engine.server.checkpoint.SubtaskStatistics;
import org.apache.seatunnel.engine.server.checkpoint.TaskStatistics;
+import com.hazelcast.core.EntryEvent;
import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryAddedListener;
+import com.hazelcast.map.listener.EntryExpiredListener;
+import com.hazelcast.map.listener.EntryRemovedListener;
+import com.hazelcast.map.listener.EntryUpdatedListener;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -50,6 +63,14 @@ public class CheckpointMonitorService {
private final NodeEngine nodeEngine;
private volatile IMap<Long, CheckpointOverview> overviewMap;
private final int maxHistorySize;
+ private final AtomicLong overviewJobCount = new AtomicLong();
+ private final AtomicLong inProgressCheckpointCount = new AtomicLong();
+ private final AtomicLong retainedHistoryCount = new AtomicLong();
+ private final Object overviewStatsLock = new Object();
+ private final Set<Long> overviewDirtyJobIds = ConcurrentHashMap.newKeySet();
+ private final Map<Long, CheckpointOverviewStats> overviewStatsByJobId = new HashMap<>();
+ private final AtomicBoolean overviewStatsInitializing = new AtomicBoolean(false);
+ private volatile UUID overviewListenerId;
public CheckpointMonitorService(NodeEngine nodeEngine, int maxHistorySize) {
this.nodeEngine = nodeEngine;
@@ -64,6 +85,7 @@ public class CheckpointMonitorService {
nodeEngine
.getHazelcastInstance()
.getMap(Constant.IMAP_CHECKPOINT_MONITOR);
+ initOverviewStats();
}
}
}
@@ -236,6 +258,18 @@ public class CheckpointMonitorService {
});
}
+ public long getOverviewJobCount() {
+ return overviewJobCount.get();
+ }
+
+ public long getInProgressCheckpointCount() {
+ return inProgressCheckpointCount.get();
+ }
+
+ public long getRetainedHistoryCount() {
+ return retainedHistoryCount.get();
+ }
+
private void updateOverview(
long jobId, int pipelineId, Consumer<PipelineCheckpointOverview> consumer) {
getOverviewMap()
@@ -256,6 +290,155 @@ public class CheckpointMonitorService {
pipeline.getInProgress().removeIf(cp -> cp.getCheckpointId() == checkpointId);
}
+ private void initOverviewStats() {
+ removeOverviewListener();
+ overviewListenerId =
+ overviewMap.addEntryListener(new CheckpointOverviewEntryListener(), true);
+ overviewStatsInitializing.set(true);
+ overviewDirtyJobIds.clear();
+
+ Map<Long, CheckpointOverviewStats> snapshotStats = new HashMap<>();
+ overviewMap.forEach(
+ (jobId, overview) -> snapshotStats.put(jobId, toOverviewStats(overview)));
+
+ Set<Long> dirtyJobIds = new HashSet<>(overviewDirtyJobIds);
+ for (Long jobId : dirtyJobIds) {
+ snapshotStats.put(jobId, toOverviewStats(overviewMap.get(jobId)));
+ }
+
+ synchronized (overviewStatsLock) {
+ overviewStatsByJobId.clear();
+ overviewJobCount.set(0L);
+ inProgressCheckpointCount.set(0L);
+ retainedHistoryCount.set(0L);
+ snapshotStats.forEach(this::replaceOverviewStatsLocked);
+ overviewStatsInitializing.set(false);
+
+ Set<Long> postSnapshotDirtyJobIds = new HashSet<>(overviewDirtyJobIds);
+ overviewDirtyJobIds.clear();
+ for (Long jobId : postSnapshotDirtyJobIds) {
+ replaceOverviewStatsLocked(jobId, toOverviewStats(overviewMap.get(jobId)));
+ }
+ }
+ }
+
+ private void removeOverviewListener() {
+ if (overviewMap != null && overviewListenerId != null) {
+ overviewMap.removeEntryListener(overviewListenerId);
+ overviewListenerId = null;
+ }
+ overviewStatsInitializing.set(false);
+ overviewDirtyJobIds.clear();
+ synchronized (overviewStatsLock) {
+ overviewStatsByJobId.clear();
+ }
+ overviewJobCount.set(0L);
+ inProgressCheckpointCount.set(0L);
+ retainedHistoryCount.set(0L);
+ }
+
+ private CheckpointOverviewStats toOverviewStats(CheckpointOverview overview) {
+ if (overview == null) {
+ return CheckpointOverviewStats.EMPTY;
+ }
+ return new CheckpointOverviewStats(
+ 1L, getInProgressCount(overview), getHistoryCount(overview));
+ }
+
+ private void replaceOverviewStatsLocked(Long jobId, CheckpointOverviewStats stats) {
+ CheckpointOverviewStats currentStats = overviewStatsByJobId.get(jobId);
+ if (currentStats != null) {
+ overviewJobCount.addAndGet(-currentStats.jobCount);
+ inProgressCheckpointCount.addAndGet(-currentStats.inProgressCheckpointCount);
+ retainedHistoryCount.addAndGet(-currentStats.retainedHistoryCount);
+ }
+
+ if (stats.isEmpty()) {
+ overviewStatsByJobId.remove(jobId);
+ return;
+ }
+
+ overviewStatsByJobId.put(jobId, stats);
+ overviewJobCount.addAndGet(stats.jobCount);
+ inProgressCheckpointCount.addAndGet(stats.inProgressCheckpointCount);
+ retainedHistoryCount.addAndGet(stats.retainedHistoryCount);
+ }
+
+ private long getInProgressCount(CheckpointOverview overview) {
+ return overview.getPipelines().values().stream()
+ .filter(Objects::nonNull)
+ .mapToLong(pipelineOverview -> pipelineOverview.getInProgress().size())
+ .sum();
+ }
+
+ private long getHistoryCount(CheckpointOverview overview) {
+ return overview.getPipelines().values().stream()
+ .filter(Objects::nonNull)
+ .mapToLong(pipelineOverview -> pipelineOverview.getHistory().size())
+ .sum();
+ }
+
+ private final class CheckpointOverviewEntryListener
+ implements EntryAddedListener<Long, CheckpointOverview>,
+ EntryUpdatedListener<Long, CheckpointOverview>,
+ EntryRemovedListener<Long, CheckpointOverview>,
+ EntryExpiredListener<Long, CheckpointOverview> {
+
+ @Override
+ public void entryAdded(EntryEvent<Long, CheckpointOverview> event) {
+ replaceOverviewStats(event.getKey(), event.getValue());
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<Long, CheckpointOverview> event) {
+ replaceOverviewStats(event.getKey(), event.getValue());
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<Long, CheckpointOverview> event) {
+ replaceOverviewStats(event.getKey(), null);
+ }
+
+ @Override
+ public void entryExpired(EntryEvent<Long, CheckpointOverview> event) {
+ replaceOverviewStats(event.getKey(), null);
+ }
+ }
+
+ private void replaceOverviewStats(Long jobId, CheckpointOverview overview) {
+ if (overviewStatsInitializing.get()) {
+ overviewDirtyJobIds.add(jobId);
+ return;
+ }
+ synchronized (overviewStatsLock) {
+ if (overviewStatsInitializing.get()) {
+ overviewDirtyJobIds.add(jobId);
+ return;
+ }
+ replaceOverviewStatsLocked(jobId, toOverviewStats(overview));
+ }
+ }
+
+ private static final class CheckpointOverviewStats {
+ private static final CheckpointOverviewStats EMPTY =
+ new CheckpointOverviewStats(0L, 0L, 0L);
+
+ private final long jobCount;
+ private final long inProgressCheckpointCount;
+ private final long retainedHistoryCount;
+
+ private CheckpointOverviewStats(
+ long jobCount, long inProgressCheckpointCount, long retainedHistoryCount) {
+ this.jobCount = jobCount;
+ this.inProgressCheckpointCount = inProgressCheckpointCount;
+ this.retainedHistoryCount = retainedHistoryCount;
+ }
+
+ private boolean isEmpty() {
+ return jobCount == 0L && inProgressCheckpointCount == 0L && retainedHistoryCount == 0L;
+ }
+ }
+
public static long calculateStateSize(CompletedCheckpoint checkpoint) {
return checkpoint.getTaskStatistics().values().stream()
.map(TaskStatistics::getSubtaskStats)
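`toOverviewStats` above turns one checkpoint overview into a `(jobCount, inProgress, history)` triple by summing list sizes across the non-null pipelines. A compact sketch of the same derivation follows, assuming a hypothetical `PipelineView` holding in-progress and history lists, and `Integer` pipeline keys (the real key type is not shown in this diff); `CheckpointCountsSketch` is not a SeaTunnel class.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: per-job counts derived from an overview-like structure. */
final class CheckpointCountsSketch {
    /** Hypothetical stand-in for a pipeline overview (checkpoint ids per list). */
    static final class PipelineView {
        final List<Long> inProgress;
        final List<Long> history;

        PipelineView(List<Long> inProgress, List<Long> history) {
            this.inProgress = inProgress;
            this.history = history;
        }
    }

    /**
     * Returns {jobCount, inProgressCount, historyCount}. A missing overview (null) yields
     * zeros; any present overview counts as one job, even if it has no pipelines.
     */
    static long[] countsFor(Map<Integer, PipelineView> pipelines) {
        if (pipelines == null) {
            return new long[] {0L, 0L, 0L};
        }
        long inProgress = pipelines.values().stream()
                .filter(Objects::nonNull) // tolerate null pipeline entries
                .mapToLong(p -> p.inProgress.size())
                .sum();
        long history = pipelines.values().stream()
                .filter(Objects::nonNull)
                .mapToLong(p -> p.history.size())
                .sum();
        return new long[] {1L, inProgress, history};
    }
}
```

A job whose overview exists but has no pipelines still contributes one to the job count, which matches the `1L` that `toOverviewStats` returns for any non-null overview.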
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 027704f58e..4afeabeb9e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.SerializationFe
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStatusData;
@@ -50,12 +51,16 @@ import lombok.Getter;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -105,6 +110,8 @@ public class JobHistoryService {
private final int finishedJobExpireTime;
+ private final Map<String, AtomicLong> finishedJobCleanupTotals = new ConcurrentHashMap<>();
+
public JobHistoryService(
NodeEngine nodeEngine,
IMap<Object, Object> runningJobStateIMap,
@@ -123,7 +130,12 @@ public class JobHistoryService {
this.finishedJobStateImap = finishedJobStateImap;
this.finishedJobMetricsImap = finishedJobMetricsImap;
this.finishedJobDAGInfoImap = finishedJobVertexInfoImap;
- this.finishedJobDAGInfoImap.addEntryListener(new JobInfoExpiredListener(), true);
+ this.finishedJobStateImap.addEntryListener(
+ new FinishedJobExpiredListener<>(Constant.IMAP_FINISHED_JOB_STATE), true);
+ this.finishedJobMetricsImap.addEntryListener(
+ new FinishedJobExpiredListener<>(Constant.IMAP_FINISHED_JOB_METRICS), true);
+ this.finishedJobDAGInfoImap.addEntryListener(
+ new JobInfoExpiredListener(Constant.IMAP_FINISHED_JOB_VERTEX_INFO), true);
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
this.finishedJobExpireTime = finishedJobExpireTime;
@@ -332,6 +344,39 @@ public class JobHistoryService {
finishedJobDAGInfoImap.put(jobId, jobInfo, finishedJobExpireTime, TimeUnit.MINUTES);
}
+ public Map<String, Long> getFinishedJobRecordCounts() {
+ Map<String, Long> counts = new HashMap<>();
+ counts.put(Constant.IMAP_FINISHED_JOB_STATE, (long) finishedJobStateImap.size());
+ counts.put(Constant.IMAP_FINISHED_JOB_METRICS, (long) finishedJobMetricsImap.size());
+ counts.put(Constant.IMAP_FINISHED_JOB_VERTEX_INFO, (long) finishedJobDAGInfoImap.size());
+ return counts;
+ }
+
+ public Map<String, Long> getFinishedJobCleanupTotals() {
+ Map<String, Long> counts = new HashMap<>();
+ counts.put(
+ Constant.IMAP_FINISHED_JOB_STATE,
+ getFinishedJobCleanupTotal(Constant.IMAP_FINISHED_JOB_STATE));
+ counts.put(
+ Constant.IMAP_FINISHED_JOB_METRICS,
+ getFinishedJobCleanupTotal(Constant.IMAP_FINISHED_JOB_METRICS));
+ counts.put(
+ Constant.IMAP_FINISHED_JOB_VERTEX_INFO,
+ getFinishedJobCleanupTotal(Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+ return counts;
+ }
+
+ private long getFinishedJobCleanupTotal(String storeName) {
+ AtomicLong total = finishedJobCleanupTotals.get(storeName);
+ return total == null ? 0L : total.get();
+ }
+
+ private void incrementFinishedJobCleanupTotal(String storeName) {
+ finishedJobCleanupTotals
+ .computeIfAbsent(storeName, key -> new AtomicLong())
+ .incrementAndGet();
+ }
+
@AllArgsConstructor
@Data
public static final class JobState implements Serializable {
@@ -354,14 +399,41 @@ public class JobHistoryService {
private Map<TaskGroupLocation, ExecutionState> executionStateMap;
}
+ private class FinishedJobExpiredListener<T> implements EntryExpiredListener<Long, T> {
+ private final String storeName;
+
+ private FinishedJobExpiredListener(String storeName) {
+ this.storeName = storeName;
+ }
+
+ @Override
+ public void entryExpired(EntryEvent<Long, T> event) {
+ incrementFinishedJobCleanupTotal(storeName);
+ }
+ }
+
private class JobInfoExpiredListener implements EntryExpiredListener<Long, JobDAGInfo> {
+ private final String storeName;
+
+ private JobInfoExpiredListener(String storeName) {
+ this.storeName = storeName;
+ }
+
@Override
public void entryExpired(EntryEvent<Long, JobDAGInfo> event) {
+ incrementFinishedJobCleanupTotal(storeName);
Long jobId = event.getKey();
JobDAGInfo jobDagInfo = event.getOldValue();
+ if (jobDagInfo == null) {
+ return;
+ }
try {
- Set<ExecutionAddress> historyExecutionPlan = jobDagInfo.getHistoryExecutionPlan();
- Stream.concat(historyExecutionPlan.stream(), Stream.of(jobDagInfo.getMaster()))
+ Set<ExecutionAddress> historyExecutionPlan =
+ Optional.ofNullable(jobDagInfo.getHistoryExecutionPlan())
+ .orElseGet(Collections::emptySet);
+ Stream.concat(
+ historyExecutionPlan.stream(),
+ Stream.of(jobDagInfo.getMaster()).filter(Objects::nonNull))
.forEach(
address -> {
logger.info(
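`incrementFinishedJobCleanupTotal` creates a per-store `AtomicLong` lazily with `computeIfAbsent`, and `getFinishedJobCleanupTotals` reports all three stores even before any entry has expired. A standalone sketch of that counter pattern, using hypothetical store names and a hypothetical `ExpirationCounters` class:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of per-store expiration counters, created on first event. */
final class ExpirationCounters {
    private final Map<String, AtomicLong> totals = new ConcurrentHashMap<>();

    /** Called from an expiration listener; safe to call from several threads. */
    void onExpired(String storeName) {
        totals.computeIfAbsent(storeName, key -> new AtomicLong()).incrementAndGet();
    }

    /** Reports every listed store, using 0 for stores that have not expired anything yet. */
    Map<String, Long> snapshot(List<String> storeNames) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String name : storeNames) {
            AtomicLong total = totals.get(name);
            result.put(name, total == null ? 0L : total.get());
        }
        return result;
    }
}
```

Reporting an explicit zero for stores with no expirations yet keeps every `store` series present from the first scrape, which gives `rate(...)` and `increase(...)` a baseline to measure growth from.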
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
index 4e1ec89593..a6bd7dd6b4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
@@ -17,11 +17,13 @@
package org.apache.seatunnel.engine.server.service.jar;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
import org.apache.seatunnel.engine.core.job.ConnectorJar;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.RefCount;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.task.operation.SendConnectorJarToMemberNodeOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
@@ -34,6 +36,7 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
+import java.util.Collection;
import java.util.List;
@Slf4j
@@ -120,4 +123,27 @@ public class ConnectorPackageService {
long jobId, List<ConnectorJarIdentifier> connectorJarIdentifierList) {
connectorJarStorageStrategy.cleanUpWhenJobFinished(jobId, connectorJarIdentifierList);
}
+
+ public int getTrackedConnectorJarCount() {
+ return nodeEngine
+ .getHazelcastInstance()
+ .<ConnectorJarIdentifier, RefCount>getMap(Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS)
+ .size();
+ }
+
+ public long getTotalConnectorJarReferences() {
+ Collection<RefCount> refCounts =
+ nodeEngine
+ .getHazelcastInstance()
+ .<ConnectorJarIdentifier, RefCount>getMap(
+ Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS)
+ .values();
+ long total = 0L;
+ for (RefCount refCount : refCounts) {
+ if (refCount != null && refCount.getReferences() != null) {
+ total += refCount.getReferences();
+ }
+ }
+ return total;
+ }
}
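`getTotalConnectorJarReferences` skips null entries and null reference counts while summing into a `long`. The same logic can be written as a stream pipeline. The sketch below assumes a hypothetical `RefCountView` stand-in for `RefCount` that exposes only a nullable `Long getReferences()`, which matches how the patch null-checks `refCount.getReferences()`; `ConnectorJarReferenceSum` is likewise an illustrative name.

```java
import java.util.Collection;
import java.util.Objects;

// Hypothetical stand-in for RefCount: only the nullable reference count matters here.
class RefCountView {
    private final Long references;

    RefCountView(Long references) {
        this.references = references;
    }

    Long getReferences() {
        return references;
    }
}

class ConnectorJarReferenceSum {
    // Stream form of the loop in getTotalConnectorJarReferences: drop null
    // entries and null counts, then accumulate into a long.
    static long total(Collection<RefCountView> refCounts) {
        return refCounts.stream()
                .filter(Objects::nonNull)
                .map(RefCountView::getReferences)
                .filter(Objects::nonNull)
                .mapToLong(Long::longValue)
                .sum();
    }
}
```

Either form reads every value of the `Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS` map on each call, which stays cheap only while the number of tracked jars is small.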
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
index 8d429536a2..d4b6cba6ec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.telemetry.metrics;
import org.apache.seatunnel.engine.server.telemetry.metrics.exports.ClusterMetricExports;
+import org.apache.seatunnel.engine.server.telemetry.metrics.exports.EngineStateStoreLogicalMetricExports;
import org.apache.seatunnel.engine.server.telemetry.metrics.exports.EngineStateStoreMetricExports;
import org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobMetricExports;
import org.apache.seatunnel.engine.server.telemetry.metrics.exports.JobThreadPoolStatusExports;
@@ -48,6 +49,8 @@ public final class ExportsInstanceInitializer {
new NodeMetricExports(node).register(collectorRegistry);
// Engine state store metrics
new EngineStateStoreMetricExports(node).register(collectorRegistry);
+ // Engine state store logical metrics
+ new EngineStateStoreLogicalMetricExports(node).register(collectorRegistry);
// Cluster metrics
new ClusterMetricExports(node).register(collectorRegistry);
initialized = true;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExports.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExports.java
new file mode 100644
index 0000000000..49ce9252b6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExports.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.master.JobHistoryService;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
+import org.apache.seatunnel.engine.server.telemetry.metrics.AbstractCollector;
+
+import com.hazelcast.instance.impl.Node;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class EngineStateStoreLogicalMetricExports extends AbstractCollector {
+
+ private static final String BACKEND = "hazelcast";
+
+ public EngineStateStoreLogicalMetricExports(Node node) {
+ super(node);
+ }
+
+ @Override
+ public List<MetricFamilySamples> collect() {
+ List<String> commonLabelNames = clusterLabelNames("backend");
+ List<String> storeLabelNames = clusterLabelNames("store", "backend");
+
+ GaugeMetricFamily runningJobMetricsTaskContexts =
+ new GaugeMetricFamily(
+ "engine_state_store_running_job_metrics_task_contexts",
+ "Logical task metrics contexts stored in engine_runningJobMetrics",
+ commonLabelNames);
+ GaugeMetricFamily runningJobMetricsActivePartitionKeys =
+ new GaugeMetricFamily(
+ "engine_state_store_running_job_metrics_active_partition_keys",
+ "Active partition-key count stored in engine_runningJobMetrics",
+ commonLabelNames);
+ GaugeMetricFamily checkpointMonitorJobs =
+ new GaugeMetricFamily(
+ "engine_state_store_checkpoint_monitor_jobs",
+ "Job count stored in engine_checkpoint_monitor",
+ commonLabelNames);
+ GaugeMetricFamily checkpointMonitorInProgressCheckpoints =
+ new GaugeMetricFamily(
+ "engine_state_store_checkpoint_monitor_in_progress_checkpoints",
+ "In-progress checkpoint count stored in engine_checkpoint_monitor",
+ commonLabelNames);
+ GaugeMetricFamily checkpointMonitorRetainedHistoryEntries =
+ new GaugeMetricFamily(
+ "engine_state_store_checkpoint_monitor_retained_history_entries",
+ "Retained checkpoint history entry count stored in engine_checkpoint_monitor",
+ commonLabelNames);
+ GaugeMetricFamily finishedJobRecords =
+ new GaugeMetricFamily(
+ "engine_state_store_finished_job_records",
+ "Finished job records stored in finished job state stores",
+ storeLabelNames);
+ CounterMetricFamily finishedJobCleanupTotal =
+ new CounterMetricFamily(
+ "engine_state_store_finished_job_cleanup_total",
+ "Cleanup total for finished job state stores",
+ storeLabelNames);
+ GaugeMetricFamily connectorJarTrackedJars =
+ new GaugeMetricFamily(
+ "engine_state_store_connector_jar_tracked_jars",
+ "Tracked connector jar count stored in engine_connectorJarRefCounters",
+ commonLabelNames);
+ GaugeMetricFamily connectorJarTotalReferences =
+ new GaugeMetricFamily(
+ "engine_state_store_connector_jar_total_references",
+ "Total connector jar references stored in engine_connectorJarRefCounters",
+ commonLabelNames);
+
+ if (!isMaster() || !isCoordinatorReady()) {
+ return metricFamilies(
+ runningJobMetricsTaskContexts,
+ runningJobMetricsActivePartitionKeys,
+ checkpointMonitorJobs,
+ checkpointMonitorInProgressCheckpoints,
+ checkpointMonitorRetainedHistoryEntries,
+ finishedJobRecords,
+ finishedJobCleanupTotal,
+ connectorJarTrackedJars,
+ connectorJarTotalReferences);
+ }
+
+ runSafely(
+ "engine_state_store_running_job_metrics",
+ () -> {
+ addMetric(
+ runningJobMetricsTaskContexts,
+ getCoordinatorService().getRunningJobMetricsTaskContextCount(),
+ labelValues(BACKEND));
+ addMetric(
+ runningJobMetricsActivePartitionKeys,
+ getCoordinatorService().getRunningJobMetricsPartitionKeyCount(),
+ labelValues(BACKEND));
+ });
+
+ SeaTunnelServer server = getServer();
+ runSafely(
+ "engine_state_store_checkpoint_monitor",
+ () -> {
+ addMetric(
+ checkpointMonitorJobs,
+ server.getCheckpointMonitorService().getOverviewJobCount(),
+ labelValues(BACKEND));
+ addMetric(
+ checkpointMonitorInProgressCheckpoints,
+ server.getCheckpointMonitorService().getInProgressCheckpointCount(),
+ labelValues(BACKEND));
+ addMetric(
+ checkpointMonitorRetainedHistoryEntries,
+ server.getCheckpointMonitorService().getRetainedHistoryCount(),
+ labelValues(BACKEND));
+ });
+
+ runSafely(
+ "engine_state_store_finished_job",
+ () -> {
+ JobHistoryService jobHistoryService =
+ getCoordinatorService().getJobHistoryService();
+ addStoreMetrics(
+ finishedJobRecords, jobHistoryService.getFinishedJobRecordCounts());
+ addStoreMetrics(
+ finishedJobCleanupTotal,
+ jobHistoryService.getFinishedJobCleanupTotals());
+ });
+
+ runSafely(
+ "engine_state_store_connector_jar",
+ () -> {
+ ConnectorPackageService connectorPackageService = null;
+ try {
+ connectorPackageService = server.getConnectorPackageService();
+ } catch (Exception e) {
+ getLogger(getClass())
+ .fine(
+ "Connector package service is not enabled; exporting fallback metrics");
+ }
+ addMetric(
+ connectorJarTrackedJars,
+ getTrackedConnectorJarCount(connectorPackageService),
+ labelValues(BACKEND));
+ addMetric(
+ connectorJarTotalReferences,
+ getTotalConnectorJarReferences(connectorPackageService),
+ labelValues(BACKEND));
+ });
+
+ return metricFamilies(
+ runningJobMetricsTaskContexts,
+ runningJobMetricsActivePartitionKeys,
+ checkpointMonitorJobs,
+ checkpointMonitorInProgressCheckpoints,
+ checkpointMonitorRetainedHistoryEntries,
+ finishedJobRecords,
+ finishedJobCleanupTotal,
+ connectorJarTrackedJars,
+ connectorJarTotalReferences);
+ }
+
+ private List<MetricFamilySamples> metricFamilies(MetricFamilySamples... samples) {
+ List<MetricFamilySamples> metrics = new ArrayList<>();
+ for (MetricFamilySamples sample : samples) {
+ metrics.add(sample);
+ }
+ return metrics;
+ }
+
+ private void addStoreMetrics(GaugeMetricFamily metricFamily, Map<String, Long> storeMetrics) {
+ storeMetrics.forEach(
+ (storeName, value) ->
+ addMetric(metricFamily, value, labelValues(storeName, BACKEND)));
+ }
+
+ private void addStoreMetrics(CounterMetricFamily metricFamily, Map<String, Long> storeMetrics) {
+ storeMetrics.forEach(
+ (storeName, value) ->
+ addMetric(metricFamily, value, labelValues(storeName, BACKEND)));
+ }
+
+ private void addMetric(GaugeMetricFamily metricFamily, long value, List<String> labels) {
+ metricFamily.addMetric(labels, value);
+ }
+
+ private void addMetric(CounterMetricFamily metricFamily, long value, List<String> labels) {
+ metricFamily.addMetric(labels, value);
+ }
+
+ private int getTrackedConnectorJarCount(ConnectorPackageService connectorPackageService) {
+ if (connectorPackageService != null) {
+ return connectorPackageService.getTrackedConnectorJarCount();
+ }
+ return getConnectorJarRefCounters().size();
+ }
+
+ private long getTotalConnectorJarReferences(ConnectorPackageService connectorPackageService) {
+ if (connectorPackageService != null) {
+ return connectorPackageService.getTotalConnectorJarReferences();
+ }
+ long total = 0L;
+ for (org.apache.seatunnel.engine.core.job.RefCount refCount :
+ getConnectorJarRefCounters().values()) {
+ if (refCount != null && refCount.getReferences() != null) {
+ total += refCount.getReferences();
+ }
+ }
+ return total;
+ }
+
+ private com.hazelcast.map.IMap<?, org.apache.seatunnel.engine.core.job.RefCount>
+ getConnectorJarRefCounters() {
+ return getNode()
+ .hazelcastInstance
+ .getMap(
+ org.apache.seatunnel.engine.common.Constant
+ .IMAP_CONNECTOR_JAR_REF_COUNTERS);
+ }
+
+ private void runSafely(String metricGroup, Runnable runnable) {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ getLogger(getClass())
+ .warning(
+ String.format(
+ "Failed to collect logical state store metrics, group=%s, backend=%s",
+ metricGroup, BACKEND),
+ e);
+ }
+ }
+}
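`collect()` declares all nine metric families up front, returns them without samples on a node that is not the active master (or whose coordinator is not ready), and otherwise fills each group inside `runSafely`, so that an exception in one group is logged without dropping the others. The Prometheus-free sketch below models that control flow with plain collections and `java.util.logging`; `GroupedCollector` and its methods are hypothetical and only illustrate the structure, not the exporter's real API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, Prometheus-free model of the collect() control flow.
class GroupedCollector {
    private static final Logger LOG = Logger.getLogger(GroupedCollector.class.getName());

    // Family name -> sample values; insertion order keeps the output stable.
    final Map<String, List<Long>> families = new LinkedHashMap<>();

    // Every family is registered up front, so the result always has the same
    // families even when no group adds a sample (e.g. on a non-master node).
    GroupedCollector(String... familyNames) {
        for (String name : familyNames) {
            families.put(name, new ArrayList<>());
        }
    }

    void add(String family, long value) {
        families.get(family).add(value);
    }

    // Same idea as runSafely: a failing group is logged and skipped, and the
    // remaining groups still run. A Runnable cannot throw checked exceptions,
    // so catching RuntimeException covers what catch (Exception) catches here.
    void runSafely(String group, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            LOG.log(Level.WARNING, "Failed to collect metric group " + group, e);
        }
    }
}
```

One consequence of this design, in both the patch and the sketch: a group that fails midway keeps any samples added before the exception, so a scrape can contain part of a group rather than none of it.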
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
index dcfe8989ce..968e8c8c02 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/MetricsApiTest.java
@@ -55,7 +55,8 @@ public class MetricsApiTest {
.then()
.statusCode(200)
.body(containsString("process_start_time_seconds"))
- .body(containsString("engine_state_store_local_owned_entries"));
+ .body(containsString("engine_state_store_local_owned_entries"))
+ .body(containsString("engine_state_store_checkpoint_monitor_jobs"));
}
@AfterAll
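The `MetricsApiTest` change checks only that the `/metrics` response contains the string `engine_state_store_checkpoint_monitor_jobs`. In the Prometheus text exposition format, a metric name also appears in the `# HELP` and `# TYPE` comment lines written for its family, so a substring match does not by itself prove that a sample was exported. A stricter check could look up the sample line itself. The helper below is a hypothetical sketch (not part of the patch or of any Prometheus library), and it handles plain numeric values only.

```java
import java.util.OptionalDouble;

// Hypothetical helper: returns the value of the first sample line for
// metricName in a text exposition body, skipping "# HELP" / "# TYPE" lines.
class ExpositionLookup {
    static OptionalDouble firstSample(String body, String metricName) {
        for (String line : body.split("\n")) {
            if (line.startsWith("#")) {
                continue; // comment lines also mention the metric name
            }
            // Require "{" or a space after the name so that a longer metric
            // name sharing the same prefix does not match.
            if (line.startsWith(metricName + "{") || line.startsWith(metricName + " ")) {
                int labelsEnd = line.lastIndexOf('}');
                String rest =
                        labelsEnd >= 0
                                ? line.substring(labelsEnd + 1)
                                : line.substring(metricName.length());
                // The value is the first token; an optional timestamp may follow.
                String value = rest.trim().split("\\s+")[0];
                return OptionalDouble.of(Double.parseDouble(value));
            }
        }
        return OptionalDouble.empty();
    }
}
```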
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExportsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExportsTest.java
new file mode 100644
index 0000000000..dbbfa1dcd5
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreLogicalMetricExportsTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.metrics.exports;
+
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
+import org.apache.seatunnel.engine.core.job.RefCount;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
+import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.map.IMap;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@DisabledOnOs(OS.WINDOWS)
+class EngineStateStoreLogicalMetricExportsTest {
+
+ private HazelcastInstanceImpl instance;
+
+ @AfterEach
+ void afterEach() {
+ if (instance != null) {
+ instance.shutdown();
+ }
+ }
+
+ @Test
+ void collectShouldExportLogicalMetricsForSpecialStateStores() {
+ instance = SeaTunnelServerStarter.createHazelcastInstance(createTestConfig());
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(instance.node.isMaster()));
+
+ SeaTunnelServer server =
+ instance.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(server.isCoordinatorActive()));
+
+ seedRunningJobMetrics();
+ seedCheckpointMonitor(server);
+ seedFinishedJobStores();
+ seedConnectorJarRefCounters();
+
+ List<MetricFamilySamples> metrics =
+ new EngineStateStoreLogicalMetricExports(instance.node).collect();
+
+ Assertions.assertEquals(
+ 3d,
+ findSampleValue(
+ metrics, "engine_state_store_running_job_metrics_task_contexts", null));
+ Assertions.assertEquals(
+ 2d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_running_job_metrics_active_partition_keys",
+ null));
+ Assertions.assertEquals(
+ 1d, findSampleValue(metrics, "engine_state_store_checkpoint_monitor_jobs", null));
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_checkpoint_monitor_in_progress_checkpoints",
+ null));
+ Assertions.assertEquals(
+ 2d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_checkpoint_monitor_retained_history_entries",
+ null));
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_finished_job_records",
+ Constant.IMAP_FINISHED_JOB_STATE));
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_finished_job_records",
+ Constant.IMAP_FINISHED_JOB_METRICS));
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_finished_job_records",
+ Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(metrics, "engine_state_store_connector_jar_tracked_jars", null));
+ Assertions.assertEquals(
+ 2d,
+ findSampleValue(
+ metrics, "engine_state_store_connector_jar_total_references", null));
+ }
+
+ @Test
+ void collectShouldTrackFinishedJobCleanupTotals() {
+ instance = SeaTunnelServerStarter.createHazelcastInstance(createTestConfig());
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(instance.node.isMaster()));
+ SeaTunnelServer server =
+ instance.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(server.isCoordinatorActive()));
+
+ IMap<Long, JobState> finishedJobStateMap =
+ instance.getMap(Constant.IMAP_FINISHED_JOB_STATE);
+ IMap<Long, JobMetrics> finishedJobMetricsMap =
+ instance.getMap(Constant.IMAP_FINISHED_JOB_METRICS);
+ IMap<Long, JobDAGInfo> finishedJobVertexInfoMap =
+ instance.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);
+ finishedJobStateMap.put(101L, createJobState(101L), 1, TimeUnit.MILLISECONDS);
+ finishedJobMetricsMap.put(101L, JobMetrics.of(new HashMap<>()), 1, TimeUnit.MILLISECONDS);
+ finishedJobVertexInfoMap.put(101L, createJobDagInfo(101L), 1, TimeUnit.MILLISECONDS);
+
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ List<MetricFamilySamples> metrics =
+ new EngineStateStoreLogicalMetricExports(instance.node)
+ .collect();
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_finished_job_cleanup_total",
+ Constant.IMAP_FINISHED_JOB_STATE));
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_finished_job_cleanup_total",
+ Constant.IMAP_FINISHED_JOB_METRICS));
+ Assertions.assertEquals(
+ 1d,
+ findSampleValue(
+ metrics,
+ "engine_state_store_finished_job_cleanup_total",
+ Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+ });
+ }
+
+ private void seedRunningJobMetrics() {
+ IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsMap =
+ instance.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ HashMap<TaskLocation, SeaTunnelMetricsContext> partitionZero = new HashMap<>();
+ partitionZero.put(new TaskLocation(new TaskGroupLocation(1L, 1, 1L), 0, 0), metricCtx());
+ partitionZero.put(new TaskLocation(new TaskGroupLocation(1L, 1, 2L), 0, 0), metricCtx());
+ HashMap<TaskLocation, SeaTunnelMetricsContext> partitionOne = new HashMap<>();
+ partitionOne.put(new TaskLocation(new TaskGroupLocation(2L, 1, 1L), 0, 0), metricCtx());
+ metricsMap.put(0L, partitionZero);
+ metricsMap.put(1L, partitionOne);
+ }
+
+ private void seedCheckpointMonitor(SeaTunnelServer server) {
+ server.getCheckpointMonitorService()
+ .onCheckpointTriggered(7L, 1, 1001L, CheckpointType.CHECKPOINT_TYPE, 100L, 2);
+ server.getCheckpointMonitorService()
+ .onCheckpointCompleted(
+ new CompletedCheckpoint(
+ 7L,
+ 1,
+ 1000L,
+ 10L,
+ CheckpointType.CHECKPOINT_TYPE,
+ 20L,
+ Collections.emptyMap(),
+ Collections.emptyMap()),
+ 128L);
+ server.getCheckpointMonitorService()
+ .onCheckpointFailed(
+ 7L,
+ 1,
+ 1002L,
+ CheckpointType.CHECKPOINT_TYPE,
+ CheckpointCloseReason.CHECKPOINT_EXPIRED,
+ null,
+ 30L);
+ }
+
+ private void seedFinishedJobStores() {
+ instance.getMap(Constant.IMAP_FINISHED_JOB_STATE).put(7L, createJobState(7L));
+ instance.getMap(Constant.IMAP_FINISHED_JOB_METRICS).put(7L, JobMetrics.of(new HashMap<>()));
+ instance.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO).put(7L, createJobDagInfo(7L));
+ }
+
+ private void seedConnectorJarRefCounters() {
+ IMap<ConnectorJarIdentifier, RefCount> connectorJarRefCounters =
+ instance.getMap(Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS);
+ connectorJarRefCounters.put(
+ ConnectorJarIdentifier.of(
+ ConnectorJarType.CONNECTOR_PLUGIN_JAR, "a.jar", "/tmp/a.jar"),
+ createRefCount(2L));
+ }
+
+ private static SeaTunnelMetricsContext metricCtx() {
+ return new SeaTunnelMetricsContext();
+ }
+
+ private static RefCount createRefCount(long references) {
+ RefCount refCount = new RefCount();
+ refCount.setReferences(references);
+ return refCount;
+ }
+
+ private static JobState createJobState(long jobId) {
+ return new JobState(
+ jobId, "job-" + jobId, JobStatus.FINISHED, 1L, 2L, 3L, new HashMap<>(), null);
+ }
+
+ private static JobDAGInfo createJobDagInfo(long jobId) {
+ return new JobDAGInfo(
+ jobId,
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ null,
+ Collections.emptySet());
+ }
+
+ private static double findSampleValue(
+ List<MetricFamilySamples> metrics, String metricName, String storeName) {
+ Sample sample =
+ findMetricFamily(metrics, metricName).samples.stream()
+ .filter(
+ metricSample ->
+ storeName == null
+ || storeName.equals(
+ labelValue(metricSample, "store")))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new AssertionError(
+ "Missing metric sample: "
+ + metricName
+ + ", store="
+ + storeName));
+ Assertions.assertEquals("hazelcast", labelValue(sample, "backend"));
+ return sample.value;
+ }
+
+ private static MetricFamilySamples findMetricFamily(
+ List<MetricFamilySamples> metrics, String name) {
+ return metrics.stream()
+ .filter(
+ metricFamilySamples ->
+ name.equals(metricFamilySamples.name)
+ || metricFamilySamples.samples.stream()
+ .anyMatch(sample -> name.equals(sample.name)))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Missing metric family: " + name));
+ }
+
+ private static String labelValue(Sample sample, String labelName) {
+ int index = sample.labelNames.indexOf(labelName);
+ if (index < 0) {
+ throw new AssertionError(
+ "Missing label: "
+ + labelName
+ + ", labels="
+ + Arrays.toString(sample.labelNames.toArray()));
+ }
+ return sample.labelValues.get(index);
+ }
+
+ private static SeaTunnelConfig createTestConfig() {
+ String yaml =
+ "seatunnel:\n"
+ + " engine:\n"
+ + " telemetry:\n"
+ + " metric:\n"
+ + " enabled: true\n"
+ + " history-job-expire-minutes: 1\n"
+ + " jar-storage:\n"
+ + " enable: true\n"
+ + " connector-jar-storage-mode: SHARED\n"
+ + " connector-jar-storage-path: \"\"\n"
+ + " connector-jar-cleanup-task-interval: 3600\n"
+ + " connector-jar-expiry-time: 600\n";
+ SeaTunnelConfig config = ConfigProvider.locateAndGetSeaTunnelConfigFromString(yaml);
+ config.getHazelcastConfig()
+ .setClusterName(
+ TestUtils.getClusterName(
+ "EngineStateStoreLogicalMetricExportsTest_" + System.nanoTime()));
+ return config;
+ }
+}
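`findSampleValue` and `labelValue` rely on the fact that a Prometheus simpleclient `Sample` keeps `labelNames` and `labelValues` as parallel lists, and a `null` store argument matches the first sample of the family. The sketch below reproduces that lookup without the Prometheus dependency, returning `null` or an empty `Optional` instead of throwing `AssertionError`. `SampleView` and `SampleLookup` are hypothetical stand-ins used only for illustration.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a Prometheus simpleclient Sample: label names and
// label values are parallel lists, and each sample carries a double value.
class SampleView {
    final List<String> labelNames;
    final List<String> labelValues;
    final double value;

    SampleView(List<String> labelNames, List<String> labelValues, double value) {
        this.labelNames = labelNames;
        this.labelValues = labelValues;
        this.value = value;
    }

    // Same lookup as labelValue in the test: find the name's index, then read
    // the value at that index; null when the label is absent.
    String label(String name) {
        int index = labelNames.indexOf(name);
        return index < 0 ? null : labelValues.get(index);
    }
}

class SampleLookup {
    // As in findSampleValue, a null store matches the first sample.
    static Optional<SampleView> byStore(List<SampleView> samples, String store) {
        return samples.stream()
                .filter(s -> store == null || store.equals(s.label("store")))
                .findFirst();
    }
}
```

Returning `Optional` instead of throwing lets the caller decide whether a missing sample is a failure, which is convenient for negative assertions.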