This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6b4e815b41 Refactored Prometheus Endpoints to new naming convention 
and functioning (#4039)
6b4e815b41 is described below

commit 6b4e815b410388180414368e966efea4f2121db3
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Mon Dec 15 13:08:56 2025 +0100

    Refactored Prometheus Endpoints to new naming convention and functioning 
(#4039)
    
    Co-authored-by: Philipp Zehnder <[email protected]>
---
 .../commons/prometheus/adapter/AdapterMetrics.java | 28 ++++++++--
 .../loadbalancer/LoadBalancerMetrics.java          | 24 ++++++--
 .../prometheus/pipelines/PipelineFlowMetrics.java  | 22 +++++---
 .../prometheus/pipelines/PipelineFlowStats.java    |  5 ++
 .../prometheus/pipelines/PipelinesMetrics.java     | 65 +++++++++++++---------
 .../prometheus/pipelines/PipelinesStats.java       | 37 ++++++++----
 .../prometheus/service/ElementServiceMetrics.java  | 42 ++++++++++----
 .../spmemorymanager/SpMemoryManagerMetrics.java    | 22 ++++++--
 .../spratelimiter/SpRateLimiterMetrics.java        | 19 ++++++-
 .../management/AdapterUpdateManagement.java        |  3 +
 .../pipeline/ExtensionsLogProvider.java            | 36 ++++++++++--
 .../pipeline/ExtensionsServiceLogExecutor.java     | 22 ++++++++
 .../execution/task/StorePipelineStatusTask.java    |  8 +++
 .../manager/health/PipelineHealthCheck.java        | 38 ++++++++-----
 .../migration/PipelineElementMigrationManager.java |  3 +
 15 files changed, 283 insertions(+), 91 deletions(-)

diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
index 174531fb6e..99039046e6 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
@@ -47,16 +47,33 @@ public class AdapterMetrics {
 
   private static final String ELEMENT_NOT_FOUND_TEMPLATE = "No entry for 
adapter '%s' found. Please register it first.";
 
-  private final Gauge totalAdapterEventsPublishedMetric;
+/**
+ * @deprecated Use {@link #totalAdapterEventsPublishedMetric} instead.
+ */
+@Deprecated
+public final Gauge totalAdapterEventsPublishedMetricLegacy;
 
-  public AdapterMetrics() {
-    this.totalAdapterEventsPublishedMetric = 
StreamPipesCollectorRegistry.registerGauge(
+public final Gauge totalAdapterEventsPublishedMetric;
+
+public AdapterMetrics() {
+
+    this.totalAdapterEventsPublishedMetricLegacy = 
StreamPipesCollectorRegistry.registerGauge(
         "adapter_events_published_total",
+        "DEPRECATED: Use sp_core_adapter_instance_events_published_total 
instead. Total amount of events published per adapter",
+        "adapterId", "adapterName"
+    );
+
+    this.totalAdapterEventsPublishedMetric = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_core_adapter_instance_events_published_total",
         "Total amount of events published per adapter",
         "adapterId", "adapterName"
     );
+
     this.registeredAdapters = new HashMap<>();
-  }
+}
+
+
+ 
 
   /**
    * Registers a new adapter with the given adapterId and adapterName.
@@ -93,6 +110,8 @@ public class AdapterMetrics {
       // Order of labels needs to be preserved and match the one in 
AdapterMetrics#register
       totalAdapterEventsPublishedMetric.labels(adapterId, adapterName)
                                        .set(totalEventsPublished);
+      totalAdapterEventsPublishedMetricLegacy.labels(adapterId, adapterName)
+                                       .set(totalEventsPublished);
     } else {
       throw new 
NoSuchElementException(ELEMENT_NOT_FOUND_TEMPLATE.formatted(adapterId));
     }
@@ -109,6 +128,7 @@ public class AdapterMetrics {
   public void remove(String adapterId, String adapterName) {
     if (contains(adapterId)) {
       this.totalAdapterEventsPublishedMetric.remove(adapterId, adapterName);
+      this.totalAdapterEventsPublishedMetricLegacy.remove(adapterId, 
adapterName);
       this.registeredAdapters.remove(adapterId);
     } else {
       throw new 
NoSuchElementException(ELEMENT_NOT_FOUND_TEMPLATE.formatted(adapterId));
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
index 685182def4..9e4625525c 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
@@ -26,25 +26,39 @@ import io.prometheus.client.Gauge;
  */
 public class LoadBalancerMetrics {
 
+  @Deprecated
+  public static final Gauge SERVICE_ADAPTER_COUNT_LEGACY = 
StreamPipesCollectorRegistry
+      .registerGauge("lb_service_adapter_count", "DEPRECATED: Use 
sp_extension_adapter_count_total instead. Number of adapters in each extension 
service",
+                     "serviceId");
+  @Deprecated
+  public static final Gauge SERVICE_PIPELINE_COUNT_LEGACY = 
StreamPipesCollectorRegistry
+      .registerGauge("lb_service_pipeline_count", "DEPRECATED: Use 
sp_extension_pipeline_count_total instead.Number of pipelines in each extension 
service",
+                     "serviceId");
+  @Deprecated
+  public static final Gauge MIGRATION_TIME_SECONDS_LEGACY = 
StreamPipesCollectorRegistry
+      .registerGauge("lb_migration_time_seconds", "DEPRECATED: Use 
sp_core_migration_time_seconds instead. Time taken for pipeline migration in 
seconds");
+  
   public static final Gauge SERVICE_ADAPTER_COUNT = 
StreamPipesCollectorRegistry
-      .registerGauge("lb_service_adapter_count", "Number of adapters in each 
extension service",
+      .registerGauge("sp_extension_adapter_count_total", "Number of adapters 
in each extension service",
                      "serviceId");
-
   public static final Gauge SERVICE_PIPELINE_COUNT = 
StreamPipesCollectorRegistry
-      .registerGauge("lb_service_pipeline_count", "Number of pipelines in each 
extension service",
+      .registerGauge("sp_extension_pipeline_count_total", "Number of pipelines 
in each extension service",
                      "serviceId");
-
   public static final Gauge MIGRATION_TIME_SECONDS = 
StreamPipesCollectorRegistry
-      .registerGauge("lb_migration_time_seconds", "Time taken for pipeline 
migration in seconds");
+      .registerGauge("sp_core_migration_time_seconds", "Time taken for 
pipeline migration in seconds");
 
   public LoadBalancerMetrics() {}
 
   public void reportMetrics(String serviceId, int serviceAdapterCount, int 
servicePipelineCount) {
     SERVICE_ADAPTER_COUNT.labels(serviceId).set(serviceAdapterCount);
     SERVICE_PIPELINE_COUNT.labels(serviceId).set(servicePipelineCount);
+
+    SERVICE_ADAPTER_COUNT_LEGACY.labels(serviceId).set(serviceAdapterCount);
+    SERVICE_PIPELINE_COUNT_LEGACY.labels(serviceId).set(servicePipelineCount);
   }
 
   public void reportMigrationTime(double seconds) {
     MIGRATION_TIME_SECONDS.set(seconds);
+    MIGRATION_TIME_SECONDS_LEGACY.set(seconds);
   }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
index 8faede4017..12b160642e 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
@@ -23,25 +23,29 @@ import io.prometheus.client.Gauge;
 
 
 public class PipelineFlowMetrics {
-
-  public static final Gauge RECEIVED_TOTAL_DATA_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
+@Deprecated
+    public static final Gauge RECEIVED_TOTAL_DATA_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
       "received_total_data",
-      "Total amount of data received by the pipeline"
+      "DEPRECATED. Total amount of data received by the pipeline"
   );
-
+@Deprecated
   public static final Gauge PROCESSED_DATA_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
       "processed_data",
-      "Number of data obtained from pipeline processing"
+      "DEPRECATED. Number of data obtained from pipeline processing"
   );
-
+@Deprecated
   public static final Gauge ELEMENT_INPUT_TOTAL_DATA_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
       "element_input_total_data",
-      "Total amount of data received by elements"
+      "DEPRECATED. Total amount of data received by elements"
   );
-
+@Deprecated
   public static final Gauge ELEMENT_OUTPUT_TOTAL_DATA_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
       "element_output_total_data",
-      "Total amount of data sent by elements"
+      "DEPRECATED. Total amount of data sent by elements"
   );
 
+  public static final Gauge ELEMENT_FLOW_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
+                                                    
"sp_core_pipeline_element_data_total",
+                                                     "Total amount of data 
received/sent by a pipeline element (e.g., filter)","pipelineId", 
"elemenetId","elementType", "operation");
+
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
index 5d159c9092..13cae56fd3 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
@@ -109,4 +109,9 @@ public class PipelineFlowStats {
 
   }
 
+  public void updateElementFlow(String pipelineId, String elementId, String 
elementType,  long valuesReceived, long valuesSend){
+    PipelineFlowMetrics.ELEMENT_FLOW_GAUGE.labels(pipelineId, 
elementId,elementType,"received").set(valuesReceived);
+     PipelineFlowMetrics.ELEMENT_FLOW_GAUGE.labels(pipelineId, 
elementId,elementType,"send").set(valuesSend);
+  }
+
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
index 28e976a57d..34f7bfb6c5 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
@@ -21,37 +21,48 @@ import 
org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
 
 import io.prometheus.client.Gauge;
 
-
 public class PipelinesMetrics {
 
-  public static final Gauge ALL_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
-                                                   "all_pipelines",
-                                                    "Total number of 
pipelines");
-
-
-  public static final Gauge RUNNING_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
-                                                   "running_pipelines",
-                                                    "Number of running 
pipelines");
-
-  public static final Gauge STOPPED_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
-                                                    "stopped_pipelines",
-                                                     "Number of stopped 
pipelines");
-
-
-  public static final Gauge HEALTHY_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
-                                                    "healthy_pipelines",
-                                                     "Number of healthy 
pipelines");
+    @Deprecated
+    public static final Gauge ALL_PIPELINES_GAUGE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+            "all_pipelines",
+            "DEPRECATED: Use sp_core_pipeline_count_total instead. Total 
number of pipelines");
 
-  public static final Gauge FAILED_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
-                                                    "failed_pipelines",
-                                                     "Number of failed 
pipelines");
+    @Deprecated
+    public static final Gauge RUNNING_PIPELINES_GAUGE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+            "running_pipelines",
+            "DEPRECATED: Use sp_core_pipeline_running_state operation = 
running instead. Number of running pipelines");
+    @Deprecated
+    public static final Gauge STOPPED_PIPELINES_GAUGE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+            "stopped_pipelines",
+            "DEPRECATED: Use ssp_core_pipeline_running_state operation = 
stopped instead. Number of stopped pipelines");
 
-  public static final Gauge ATTENTION_REQUIRED_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
-                                                    
"attention_required_pipelines",
-                                                     "Number of pipelines 
requiring attention");
+    @Deprecated
+    public static final Gauge HEALTHY_PIPELINES_GAUGE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+            "healthy_pipelines",
+            "DEPRECATED: Use sp_core_pipeline_health_state operation = OK 
instead. Number of healthy pipelines");
+    @Deprecated
+    public static final Gauge FAILED_PIPELINES_GAUGE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+            "failed_pipelines",
+            "DEPRECATED: Use sp_core_pipeline_health_state operation = FAILED 
instead. Number of failed pipelines");
+    @Deprecated
+    public static final Gauge ATTENTION_REQUIRED_PIPELINES_GAUGE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+            "attention_required_pipelines",
+            "DEPRECATED: Use sp_core_pipeline_health_state operation = 
ATTENTION_REQUIRED instead. Number of pipelines requiring attention");
+    @Deprecated
+    public static final Gauge ELEMENT_COUNT_GAUGE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+            "element_count",
+            "DEPRECATED. Total number of elements in the pipeline");
 
-  public static final Gauge ELEMENT_COUNT_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
-                                                    "element_count",
-                                                     "Total number of elements 
in the pipeline");
+    public static final Gauge ALL_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
+            "sp_core_pipeline_count_total",
+            "Total number of pipelines");
+    public static final Gauge HEALTH_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
+            "sp_core_pipeline_health_state",
+            "running pipelines per status (failed, attention, healthy)",
+            "pipelineId", "pipelineName", "operation");
+    public static final Gauge STATUS_PIPELINES_GAUGE = 
StreamPipesCollectorRegistry.registerGauge(
+            "sp_core_pipeline_running_state",
+            "Number of failed pipelines", "pipelineId", "pipelineName", 
"operation");
 
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
index 038186ed27..c7a75de099 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
@@ -33,7 +33,6 @@ public class PipelinesStats {
 
   private int elementCount;
 
-
   public PipelinesStats() {
   }
 
@@ -45,7 +44,6 @@ public class PipelinesStats {
     this.allPipelines = allPipelines;
   }
 
-
   public int getRunningPipelines() {
     return runningPipelines;
   }
@@ -142,7 +140,6 @@ public class PipelinesStats {
     this.elementCount += 1;
   }
 
-
   public void clear() {
     this.allPipelines = 0;
     this.runningPipelines = 0;
@@ -153,14 +150,34 @@ public class PipelinesStats {
     this.elementCount = 0;
   }
 
-
   public void metrics() {
+    PipelinesMetrics.ALL_PIPELINES_GAUGE_LEGACY.set(this.allPipelines);
+    PipelinesMetrics.RUNNING_PIPELINES_GAUGE_LEGACY.set(this.runningPipelines);
+    PipelinesMetrics.STOPPED_PIPELINES_GAUGE_LEGACY.set(this.stoppedPipelines);
+    PipelinesMetrics.HEALTHY_PIPELINES_GAUGE_LEGACY.set(this.healthyPipelines);
+    PipelinesMetrics.FAILED_PIPELINES_GAUGE_LEGACY.set(this.failedPipelines);
+    
PipelinesMetrics.ATTENTION_REQUIRED_PIPELINES_GAUGE_LEGACY.set(this.attentionRequiredPipelines);
+    PipelinesMetrics.ELEMENT_COUNT_GAUGE_LEGACY.set(this.elementCount);
+
     PipelinesMetrics.ALL_PIPELINES_GAUGE.set(this.allPipelines);
-    PipelinesMetrics.RUNNING_PIPELINES_GAUGE.set(this.runningPipelines);
-    PipelinesMetrics.STOPPED_PIPELINES_GAUGE.set(this.stoppedPipelines);
-    PipelinesMetrics.HEALTHY_PIPELINES_GAUGE.set(this.healthyPipelines);
-    PipelinesMetrics.FAILED_PIPELINES_GAUGE.set(this.failedPipelines);
-    
PipelinesMetrics.ATTENTION_REQUIRED_PIPELINES_GAUGE.set(this.attentionRequiredPipelines);
-    PipelinesMetrics.ELEMENT_COUNT_GAUGE.set(this.elementCount);
+  }
+
+  public void updatePipelineRunningState(String pipelineId, String 
pipelineName, boolean state) {
+    PipelinesMetrics.STATUS_PIPELINES_GAUGE.labels(pipelineId, pipelineName, 
state ? "running" : "stopped")
+        .set(1);
+    PipelinesMetrics.STATUS_PIPELINES_GAUGE.labels(pipelineId, pipelineName, 
!state ? "running" : "stopped")
+        .set(0);
+  }
+
+  public void updatePipelineHealthState(String pipelineId, String 
pipelineName, String state) {
+   String[] statusElements = { "OK", "FAILURE", "REQUIRES_ATTENTION" };
+
+    for (String s : statusElements) {
+      if (s.equals(state)) {
+         PipelinesMetrics.HEALTH_PIPELINES_GAUGE.labels(pipelineId, 
pipelineName, s).set(1);
+      } else {
+         PipelinesMetrics.HEALTH_PIPELINES_GAUGE.labels(pipelineId, 
pipelineName, s).set(0);
+      }
+    }
   }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
index ed6f197ef2..317b5189ee 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
@@ -22,37 +22,59 @@ import 
org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
 import io.prometheus.client.Gauge;
 
 /**
- * Service Metrics Manager Inherits PrometheusMetrics and implements 
service-related metric
+ * Service Metrics Manager Inherits PrometheusMetrics and implements
+ * service-related metric
  * registration
  */
 public class ElementServiceMetrics {
+  @Deprecated
+  public static final Gauge CPU_USAGE_LEGACY = StreamPipesCollectorRegistry
+      .registerGauge("cpu_usage", "DEPRECATED: Use 
sp_extension_cpu_usage_percentage instead. Element CPU usage percentage", 
"serviceId");
+  @Deprecated
+  public static final Gauge MEMORY_USAGE_LEGACY = StreamPipesCollectorRegistry
+      .registerGauge("memory_usage", "DEPRECATED: Use 
sp_extension_memory_usage_bytes instead. Element memory usage in bytes", 
"serviceId");
+  @Deprecated
+  public static final Gauge WEIGHT_LEGACY = StreamPipesCollectorRegistry
+      .registerGauge("weight", "DEPRECATED: Use 
sp_extension_weight_count_total instead. Weight of remaining available 
resources for element", "serviceId");
+  @Deprecated
+  public static final Gauge SYSTEM_LOAD_LEGACY = StreamPipesCollectorRegistry
+      .registerGauge("system_load", "DEPRECATED: Use 
sp_extension_system_load_last_minute instead. System load average over the last 
minute", "serviceId");
+  @Deprecated
+  public static final Gauge HISTORICAL_SYSTEM_LOAD_LEGACY = 
StreamPipesCollectorRegistry
+      .registerGauge("historical_system_load", "DEPRECATED: Use 
sp_extension_system_load_historic_average instead. Historical system load 
average", "serviceId");
 
   public static final Gauge CPU_USAGE = StreamPipesCollectorRegistry
-      .registerGauge("cpu_usage", "Element CPU usage percentage", "serviceId");
-
+      .registerGauge("sp_extension_cpu_usage_percentage", "Element CPU usage 
percentage", "serviceId");
   public static final Gauge MEMORY_USAGE = StreamPipesCollectorRegistry
-      .registerGauge("memory_usage", "Element memory usage in bytes", 
"serviceId");
+      .registerGauge("sp_extension_memory_usage_bytes", "Element memory usage 
in bytes", "serviceId");
   public static final Gauge WEIGHT = StreamPipesCollectorRegistry
-      .registerGauge("weight", "Weight of remaining available resources for 
element", "serviceId");
+      .registerGauge("sp_extension_weight_count_total", "Weight of remaining 
available resources for element",
+          "serviceId");
+
   public static final Gauge SYSTEM_LOAD = StreamPipesCollectorRegistry
-      .registerGauge("system_load", "System load average over the last 
minute", "serviceId");
+      .registerGauge("sp_extension_system_load_last_minute", "System load 
average over the last minute", "serviceId");
+
   public static final Gauge HISTORICAL_SYSTEM_LOAD = 
StreamPipesCollectorRegistry
-      .registerGauge("historical_system_load", "Historical system load 
average", "serviceId");
+      .registerGauge("sp_extension_system_load_historic_average", "Historical 
system load average", "serviceId");
 
   private final String id;
 
-
-
   public ElementServiceMetrics(String id) {
     this.id = id;
   }
 
   public void reportMetrics(double cpuUsage, double memoryUsage, double 
weight, double systemLoad,
-                            double historicalSystemLoad) {
+      double historicalSystemLoad) {
     CPU_USAGE.labels(this.id).set(cpuUsage);
     MEMORY_USAGE.labels(this.id).set(memoryUsage);
     WEIGHT.labels(this.id).set(weight);
     SYSTEM_LOAD.labels(this.id).set(systemLoad);
     HISTORICAL_SYSTEM_LOAD.labels(this.id).set(historicalSystemLoad);
+
+    CPU_USAGE_LEGACY.labels(this.id).set(cpuUsage);
+    MEMORY_USAGE_LEGACY.labels(this.id).set(memoryUsage);
+    WEIGHT_LEGACY.labels(this.id).set(weight);
+    SYSTEM_LOAD_LEGACY.labels(this.id).set(systemLoad);
+    HISTORICAL_SYSTEM_LOAD_LEGACY.labels(this.id).set(historicalSystemLoad);
   }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
index b1608b004e..e746e5e92d 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
@@ -26,19 +26,33 @@ import io.prometheus.client.Gauge;
  * Memory Manager Metrics Manager
  */
 public class SpMemoryManagerMetrics {
-    
-  public static final Gauge MEMORY_USED_BYTES = 
StreamPipesCollectorRegistry.registerGauge(
+  
+  @Deprecated
+  public static final Gauge MEMORY_USED_BYTES_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
         "sp_memory_used_bytes",
+        "DEPRECATED: Use sp_memory_used_bytes instead. Amount of memory used 
in bytes"
+  );
+  @Deprecated
+  public static final Gauge MEMORY_ALLOCATION_RATE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_memory_allocation_rate_bytes_per_second",
+        "DEPRECATED: Use sp_memory_allocation_rate_bytes_per_second instead. 
Memory allocation rate in bytes per second"
+  );
+
+
+  public static final Gauge MEMORY_USED_BYTES = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_extension_memory_used_bytes",
         "Amount of memory used in bytes"
   );
 
   public static final Gauge MEMORY_ALLOCATION_RATE = 
StreamPipesCollectorRegistry.registerGauge(
-        "sp_memory_allocation_rate_bytes_per_second",
+        "sp_extension_memory_allocation_rate_bytes_per_second",
         "Memory allocation rate in bytes per second"
   );
-
   public static void updateCoreMetrics(double memoryUsedBytes, double 
allocationRate) {
     MEMORY_USED_BYTES.set(memoryUsedBytes);
     MEMORY_ALLOCATION_RATE.set(allocationRate);
+
+    MEMORY_USED_BYTES_LEGACY.set(memoryUsedBytes);
+    MEMORY_ALLOCATION_RATE_LEGACY.set(allocationRate);
   }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
index 0724cec394..b81f4fbe52 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
@@ -26,19 +26,32 @@ import io.prometheus.client.Gauge;
  * Rate Limiter Metrics Manager
  */
 public class SpRateLimiterMetrics {
-    
-  public static final Gauge RATE_LIMITER_QUEUE_SIZE = 
StreamPipesCollectorRegistry.registerGauge(
+  @Deprecated
+  public static final Gauge RATE_LIMITER_QUEUE_SIZE_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
         "sp_rate_limiter_queue_size",
+        "DEPRECATED: Use sp_extension_rate_limiter_queue_total instead. 
Current size of the waiting queue"
+  );
+  @Deprecated
+  public static final Gauge RATE_LIMITER_AVERAGE_WAIT_TIME_LEGACY = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_rate_limiter_average_wait_time_seconds",
+        "DEPRECATED: Use sp_extension_rate_limiter_average_wait_time_seconds 
instead. Average wait time for permit acquisition in seconds"
+  );
+
+  public static final Gauge RATE_LIMITER_QUEUE_SIZE = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_extension_rate_limiter_queue_total",
         "Current size of the waiting queue"
   );
 
   public static final Gauge RATE_LIMITER_AVERAGE_WAIT_TIME = 
StreamPipesCollectorRegistry.registerGauge(
-        "sp_rate_limiter_average_wait_time_seconds",
+        "sp_extension_rate_limiter_average_wait_time_seconds",
         "Average wait time for permit acquisition in seconds"
   );
 
   public static void updateCoreMetrics(double queueSize, double 
averageWaitTime) {
     RATE_LIMITER_QUEUE_SIZE.set(queueSize);
     RATE_LIMITER_AVERAGE_WAIT_TIME.set(averageWaitTime);
+
+    RATE_LIMITER_QUEUE_SIZE_LEGACY.set(queueSize);
+    RATE_LIMITER_AVERAGE_WAIT_TIME_LEGACY.set(averageWaitTime);
   }
 }
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index 963e9e1677..15e0d52c19 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.connect.management.management;
 
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
 import org.apache.streampipes.manager.pipeline.PipelineManager;
@@ -51,6 +52,7 @@ public class AdapterUpdateManagement {
   private final AdapterMasterManagement adapterMasterManagement;
   private final AdapterResourceManager adapterResourceManager;
   private final DataStreamResourceManager dataStreamResourceManager;
+    private static final PipelinesStats pipelinesStats = new PipelinesStats();
 
   public AdapterUpdateManagement(AdapterMasterManagement 
adapterMasterManagement) {
     this.adapterMasterManagement = adapterMasterManagement;
@@ -88,6 +90,7 @@ public class AdapterUpdateManagement {
         var canAutoMigrate = canAutoMigrate(modificationMessage);
         if (!canAutoMigrate) {
           
modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+          
pipelinesStats.updatePipelineHealthState(modifiedPipeline.getElementId(), 
modifiedPipeline.getName(),modifiedPipeline.getHealthStatus().toString());
           
modifiedPipeline.setPipelineNotifications(toNotification(updateInfo));
           modifiedPipeline.setValid(false);
         }
diff --git 
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
 
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
index 1ef42adcf8..3ba3f185f4 100644
--- 
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
+++ 
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
@@ -43,7 +43,7 @@ public enum   ExtensionsLogProvider {
 
   public void addMonitoringInfos(SpEndpointMonitoringInfo monitoringInfo) {
     allMetricsInfos.putAll(monitoringInfo.getMetricsInfos());
-    monitoringInfo.getLogInfos().forEach((key, value) -> {
+    monitoringInfo.getLogInfos().forEach((key, value) -> {    
       if (!allLogInfos.containsKey(key)) {
         allLogInfos.put(key, new ArrayList<>());
       }
@@ -118,15 +118,39 @@ public enum   ExtensionsLogProvider {
   public Map<String, SpMetricsEntry> getAllMetricsInfos() {
     return this.allMetricsInfos;
   }
-
+  
   private List<String> collectPipelineElementIds(Pipeline pipeline) {
     if (pipeline != null){
-    return Stream.concat(
-        pipeline.getSepas().stream().map(NamedStreamPipesEntity::getElementId),
-        
pipeline.getActions().stream().map(NamedStreamPipesEntity::getElementId)
-    ).collect(Collectors.toList());
+  return Stream.concat(
+        Stream.concat(
+            pipeline.getStreams().stream()
+               .map(s -> s.getCorrespondingAdapterId()),
+            pipeline.getSepas().stream()
+                .map(NamedStreamPipesEntity::getElementId)
+        ),
+        pipeline.getActions().stream()
+            .map(NamedStreamPipesEntity::getElementId)
+    )
+    .collect(Collectors.toList());
   }
   return List.of();
   }
 
+
+  public Map<String, Map<String, SpMetricsEntry>> 
getMetricsGroupedByPipeline() {
+
+    var allPipelines = StorageDispatcher.INSTANCE
+        .getNoSqlStore()
+        .getPipelineStorageAPI().findAll();
+
+    Map<String, Map<String, SpMetricsEntry>> result = new HashMap<>();
+
+    for (Pipeline pipeline : allPipelines) {
+        var metrics = 
ExtensionsLogProvider.INSTANCE.getMetricInfosForPipeline(pipeline.getPipelineId());
+        result.put(pipeline.getElementId(), metrics);
+    }
+
+    return result;
+}
+
 }
diff --git 
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
 
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
index fbf2de5344..95d5ea9648 100644
--- 
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
+++ 
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
@@ -77,8 +77,30 @@ public class ExtensionsServiceLogExecutor implements 
Runnable {
 
   private void updatePipelineFlow() {
     pipelineFlowStats.clear();
+   
ExtensionsLogProvider.INSTANCE.getMetricsGroupedByPipeline().forEach((pipelineId,
 data) -> {
+    data.forEach((k, v) -> {
+        // Total "in" count
+        long dataCountIn = v.getMessagesIn()
+                .values()
+                .stream()
+                .mapToLong(m -> m.getCounter())
+                .sum();
+
+        long dataCountOut = v.getMessagesOut().getCounter();
+
+        pipelineFlowStats.updateElementFlow(
+                pipelineId,
+               k,
+                InstanceIdExtractor.getSimpleName(k),
+                dataCountIn,
+                dataCountOut
+        );
+    });
+});
+/** When removing the deprectated gauges this also becomes deprecated */
     ExtensionsLogProvider.INSTANCE.getAllMetricsInfos().forEach((k, v) -> {
       String className = InstanceIdExtractor.getSimpleName(k);
+
       if 
(AdapterDescription.class.getSimpleName().toLowerCase().equals(className)) {
         
pipelineFlowStats.increaseReceivedTotalData(v.getMessagesOut().getCounter());
       } else if 
(DataProcessorInvocation.class.getSimpleName().toLowerCase().equals(className)) 
{
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
index 923d6c942b..fb7609e2be 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.execution.task;
 
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
 import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
@@ -36,6 +37,7 @@ public class StorePipelineStatusTask implements 
PipelineExecutionTask {
 
   private final boolean start;
   private final boolean forceStop;
+  private final PipelinesStats pipelinesStats = new PipelinesStats();
 
   public StorePipelineStatusTask(boolean start,
                                  boolean forceStop) {
@@ -62,6 +64,9 @@ public class StorePipelineStatusTask implements 
PipelineExecutionTask {
   private void setPipelineStarted(Pipeline pipeline) {
     pipeline.setRunning(true);
     pipeline.setStartedAt(new Date().getTime());
+    
pipelinesStats.updatePipelineRunningState(pipeline.getElementId(),pipeline.getName()
+                                                                  ,  true);
+    
pipelinesStats.updatePipelineHealthState(pipeline.getElementId(),pipeline.getName(),
 pipeline.getHealthStatus().toString());
     try {
       getPipelineStorageApi().updateElement(pipeline);
     } catch (DocumentConflictException dce) {
@@ -71,6 +76,9 @@ public class StorePipelineStatusTask implements 
PipelineExecutionTask {
 
   private void setPipelineStopped(Pipeline pipeline) {
     pipeline.setRunning(false);
+    
pipelinesStats.updatePipelineRunningState(pipeline.getElementId(),pipeline.getName()
+                                                                  , false);
+    
pipelinesStats.updatePipelineHealthState(pipeline.getElementId(),pipeline.getName(),
 pipeline.getHealthStatus().toString());
     getPipelineStorageApi().updateElement(pipeline);
   }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 7018dca8e4..06a6d3a8ec 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -17,7 +17,6 @@
  */
 package org.apache.streampipes.manager.health;
 
-
 import org.apache.streampipes.commons.constants.InstanceIdExtractor;
 import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
@@ -69,6 +68,18 @@ public class PipelineHealthCheck implements Runnable {
     pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines()
         - pipelinesStats.getRunningPipelines());
 
+    for (Pipeline p : allPipelines) {
+      pipelinesStats.updatePipelineHealthState(
+          p.getElementId(),
+          p.getName(),
+          p.getHealthStatus().toString());
+
+      pipelinesStats.updatePipelineRunningState(
+          p.getElementId(),
+          p.getName(),
+          p.isRunning());
+    }
+
     if (!runningPipelines.isEmpty()) {
       Map<String, List<InvocableStreamPipesEntity>> endpointMap = 
generateEndpointMap();
       List<String> allRunningInstances = 
findRunningInstances(endpointMap.keySet());
@@ -78,8 +89,8 @@ public class PipelineHealthCheck implements Runnable {
         List<String> failedInstances = new ArrayList<>();
         List<String> recoveredInstances = new ArrayList<>();
         List<String> pipelineNotifications = new ArrayList<>();
-        List<InvocableStreamPipesEntity> graphs =
-            
RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipeline.getPipelineId());
+        List<InvocableStreamPipesEntity> graphs = 
RunningPipelineElementStorage.runningProcessorsAndSinks
+            .get(pipeline.getPipelineId());
 
         graphs.forEach(graph -> {
           String instanceId = extractInstanceId(graph);
@@ -101,15 +112,15 @@ public class PipelineHealthCheck implements Runnable {
                 addFailedAttemptNotification(pipelineNotifications, graph);
                 increaseFailedAttempt(instanceId);
                 LOG.info("Could not restore pipeline element {} of pipeline {} 
({}/{})",
-                         graph.getName(), pipeline.getName(), 
failedRestartAttempts.get(instanceId),
-                         MAX_FAILED_ATTEMPTS);
+                    graph.getName(), pipeline.getName(), 
failedRestartAttempts.get(instanceId),
+                    MAX_FAILED_ATTEMPTS);
               } else {
                 recoveredInstances.add(instanceId);
                 addSuccessfulRestoreNotification(pipelineNotifications, graph);
                 resetFailedAttempts(instanceId);
                 graph.setSelectedEndpointUrl(endpointUrl);
                 LOG.info("Successfully restored pipeline element {} of 
pipeline {}",
-                         graph.getName(), pipeline.getName());
+                    graph.getName(), pipeline.getName());
               }
             }
           }
@@ -126,7 +137,10 @@ public class PipelineHealthCheck implements Runnable {
           currentPipeline.setPipelineNotifications(pipelineNotifications);
           StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI()
               .updateElement(currentPipeline);
+          
pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(), 
currentPipeline.getName(),
+              currentPipeline.getHealthStatus().toString());
         }
+
       });
       int healthNum = pipelinesStats.getRunningPipelines() - 
pipelinesStats.getFailedPipelines()
           - pipelinesStats.getAttentionRequiredPipelines();
@@ -138,8 +152,7 @@ public class PipelineHealthCheck implements Runnable {
 
   private String findEndpointUrl(InvocableStreamPipesEntity graph)
       throws NoServiceEndpointsAvailableException {
-    SpServiceUrlProvider serviceUrlProvider =
-        ExtensionsServiceEndpointUtils.getPipelineElementType(graph);
+    SpServiceUrlProvider serviceUrlProvider = 
ExtensionsServiceEndpointUtils.getPipelineElementType(graph);
     return serviceUrlProvider.getInvocationUrl(graph.getSelectedEndpointUrl(), 
graph.getAppId());
   }
 
@@ -165,13 +178,13 @@ public class PipelineHealthCheck implements Runnable {
   }
 
   private void addSuccessfulRestoreNotification(List<String> 
pipelineNotifications,
-                                                InvocableStreamPipesEntity 
graph) {
+      InvocableStreamPipesEntity graph) {
     pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
graph.getName()
         + "' was not available and was successfully restored.");
   }
 
   private void addFailedAttemptNotification(List<String> pipelineNotifications,
-                                            InvocableStreamPipesEntity graph) {
+      InvocableStreamPipesEntity graph) {
     pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
graph.getName()
         + "' was not available and could not be restored.");
   }
@@ -186,7 +199,6 @@ public class PipelineHealthCheck implements Runnable {
     return InstanceIdExtractor.extractId(graph.getElementId());
   }
 
-
   private List<String> findRunningInstances(Set<String> endpoints) {
     List<String> allRunningInstances = new ArrayList<>();
     endpoints.forEach(endpoint -> {
@@ -210,7 +222,7 @@ public class PipelineHealthCheck implements Runnable {
   }
 
   private void addEndpoint(Map<String, List<InvocableStreamPipesEntity>> 
endpointMap,
-                           InvocableStreamPipesEntity graph) {
+      InvocableStreamPipesEntity graph) {
     String selectedEndpoint = graph.getSelectedEndpointUrl();
     if (!endpointMap.containsKey(selectedEndpoint)) {
       endpointMap.put(selectedEndpoint, new ArrayList<>());
@@ -225,7 +237,7 @@ public class PipelineHealthCheck implements Runnable {
     try {
       this.checkAndRestorePipelineElements();
     } catch (Exception e) {
-        LOG.error("Error while checking and restoring pipeline elements", e);
+      LOG.error("Error while checking and restoring pipeline elements", e);
     }
 
   }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 7c84214d83..ac22d2783f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.migration;
 
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
@@ -50,6 +51,7 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
   private final IPipelineStorage pipelineStorage;
   private final IDataProcessorStorage dataProcessorStorage;
   private final IDataSinkStorage dataSinkStorage;
+  private final PipelinesStats pipelinesStats = new PipelinesStats();
 
   public PipelineElementMigrationManager(IPipelineStorage pipelineStorage,
                                          IDataProcessorStorage 
dataProcessorStorage,
@@ -166,6 +168,7 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
         failedMigration -> "Failed migration of pipeline element: 
%s".formatted(failedMigration.message())
     ).toList());
     pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+    pipelinesStats.updatePipelineHealthState(pipeline.getPipelineId(), 
pipeline.getName(), pipeline.getHealthStatus().toString());
 
     pipelineStorage.updateElement(pipeline);
 


Reply via email to