This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b4e815b41 Refactored Prometheus Endpoints to new naming convention
and functioning (#4039)
6b4e815b41 is described below
commit 6b4e815b410388180414368e966efea4f2121db3
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Mon Dec 15 13:08:56 2025 +0100
Refactored Prometheus Endpoints to new naming convention and functioning
(#4039)
Co-authored-by: Philipp Zehnder <[email protected]>
---
.../commons/prometheus/adapter/AdapterMetrics.java | 28 ++++++++--
.../loadbalancer/LoadBalancerMetrics.java | 24 ++++++--
.../prometheus/pipelines/PipelineFlowMetrics.java | 22 +++++---
.../prometheus/pipelines/PipelineFlowStats.java | 5 ++
.../prometheus/pipelines/PipelinesMetrics.java | 65 +++++++++++++---------
.../prometheus/pipelines/PipelinesStats.java | 37 ++++++++----
.../prometheus/service/ElementServiceMetrics.java | 42 ++++++++++----
.../spmemorymanager/SpMemoryManagerMetrics.java | 22 ++++++--
.../spratelimiter/SpRateLimiterMetrics.java | 19 ++++++-
.../management/AdapterUpdateManagement.java | 3 +
.../pipeline/ExtensionsLogProvider.java | 36 ++++++++++--
.../pipeline/ExtensionsServiceLogExecutor.java | 22 ++++++++
.../execution/task/StorePipelineStatusTask.java | 8 +++
.../manager/health/PipelineHealthCheck.java | 38 ++++++++-----
.../migration/PipelineElementMigrationManager.java | 3 +
15 files changed, 283 insertions(+), 91 deletions(-)
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
index 174531fb6e..99039046e6 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/adapter/AdapterMetrics.java
@@ -47,16 +47,33 @@ public class AdapterMetrics {
private static final String ELEMENT_NOT_FOUND_TEMPLATE = "No entry for
adapter '%s' found. Please register it first.";
- private final Gauge totalAdapterEventsPublishedMetric;
+/**
+ * @deprecated Use {@link #totalAdapterEventsPublishedMetric} instead.
+ */
+@Deprecated
+public final Gauge totalAdapterEventsPublishedMetricLegacy;
- public AdapterMetrics() {
- this.totalAdapterEventsPublishedMetric =
StreamPipesCollectorRegistry.registerGauge(
+public final Gauge totalAdapterEventsPublishedMetric;
+
+public AdapterMetrics() {
+
+ this.totalAdapterEventsPublishedMetricLegacy =
StreamPipesCollectorRegistry.registerGauge(
"adapter_events_published_total",
+ "DEPRECATED: Use sp_core_adapter_instance_events_published_total
instead. Total amount of events published per adapter",
+ "adapterId", "adapterName"
+ );
+
+ this.totalAdapterEventsPublishedMetric =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_core_adapter_instance_events_published_total",
"Total amount of events published per adapter",
"adapterId", "adapterName"
);
+
this.registeredAdapters = new HashMap<>();
- }
+}
+
+
+
/**
* Registers a new adapter with the given adapterId and adapterName.
@@ -93,6 +110,8 @@ public class AdapterMetrics {
// Order of labels needs to be preserved and match the one in
AdapterMetrics#register
totalAdapterEventsPublishedMetric.labels(adapterId, adapterName)
.set(totalEventsPublished);
+ totalAdapterEventsPublishedMetricLegacy.labels(adapterId, adapterName)
+ .set(totalEventsPublished);
} else {
throw new
NoSuchElementException(ELEMENT_NOT_FOUND_TEMPLATE.formatted(adapterId));
}
@@ -109,6 +128,7 @@ public class AdapterMetrics {
public void remove(String adapterId, String adapterName) {
if (contains(adapterId)) {
this.totalAdapterEventsPublishedMetric.remove(adapterId, adapterName);
+ this.totalAdapterEventsPublishedMetricLegacy.remove(adapterId,
adapterName);
this.registeredAdapters.remove(adapterId);
} else {
throw new
NoSuchElementException(ELEMENT_NOT_FOUND_TEMPLATE.formatted(adapterId));
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
index 685182def4..9e4625525c 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/loadbalancer/LoadBalancerMetrics.java
@@ -26,25 +26,39 @@ import io.prometheus.client.Gauge;
*/
public class LoadBalancerMetrics {
+ @Deprecated
+ public static final Gauge SERVICE_ADAPTER_COUNT_LEGACY =
StreamPipesCollectorRegistry
+ .registerGauge("lb_service_adapter_count", "DEPRECATED: Use
sp_extension_adapter_count_total instead. Number of adapters in each extension
service",
+ "serviceId");
+ @Deprecated
+ public static final Gauge SERVICE_PIPELINE_COUNT_LEGACY =
StreamPipesCollectorRegistry
+ .registerGauge("lb_service_pipeline_count", "DEPRECATED: Use
sp_extension_pipeline_count_total instead.Number of pipelines in each extension
service",
+ "serviceId");
+ @Deprecated
+ public static final Gauge MIGRATION_TIME_SECONDS_LEGACY =
StreamPipesCollectorRegistry
+ .registerGauge("lb_migration_time_seconds", "DEPRECATED: Use
sp_core_migration_time_seconds instead. Time taken for pipeline migration in
seconds");
+
public static final Gauge SERVICE_ADAPTER_COUNT =
StreamPipesCollectorRegistry
- .registerGauge("lb_service_adapter_count", "Number of adapters in each
extension service",
+ .registerGauge("sp_extension_adapter_count_total", "Number of adapters
in each extension service",
"serviceId");
-
public static final Gauge SERVICE_PIPELINE_COUNT =
StreamPipesCollectorRegistry
- .registerGauge("lb_service_pipeline_count", "Number of pipelines in each
extension service",
+ .registerGauge("sp_extension_pipeline_count_total", "Number of pipelines
in each extension service",
"serviceId");
-
public static final Gauge MIGRATION_TIME_SECONDS =
StreamPipesCollectorRegistry
- .registerGauge("lb_migration_time_seconds", "Time taken for pipeline
migration in seconds");
+ .registerGauge("sp_core_migration_time_seconds", "Time taken for
pipeline migration in seconds");
public LoadBalancerMetrics() {}
public void reportMetrics(String serviceId, int serviceAdapterCount, int
servicePipelineCount) {
SERVICE_ADAPTER_COUNT.labels(serviceId).set(serviceAdapterCount);
SERVICE_PIPELINE_COUNT.labels(serviceId).set(servicePipelineCount);
+
+ SERVICE_ADAPTER_COUNT_LEGACY.labels(serviceId).set(serviceAdapterCount);
+ SERVICE_PIPELINE_COUNT_LEGACY.labels(serviceId).set(servicePipelineCount);
}
public void reportMigrationTime(double seconds) {
MIGRATION_TIME_SECONDS.set(seconds);
+ MIGRATION_TIME_SECONDS_LEGACY.set(seconds);
}
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
index 8faede4017..12b160642e 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowMetrics.java
@@ -23,25 +23,29 @@ import io.prometheus.client.Gauge;
public class PipelineFlowMetrics {
-
- public static final Gauge RECEIVED_TOTAL_DATA_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
+@Deprecated
+ public static final Gauge RECEIVED_TOTAL_DATA_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
"received_total_data",
- "Total amount of data received by the pipeline"
+ "DEPRECATED. Total amount of data received by the pipeline"
);
-
+@Deprecated
public static final Gauge PROCESSED_DATA_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
"processed_data",
- "Number of data obtained from pipeline processing"
+ "DEPRECATED. Number of data obtained from pipeline processing"
);
-
+@Deprecated
public static final Gauge ELEMENT_INPUT_TOTAL_DATA_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
"element_input_total_data",
- "Total amount of data received by elements"
+ "DEPRECATED. Total amount of data received by elements"
);
-
+@Deprecated
public static final Gauge ELEMENT_OUTPUT_TOTAL_DATA_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
"element_output_total_data",
- "Total amount of data sent by elements"
+ "DEPRECATED. Total amount of data sent by elements"
);
+ public static final Gauge ELEMENT_FLOW_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
+
"sp_core_pipeline_element_data_total",
+ "Total amount of data
received/sent by a pipeline element (e.g., filter)","pipelineId",
"elemenetId","elementType", "operation");
+
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
index 5d159c9092..13cae56fd3 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelineFlowStats.java
@@ -109,4 +109,9 @@ public class PipelineFlowStats {
}
+ public void updateElementFlow(String pipelineId, String elementId, String
elementType, long valuesReceived, long valuesSend){
+ PipelineFlowMetrics.ELEMENT_FLOW_GAUGE.labels(pipelineId,
elementId,elementType,"received").set(valuesReceived);
+ PipelineFlowMetrics.ELEMENT_FLOW_GAUGE.labels(pipelineId,
elementId,elementType,"send").set(valuesSend);
+ }
+
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
index 28e976a57d..34f7bfb6c5 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesMetrics.java
@@ -21,37 +21,48 @@ import
org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
import io.prometheus.client.Gauge;
-
public class PipelinesMetrics {
- public static final Gauge ALL_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
- "all_pipelines",
- "Total number of
pipelines");
-
-
- public static final Gauge RUNNING_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
- "running_pipelines",
- "Number of running
pipelines");
-
- public static final Gauge STOPPED_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
- "stopped_pipelines",
- "Number of stopped
pipelines");
-
-
- public static final Gauge HEALTHY_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
- "healthy_pipelines",
- "Number of healthy
pipelines");
+ @Deprecated
+ public static final Gauge ALL_PIPELINES_GAUGE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "all_pipelines",
+ "DEPRECATED: Use sp_core_pipeline_count_total instead. Total
number of pipelines");
- public static final Gauge FAILED_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
- "failed_pipelines",
- "Number of failed
pipelines");
+ @Deprecated
+ public static final Gauge RUNNING_PIPELINES_GAUGE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "running_pipelines",
+ "DEPRECATED: Use sp_core_pipeline_running_state operation =
running instead. Number of running pipelines");
+ @Deprecated
+ public static final Gauge STOPPED_PIPELINES_GAUGE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "stopped_pipelines",
+ "DEPRECATED: Use ssp_core_pipeline_running_state operation =
stopped instead. Number of stopped pipelines");
- public static final Gauge ATTENTION_REQUIRED_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
-
"attention_required_pipelines",
- "Number of pipelines
requiring attention");
+ @Deprecated
+ public static final Gauge HEALTHY_PIPELINES_GAUGE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "healthy_pipelines",
+ "DEPRECATED: Use sp_core_pipeline_health_state operation = OK
instead. Number of healthy pipelines");
+ @Deprecated
+ public static final Gauge FAILED_PIPELINES_GAUGE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "failed_pipelines",
+ "DEPRECATED: Use sp_core_pipeline_health_state operation = FAILED
instead. Number of failed pipelines");
+ @Deprecated
+ public static final Gauge ATTENTION_REQUIRED_PIPELINES_GAUGE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "attention_required_pipelines",
+ "DEPRECATED: Use sp_core_pipeline_health_state operation =
ATTENTION_REQUIRED instead. Number of pipelines requiring attention");
+ @Deprecated
+ public static final Gauge ELEMENT_COUNT_GAUGE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "element_count",
+ "DEPRECATED. Total number of elements in the pipeline");
- public static final Gauge ELEMENT_COUNT_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
- "element_count",
- "Total number of elements
in the pipeline");
+ public static final Gauge ALL_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_core_pipeline_count_total",
+ "Total number of pipelines");
+ public static final Gauge HEALTH_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_core_pipeline_health_state",
+ "running pipelines per status (failed, attention, healthy)",
+ "pipelineId", "pipelineName", "operation");
+ public static final Gauge STATUS_PIPELINES_GAUGE =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_core_pipeline_running_state",
+ "Number of failed pipelines", "pipelineId", "pipelineName",
"operation");
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
index 038186ed27..c7a75de099 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/pipelines/PipelinesStats.java
@@ -33,7 +33,6 @@ public class PipelinesStats {
private int elementCount;
-
public PipelinesStats() {
}
@@ -45,7 +44,6 @@ public class PipelinesStats {
this.allPipelines = allPipelines;
}
-
public int getRunningPipelines() {
return runningPipelines;
}
@@ -142,7 +140,6 @@ public class PipelinesStats {
this.elementCount += 1;
}
-
public void clear() {
this.allPipelines = 0;
this.runningPipelines = 0;
@@ -153,14 +150,34 @@ public class PipelinesStats {
this.elementCount = 0;
}
-
public void metrics() {
+ PipelinesMetrics.ALL_PIPELINES_GAUGE_LEGACY.set(this.allPipelines);
+ PipelinesMetrics.RUNNING_PIPELINES_GAUGE_LEGACY.set(this.runningPipelines);
+ PipelinesMetrics.STOPPED_PIPELINES_GAUGE_LEGACY.set(this.stoppedPipelines);
+ PipelinesMetrics.HEALTHY_PIPELINES_GAUGE_LEGACY.set(this.healthyPipelines);
+ PipelinesMetrics.FAILED_PIPELINES_GAUGE_LEGACY.set(this.failedPipelines);
+
PipelinesMetrics.ATTENTION_REQUIRED_PIPELINES_GAUGE_LEGACY.set(this.attentionRequiredPipelines);
+ PipelinesMetrics.ELEMENT_COUNT_GAUGE_LEGACY.set(this.elementCount);
+
PipelinesMetrics.ALL_PIPELINES_GAUGE.set(this.allPipelines);
- PipelinesMetrics.RUNNING_PIPELINES_GAUGE.set(this.runningPipelines);
- PipelinesMetrics.STOPPED_PIPELINES_GAUGE.set(this.stoppedPipelines);
- PipelinesMetrics.HEALTHY_PIPELINES_GAUGE.set(this.healthyPipelines);
- PipelinesMetrics.FAILED_PIPELINES_GAUGE.set(this.failedPipelines);
-
PipelinesMetrics.ATTENTION_REQUIRED_PIPELINES_GAUGE.set(this.attentionRequiredPipelines);
- PipelinesMetrics.ELEMENT_COUNT_GAUGE.set(this.elementCount);
+ }
+
+ public void updatePipelineRunningState(String pipelineId, String
pipelineName, boolean state) {
+ PipelinesMetrics.STATUS_PIPELINES_GAUGE.labels(pipelineId, pipelineName,
state ? "running" : "stopped")
+ .set(1);
+ PipelinesMetrics.STATUS_PIPELINES_GAUGE.labels(pipelineId, pipelineName,
!state ? "running" : "stopped")
+ .set(0);
+ }
+
+ public void updatePipelineHealthState(String pipelineId, String
pipelineName, String state) {
+ String[] statusElements = { "OK", "FAILURE", "REQUIRES_ATTENTION" };
+
+ for (String s : statusElements) {
+ if (s.equals(state)) {
+ PipelinesMetrics.HEALTH_PIPELINES_GAUGE.labels(pipelineId,
pipelineName, s).set(1);
+ } else {
+ PipelinesMetrics.HEALTH_PIPELINES_GAUGE.labels(pipelineId,
pipelineName, s).set(0);
+ }
+ }
}
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
index ed6f197ef2..317b5189ee 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceMetrics.java
@@ -22,37 +22,59 @@ import
org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
import io.prometheus.client.Gauge;
/**
- * Service Metrics Manager Inherits PrometheusMetrics and implements
service-related metric
+ * Service Metrics Manager Inherits PrometheusMetrics and implements
+ * service-related metric
* registration
*/
public class ElementServiceMetrics {
+ @Deprecated
+ public static final Gauge CPU_USAGE_LEGACY = StreamPipesCollectorRegistry
+ .registerGauge("cpu_usage", "DEPRECATED: Use
sp_extension_cpu_usage_percentage instead. Element CPU usage percentage",
"serviceId");
+ @Deprecated
+ public static final Gauge MEMORY_USAGE_LEGACY = StreamPipesCollectorRegistry
+ .registerGauge("memory_usage", "DEPRECATED: Use
sp_extension_memory_usage_bytes instead. Element memory usage in bytes",
"serviceId");
+ @Deprecated
+ public static final Gauge WEIGHT_LEGACY = StreamPipesCollectorRegistry
+ .registerGauge("weight", "DEPRECATED: Use
sp_extension_weight_count_total instead. Weight of remaining available
resources for element", "serviceId");
+ @Deprecated
+ public static final Gauge SYSTEM_LOAD_LEGACY = StreamPipesCollectorRegistry
+ .registerGauge("system_load", "DEPRECATED: Use
sp_extension_system_load_last_minute instead. System load average over the last
minute", "serviceId");
+ @Deprecated
+ public static final Gauge HISTORICAL_SYSTEM_LOAD_LEGACY =
StreamPipesCollectorRegistry
+ .registerGauge("historical_system_load", "DEPRECATED: Use
sp_extension_system_load_historic_average instead. Historical system load
average", "serviceId");
public static final Gauge CPU_USAGE = StreamPipesCollectorRegistry
- .registerGauge("cpu_usage", "Element CPU usage percentage", "serviceId");
-
+ .registerGauge("sp_extension_cpu_usage_percentage", "Element CPU usage
percentage", "serviceId");
public static final Gauge MEMORY_USAGE = StreamPipesCollectorRegistry
- .registerGauge("memory_usage", "Element memory usage in bytes",
"serviceId");
+ .registerGauge("sp_extension_memory_usage_bytes", "Element memory usage
in bytes", "serviceId");
public static final Gauge WEIGHT = StreamPipesCollectorRegistry
- .registerGauge("weight", "Weight of remaining available resources for
element", "serviceId");
+ .registerGauge("sp_extension_weight_count_total", "Weight of remaining
available resources for element",
+ "serviceId");
+
public static final Gauge SYSTEM_LOAD = StreamPipesCollectorRegistry
- .registerGauge("system_load", "System load average over the last
minute", "serviceId");
+ .registerGauge("sp_extension_system_load_last_minute", "System load
average over the last minute", "serviceId");
+
public static final Gauge HISTORICAL_SYSTEM_LOAD =
StreamPipesCollectorRegistry
- .registerGauge("historical_system_load", "Historical system load
average", "serviceId");
+ .registerGauge("sp_extension_system_load_historic_average", "Historical
system load average", "serviceId");
private final String id;
-
-
public ElementServiceMetrics(String id) {
this.id = id;
}
public void reportMetrics(double cpuUsage, double memoryUsage, double
weight, double systemLoad,
- double historicalSystemLoad) {
+ double historicalSystemLoad) {
CPU_USAGE.labels(this.id).set(cpuUsage);
MEMORY_USAGE.labels(this.id).set(memoryUsage);
WEIGHT.labels(this.id).set(weight);
SYSTEM_LOAD.labels(this.id).set(systemLoad);
HISTORICAL_SYSTEM_LOAD.labels(this.id).set(historicalSystemLoad);
+
+ CPU_USAGE_LEGACY.labels(this.id).set(cpuUsage);
+ MEMORY_USAGE_LEGACY.labels(this.id).set(memoryUsage);
+ WEIGHT_LEGACY.labels(this.id).set(weight);
+ SYSTEM_LOAD_LEGACY.labels(this.id).set(systemLoad);
+ HISTORICAL_SYSTEM_LOAD_LEGACY.labels(this.id).set(historicalSystemLoad);
}
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
index b1608b004e..e746e5e92d 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
@@ -26,19 +26,33 @@ import io.prometheus.client.Gauge;
* Memory Manager Metrics Manager
*/
public class SpMemoryManagerMetrics {
-
- public static final Gauge MEMORY_USED_BYTES =
StreamPipesCollectorRegistry.registerGauge(
+
+ @Deprecated
+ public static final Gauge MEMORY_USED_BYTES_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
"sp_memory_used_bytes",
+ "DEPRECATED: Use sp_memory_used_bytes instead. Amount of memory used
in bytes"
+ );
+ @Deprecated
+ public static final Gauge MEMORY_ALLOCATION_RATE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_memory_allocation_rate_bytes_per_second",
+ "DEPRECATED: Use sp_memory_allocation_rate_bytes_per_second instead.
Memory allocation rate in bytes per second"
+ );
+
+
+ public static final Gauge MEMORY_USED_BYTES =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_extension_memory_used_bytes",
"Amount of memory used in bytes"
);
public static final Gauge MEMORY_ALLOCATION_RATE =
StreamPipesCollectorRegistry.registerGauge(
- "sp_memory_allocation_rate_bytes_per_second",
+ "sp_extension_memory_allocation_rate_bytes_per_second",
"Memory allocation rate in bytes per second"
);
-
public static void updateCoreMetrics(double memoryUsedBytes, double
allocationRate) {
MEMORY_USED_BYTES.set(memoryUsedBytes);
MEMORY_ALLOCATION_RATE.set(allocationRate);
+
+ MEMORY_USED_BYTES_LEGACY.set(memoryUsedBytes);
+ MEMORY_ALLOCATION_RATE_LEGACY.set(allocationRate);
}
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
index 0724cec394..b81f4fbe52 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
@@ -26,19 +26,32 @@ import io.prometheus.client.Gauge;
* Rate Limiter Metrics Manager
*/
public class SpRateLimiterMetrics {
-
- public static final Gauge RATE_LIMITER_QUEUE_SIZE =
StreamPipesCollectorRegistry.registerGauge(
+ @Deprecated
+ public static final Gauge RATE_LIMITER_QUEUE_SIZE_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
"sp_rate_limiter_queue_size",
+ "DEPRECATED: Use sp_extension_rate_limiter_queue_total instead.
Current size of the waiting queue"
+ );
+ @Deprecated
+ public static final Gauge RATE_LIMITER_AVERAGE_WAIT_TIME_LEGACY =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_rate_limiter_average_wait_time_seconds",
+ "DEPRECATED: Use sp_extension_rate_limiter_average_wait_time_seconds
instead. Average wait time for permit acquisition in seconds"
+ );
+
+ public static final Gauge RATE_LIMITER_QUEUE_SIZE =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_extension_rate_limiter_queue_total",
"Current size of the waiting queue"
);
public static final Gauge RATE_LIMITER_AVERAGE_WAIT_TIME =
StreamPipesCollectorRegistry.registerGauge(
- "sp_rate_limiter_average_wait_time_seconds",
+ "sp_extension_rate_limiter_average_wait_time_seconds",
"Average wait time for permit acquisition in seconds"
);
public static void updateCoreMetrics(double queueSize, double
averageWaitTime) {
RATE_LIMITER_QUEUE_SIZE.set(queueSize);
RATE_LIMITER_AVERAGE_WAIT_TIME.set(averageWaitTime);
+
+ RATE_LIMITER_QUEUE_SIZE_LEGACY.set(queueSize);
+ RATE_LIMITER_AVERAGE_WAIT_TIME_LEGACY.set(averageWaitTime);
}
}
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index 963e9e1677..15e0d52c19 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.connect.management.management;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.pipeline.PipelineManager;
@@ -51,6 +52,7 @@ public class AdapterUpdateManagement {
private final AdapterMasterManagement adapterMasterManagement;
private final AdapterResourceManager adapterResourceManager;
private final DataStreamResourceManager dataStreamResourceManager;
+ private static final PipelinesStats pipelinesStats = new PipelinesStats();
public AdapterUpdateManagement(AdapterMasterManagement
adapterMasterManagement) {
this.adapterMasterManagement = adapterMasterManagement;
@@ -88,6 +90,7 @@ public class AdapterUpdateManagement {
var canAutoMigrate = canAutoMigrate(modificationMessage);
if (!canAutoMigrate) {
modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+
pipelinesStats.updatePipelineHealthState(modifiedPipeline.getElementId(),
modifiedPipeline.getName(),modifiedPipeline.getHealthStatus().toString());
modifiedPipeline.setPipelineNotifications(toNotification(updateInfo));
modifiedPipeline.setValid(false);
}
diff --git
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
index 1ef42adcf8..3ba3f185f4 100644
---
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
+++
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsLogProvider.java
@@ -43,7 +43,7 @@ public enum ExtensionsLogProvider {
public void addMonitoringInfos(SpEndpointMonitoringInfo monitoringInfo) {
allMetricsInfos.putAll(monitoringInfo.getMetricsInfos());
- monitoringInfo.getLogInfos().forEach((key, value) -> {
+ monitoringInfo.getLogInfos().forEach((key, value) -> {
if (!allLogInfos.containsKey(key)) {
allLogInfos.put(key, new ArrayList<>());
}
@@ -118,15 +118,39 @@ public enum ExtensionsLogProvider {
public Map<String, SpMetricsEntry> getAllMetricsInfos() {
return this.allMetricsInfos;
}
-
+
private List<String> collectPipelineElementIds(Pipeline pipeline) {
if (pipeline != null){
- return Stream.concat(
- pipeline.getSepas().stream().map(NamedStreamPipesEntity::getElementId),
-
pipeline.getActions().stream().map(NamedStreamPipesEntity::getElementId)
- ).collect(Collectors.toList());
+ return Stream.concat(
+ Stream.concat(
+ pipeline.getStreams().stream()
+ .map(s -> s.getCorrespondingAdapterId()),
+ pipeline.getSepas().stream()
+ .map(NamedStreamPipesEntity::getElementId)
+ ),
+ pipeline.getActions().stream()
+ .map(NamedStreamPipesEntity::getElementId)
+ )
+ .collect(Collectors.toList());
}
return List.of();
}
+
+ public Map<String, Map<String, SpMetricsEntry>>
getMetricsGroupedByPipeline() {
+
+ var allPipelines = StorageDispatcher.INSTANCE
+ .getNoSqlStore()
+ .getPipelineStorageAPI().findAll();
+
+ Map<String, Map<String, SpMetricsEntry>> result = new HashMap<>();
+
+ for (Pipeline pipeline : allPipelines) {
+ var metrics =
ExtensionsLogProvider.INSTANCE.getMetricInfosForPipeline(pipeline.getPipelineId());
+ result.put(pipeline.getElementId(), metrics);
+ }
+
+ return result;
+}
+
}
diff --git
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
index fbf2de5344..95d5ea9648 100644
---
a/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
+++
b/streampipes-load-balancer/src/main/java/org/apache/streampipes/loadbalance/pipeline/ExtensionsServiceLogExecutor.java
@@ -77,8 +77,30 @@ public class ExtensionsServiceLogExecutor implements
Runnable {
private void updatePipelineFlow() {
pipelineFlowStats.clear();
+
ExtensionsLogProvider.INSTANCE.getMetricsGroupedByPipeline().forEach((pipelineId,
data) -> {
+ data.forEach((k, v) -> {
+ // Total "in" count
+ long dataCountIn = v.getMessagesIn()
+ .values()
+ .stream()
+ .mapToLong(m -> m.getCounter())
+ .sum();
+
+ long dataCountOut = v.getMessagesOut().getCounter();
+
+ pipelineFlowStats.updateElementFlow(
+ pipelineId,
+ k,
+ InstanceIdExtractor.getSimpleName(k),
+ dataCountIn,
+ dataCountOut
+ );
+ });
+});
+/** When removing the deprectated gauges this also becomes deprecated */
ExtensionsLogProvider.INSTANCE.getAllMetricsInfos().forEach((k, v) -> {
String className = InstanceIdExtractor.getSimpleName(k);
+
if
(AdapterDescription.class.getSimpleName().toLowerCase().equals(className)) {
pipelineFlowStats.increaseReceivedTotalData(v.getMessagesOut().getCounter());
} else if
(DataProcessorInvocation.class.getSimpleName().toLowerCase().equals(className))
{
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
index 923d6c942b..fb7609e2be 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.manager.execution.task;
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
@@ -36,6 +37,7 @@ public class StorePipelineStatusTask implements
PipelineExecutionTask {
private final boolean start;
private final boolean forceStop;
+ private final PipelinesStats pipelinesStats = new PipelinesStats();
public StorePipelineStatusTask(boolean start,
boolean forceStop) {
@@ -62,6 +64,9 @@ public class StorePipelineStatusTask implements
PipelineExecutionTask {
private void setPipelineStarted(Pipeline pipeline) {
pipeline.setRunning(true);
pipeline.setStartedAt(new Date().getTime());
+
pipelinesStats.updatePipelineRunningState(pipeline.getElementId(),pipeline.getName()
+ , true);
+
pipelinesStats.updatePipelineHealthState(pipeline.getElementId(),pipeline.getName(),
pipeline.getHealthStatus().toString());
try {
getPipelineStorageApi().updateElement(pipeline);
} catch (DocumentConflictException dce) {
@@ -71,6 +76,9 @@ public class StorePipelineStatusTask implements
PipelineExecutionTask {
private void setPipelineStopped(Pipeline pipeline) {
pipeline.setRunning(false);
+
pipelinesStats.updatePipelineRunningState(pipeline.getElementId(),pipeline.getName()
+ , false);
+
pipelinesStats.updatePipelineHealthState(pipeline.getElementId(),pipeline.getName(),
pipeline.getHealthStatus().toString());
getPipelineStorageApi().updateElement(pipeline);
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 7018dca8e4..06a6d3a8ec 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -17,7 +17,6 @@
*/
package org.apache.streampipes.manager.health;
-
import org.apache.streampipes.commons.constants.InstanceIdExtractor;
import
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
@@ -69,6 +68,18 @@ public class PipelineHealthCheck implements Runnable {
pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines()
- pipelinesStats.getRunningPipelines());
+ for (Pipeline p : allPipelines) {
+ pipelinesStats.updatePipelineHealthState(
+ p.getElementId(),
+ p.getName(),
+ p.getHealthStatus().toString());
+
+ pipelinesStats.updatePipelineRunningState(
+ p.getElementId(),
+ p.getName(),
+ p.isRunning());
+ }
+
if (!runningPipelines.isEmpty()) {
Map<String, List<InvocableStreamPipesEntity>> endpointMap =
generateEndpointMap();
List<String> allRunningInstances =
findRunningInstances(endpointMap.keySet());
@@ -78,8 +89,8 @@ public class PipelineHealthCheck implements Runnable {
List<String> failedInstances = new ArrayList<>();
List<String> recoveredInstances = new ArrayList<>();
List<String> pipelineNotifications = new ArrayList<>();
- List<InvocableStreamPipesEntity> graphs =
-
RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipeline.getPipelineId());
+ List<InvocableStreamPipesEntity> graphs =
RunningPipelineElementStorage.runningProcessorsAndSinks
+ .get(pipeline.getPipelineId());
graphs.forEach(graph -> {
String instanceId = extractInstanceId(graph);
@@ -101,15 +112,15 @@ public class PipelineHealthCheck implements Runnable {
addFailedAttemptNotification(pipelineNotifications, graph);
increaseFailedAttempt(instanceId);
LOG.info("Could not restore pipeline element {} of pipeline {}
({}/{})",
- graph.getName(), pipeline.getName(),
failedRestartAttempts.get(instanceId),
- MAX_FAILED_ATTEMPTS);
+ graph.getName(), pipeline.getName(),
failedRestartAttempts.get(instanceId),
+ MAX_FAILED_ATTEMPTS);
} else {
recoveredInstances.add(instanceId);
addSuccessfulRestoreNotification(pipelineNotifications, graph);
resetFailedAttempts(instanceId);
graph.setSelectedEndpointUrl(endpointUrl);
LOG.info("Successfully restored pipeline element {} of
pipeline {}",
- graph.getName(), pipeline.getName());
+ graph.getName(), pipeline.getName());
}
}
}
@@ -126,7 +137,10 @@ public class PipelineHealthCheck implements Runnable {
currentPipeline.setPipelineNotifications(pipelineNotifications);
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI()
.updateElement(currentPipeline);
+
pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(),
currentPipeline.getName(),
+ currentPipeline.getHealthStatus().toString());
}
+
});
int healthNum = pipelinesStats.getRunningPipelines() -
pipelinesStats.getFailedPipelines()
- pipelinesStats.getAttentionRequiredPipelines();
@@ -138,8 +152,7 @@ public class PipelineHealthCheck implements Runnable {
private String findEndpointUrl(InvocableStreamPipesEntity graph)
throws NoServiceEndpointsAvailableException {
- SpServiceUrlProvider serviceUrlProvider =
- ExtensionsServiceEndpointUtils.getPipelineElementType(graph);
+ SpServiceUrlProvider serviceUrlProvider =
ExtensionsServiceEndpointUtils.getPipelineElementType(graph);
return serviceUrlProvider.getInvocationUrl(graph.getSelectedEndpointUrl(),
graph.getAppId());
}
@@ -165,13 +178,13 @@ public class PipelineHealthCheck implements Runnable {
}
private void addSuccessfulRestoreNotification(List<String>
pipelineNotifications,
- InvocableStreamPipesEntity
graph) {
+ InvocableStreamPipesEntity graph) {
pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" +
graph.getName()
+ "' was not available and was successfully restored.");
}
private void addFailedAttemptNotification(List<String> pipelineNotifications,
- InvocableStreamPipesEntity graph) {
+ InvocableStreamPipesEntity graph) {
pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" +
graph.getName()
+ "' was not available and could not be restored.");
}
@@ -186,7 +199,6 @@ public class PipelineHealthCheck implements Runnable {
return InstanceIdExtractor.extractId(graph.getElementId());
}
-
private List<String> findRunningInstances(Set<String> endpoints) {
List<String> allRunningInstances = new ArrayList<>();
endpoints.forEach(endpoint -> {
@@ -210,7 +222,7 @@ public class PipelineHealthCheck implements Runnable {
}
private void addEndpoint(Map<String, List<InvocableStreamPipesEntity>>
endpointMap,
- InvocableStreamPipesEntity graph) {
+ InvocableStreamPipesEntity graph) {
String selectedEndpoint = graph.getSelectedEndpointUrl();
if (!endpointMap.containsKey(selectedEndpoint)) {
endpointMap.put(selectedEndpoint, new ArrayList<>());
@@ -225,7 +237,7 @@ public class PipelineHealthCheck implements Runnable {
try {
this.checkAndRestorePipelineElements();
} catch (Exception e) {
- LOG.error("Error while checking and restoring pipeline elements", e);
+ LOG.error("Error while checking and restoring pipeline elements", e);
}
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 7c84214d83..ac22d2783f 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.manager.migration;
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
@@ -50,6 +51,7 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
private final IPipelineStorage pipelineStorage;
private final IDataProcessorStorage dataProcessorStorage;
private final IDataSinkStorage dataSinkStorage;
+ private final PipelinesStats pipelinesStats = new PipelinesStats();
public PipelineElementMigrationManager(IPipelineStorage pipelineStorage,
IDataProcessorStorage
dataProcessorStorage,
@@ -166,6 +168,7 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
failedMigration -> "Failed migration of pipeline element:
%s".formatted(failedMigration.message())
).toList());
pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+ pipelinesStats.updatePipelineHealthState(pipeline.getPipelineId(),
pipeline.getName(), pipeline.getHealthStatus().toString());
pipelineStorage.updateElement(pipeline);