This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch improve-health-management
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 7dc560b62fa3e98ed3f61ff5a20692dbc20f5904
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Jan 14 16:55:18 2026 +0100

    refactor: Improve health checks
---
 pom.xml                                            |   1 +
 .../apache/streampipes/commons/constants/Envs.java |  30 +--
 .../management/health/AdapterHealthCheck.java      | 245 -------------------
 .../management/WorkerAdministrationManagement.java |  25 +-
 .../management/health/AdapterHealthCheckTest.java  |  96 --------
 .../influx/DataExplorerInfluxQueryExecutor.java    |   2 +-
 .../management/init/RunningInstances.java          |  10 +
 streampipes-health-monitoring/pom.xml              |  59 +++++
 .../health/monitoring/AdapterHealthCheck.java      | 168 +++++++++++++
 .../health/monitoring/ExtensionHealthCheck.java    |  57 +++++
 .../PipelineElementEndpointHealthCheck.java        |  67 ++++++
 .../health/monitoring/PipelineHealthCheck.java     | 197 ++++++++++++++++
 .../health/monitoring/ResourceProvider.java        | 101 ++++++++
 .../health/monitoring}/ServiceHealthCheck.java     |  22 +-
 .../monitoring}/ServiceRegistrationManager.java    |   6 +-
 .../monitoring/model/ActiveCoreInstances.java      |  29 +--
 .../health/monitoring/model/ActiveResources.java   |  30 +--
 .../health/monitoring/model/HealthCheckData.java   |  30 +++
 .../health/monitoring/utils/HealthCheckUtils.java  |  51 ++++
 .../model/health/ExtensionInstanceHealth.java      |  25 ++
 .../streampipes/model/util/PropertyUtils.java      |   4 -
 .../execution/ExtensionServiceExecutions.java      |   1 -
 .../manager/execution/PipelineExecutionInfo.java   |   4 +-
 .../ExtensionsServiceEndpointGenerator.java        |  12 +-
 ...tter.java => BasePipelineElementSubmitter.java} |  23 +-
 .../http/DetachPipelineElementSubmitter.java       |   5 +-
 .../http/InvokePipelineElementSubmitter.java       |   9 +-
 .../execution/task/DiscoverEndpointsTask.java      |  37 +--
 .../manager/execution/task/SubmitRequestTask.java  |  10 +-
 .../health/PipelineElementEndpointHealthCheck.java |  51 ----
 .../manager/health/PipelineHealthCheck.java        | 261 ---------------------
 .../manager/storage/PipelineStorageService.java    |   4 -
 .../extensions/monitoring/HealthCheckResource.java |  67 ++++++
 streampipes-rest/pom.xml                           |   5 +
 .../rest/impl/admin/MigrationResource.java         |   2 +-
 .../impl/admin/ServiceRegistrationResource.java    |   2 +-
 streampipes-service-core/pom.xml                   |   5 +
 .../streampipes/service/core/PostStartupTask.java  |   4 +-
 .../service/core/StreamPipesCoreApplication.java   |  28 ++-
 39 files changed, 953 insertions(+), 832 deletions(-)

diff --git a/pom.xml b/pom.xml
index af0058acf8..90745684d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1125,6 +1125,7 @@
         <module>streampipes-wrapper-kafka-streams</module>
         <module>streampipes-wrapper-siddhi</module>
         <module>streampipes-wrapper-standalone</module>
+        <module>streampipes-health-monitoring</module>
     </modules>
 
     <profiles>
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 148fa66abd..14095fdb23 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -102,21 +102,21 @@ public enum Envs {
 
   SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"),
 
-  CPU_RESOURCE_WEIGHT("CPU_RESOURCE_WEIGHT", "1.0"),
-  MEMORY_RESOURCE_WEIGHT("MEMORY_RESOURCE_WEIGHT", "1.0"),
-
-  DIR_MEMORY_RESOURCE_WEIGHT("DIR_MEMORY_RESOURCE_WEIGHT", "1.0"),
-  BANDWIDTH_IN_RESOURCE_WEIGHT("BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"),
-  BANDWIDTH_OUT_RESOURCE_WEIGHT("BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"),
-  THRESHOLD_MIGRATOR_PERCENTAGE("THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"),
-  MIN_MIGRATOR_PERCENTAGE("MIN_MIGRATOR_PERCENTAGE", "20.0"),
-  OVERLOADED_THRESHOLD_PERCENTAGE("OVERLOADED_THRESHOLD_PERCENTAGE", "85"),
-  HISTORY_RESOURCE_PERCENTAGE("HISTORY_RESOURCE_PERCENTAGE", "0.9"),
-  MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD", "85"),
-  LOAD_TARGET_STD("LOAD_TARGET_STD", "25.0"),
-  SELECTOR("SELECTOR", "WeightedRandomSelector"),
-  MIGRATOR("MIGRATOR", "ThresholdMigrator"),
-  LOAD_MANAGER_ENABLE("LOAD_MANAGER_ENABLE", "true"),
+  CPU_RESOURCE_WEIGHT("SP_CPU_RESOURCE_WEIGHT", "1.0"),
+  MEMORY_RESOURCE_WEIGHT("SP_MEMORY_RESOURCE_WEIGHT", "1.0"),
+
+  DIR_MEMORY_RESOURCE_WEIGHT("SP_DIR_MEMORY_RESOURCE_WEIGHT", "1.0"),
+  BANDWIDTH_IN_RESOURCE_WEIGHT("SP_BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"),
+  BANDWIDTH_OUT_RESOURCE_WEIGHT("SP_BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"),
+  THRESHOLD_MIGRATOR_PERCENTAGE("SP_THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"),
+  MIN_MIGRATOR_PERCENTAGE("SP_MIN_MIGRATOR_PERCENTAGE", "20.0"),
+  OVERLOADED_THRESHOLD_PERCENTAGE("SP_OVERLOADED_THRESHOLD_PERCENTAGE", "85"),
+  HISTORY_RESOURCE_PERCENTAGE("SP_HISTORY_RESOURCE_PERCENTAGE", "0.9"),
+  MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD", "85"),
+  LOAD_TARGET_STD("SP_LOAD_TARGET_STD", "25.0"),
+  SELECTOR("SP_SELECTOR", "WeightedRandomSelector"),
+  MIGRATOR("SP_MIGRATOR", "ThresholdMigrator"),
+  LOAD_MANAGER_ENABLE("SP_LOAD_MANAGER_ENABLE", "false"),
 
   // expects a comma separated string of service names
   SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
deleted file mode 100644
index 5142d5f4fe..0000000000
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.connect.management.health;
-
-import org.apache.streampipes.commons.exceptions.connect.AdapterException;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.connect.management.management.WorkerRestClient;
-import org.apache.streampipes.connect.management.util.WorkerPaths;
-import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.storage.api.IAdapterStorage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-
-public class AdapterHealthCheck implements Runnable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(AdapterHealthCheck.class);
-
-  private final IAdapterStorage adapterStorage;
-  private final AdapterMasterManagement adapterMasterManagement;
-
-  public AdapterHealthCheck(IAdapterStorage adapterStorage,
-                            AdapterMasterManagement adapterMasterManagement) {
-    this.adapterStorage = adapterStorage;
-    this.adapterMasterManagement = adapterMasterManagement;
-  }
-
-  @Override
-  public void run() {
-    this.checkAndRestoreAdapters();
-  }
-
-  /**
-   * In this method it is checked which adapters are currently running. Then 
it calls all workers to
-   * validate if the adapter instance is still running as expected. If the 
adapter is not running
-   * anymore a new worker instance is invoked. In addition, it publishes 
monitoring metrics for all
-   * running adapters (in line with
-   * {@link org.apache.streampipes.manager.health.PipelineHealthCheck}).
-   */
-  public void checkAndRestoreAdapters() {
-    LOG.info("Adapter health check started");
-    // Get all adapters that are supposed to run according to the backend 
storage
-    Map<String, AdapterDescription> adapterInstancesSupposedToRun =
-        this.getAllAdaptersSupposedToRun();
-
-    // group all adapter instances supposed to run by their worker service URL
-    Map<String, List<AdapterDescription>> groupByWorker =
-        this.getAllWorkersWithAdapters(adapterInstancesSupposedToRun);
-
-    // Get adapters that are not running anymore
-    Map<String, AdapterDescription> allAdaptersToRecover =
-        this.getAdaptersToRecover(groupByWorker, 
adapterInstancesSupposedToRun);
-
-    allAdaptersToRecover
-        .keySet()
-        .forEach(adapterId ->
-                     LOG.info("Adapter instance with id {} needs to be 
recovered", adapterId));
-
-    try {
-      if (!adapterInstancesSupposedToRun.isEmpty()) {
-        // Filter adapters so that only healthy and running adapters are 
updated in the metrics
-        // endpoint
-        var adaptersToMonitor = 
adapterInstancesSupposedToRun.entrySet().stream()
-            .filter((entry -> 
!allAdaptersToRecover.containsKey(entry.getKey())))
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-        if (!adaptersToMonitor.isEmpty()) {
-          updateMonitoringMetrics(adaptersToMonitor);
-        } else {
-          LOG.info("No running adapter instances to monitor.");
-        }
-      }
-    } catch (NoSuchElementException e) {
-      LOG.error("Could not update adapter metrics due to an invalid state. 
({})", e.getMessage());
-    }
-
-    LOG.info("Monitoring metrics updated for running adapters.");
-
-    // Recover Adapters
-    this.recoverAdapters(allAdaptersToRecover);
-  }
-
-  /**
-   * Updates the monitoring metrics based on the descriptions of running 
adapters.
-   *
-   * @param runningAdapterDescriptions A map containing the descriptions of 
running adapters, where
-   *        the key is the adapter's element ID and the value is the 
corresponding adapter
-   *        description.
-   */
-  protected void updateMonitoringMetrics(Map<String, AdapterDescription> 
runningAdapterDescriptions) {
-
-    var adapterMetrics = 
AdapterMetricsManager.getInstance().getAdapterMetrics();
-    runningAdapterDescriptions.values()
-        .forEach(adapterDescription -> 
updateTotalEventsPublished(adapterMetrics,
-                                                                  
adapterDescription.getElementId(),
-                                                                  
adapterDescription.getName()));
-    LOG.debug("Monitoring {} adapter instances", adapterMetrics.size());
-  }
-
-  private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, 
String adapterId,
-                                          String adapterName) {
-
-    // Check if the adapter is already registered; if not, register it first.
-    // This step is crucial, especially when the StreamPipes Core service is 
restarted,
-    // and there are existing running adapters that need proper registration.
-    // Note: Proper registration is usually handled during the initial start 
of the adapter.
-    if (!adapterMetrics.contains(adapterId)) {
-      adapterMetrics.register(adapterId, adapterName);
-    }
-
-    adapterMetrics.updateTotalEventsPublished(adapterId, adapterName, 
ExtensionsLogProvider.INSTANCE
-        .getMetricInfosForResource(adapterId).getMessagesOut().getCounter());
-  }
-
-
-  /**
-   * Retrieves a map of all adapter instances that are supposed to be running 
according to the
-   * backend storage.
-   * <p>
-   * This method queries the adapter storage to obtain information about all 
adapters and filters
-   * the running instances. The resulting map is keyed by the element ID of 
each running adapter,
-   * and the corresponding values are the respective {@link 
AdapterDescription} objects.
-   *
-   * @return A map containing all adapter instances supposed to be running 
according to the backend
-   *         storage. The keys are element IDs, and the values are the 
corresponding adapter
-   *         descriptions.
-   */
-  public Map<String, AdapterDescription> getAllAdaptersSupposedToRun() {
-    Map<String, AdapterDescription> result = new HashMap<>();
-    List<AdapterDescription> allRunningInstancesAdapterDescription = 
this.adapterStorage.findAll();
-    
allRunningInstancesAdapterDescription.stream().filter(AdapterDescription::isRunning)
-        .forEach(adapterDescription -> 
result.put(adapterDescription.getElementId(),
-                                                  adapterDescription));
-
-    return result;
-  }
-
-  public Map<String, List<AdapterDescription>> 
getAllWorkersWithAdapters(Map<String, AdapterDescription> 
adapterInstancesSupposedToRun) {
-
-    Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
-    adapterInstancesSupposedToRun.values().forEach(ad -> {
-      String selectedEndpointUrl = ad.getSelectedEndpointUrl();
-      if (selectedEndpointUrl != null) {
-        if (groupByWorker.containsKey(selectedEndpointUrl)) {
-          groupByWorker.get(selectedEndpointUrl).add(ad);
-        } else {
-          List<AdapterDescription> adapterDescriptionsPerWorker = new 
ArrayList<>();
-          adapterDescriptionsPerWorker.add(ad);
-          groupByWorker.put(selectedEndpointUrl, adapterDescriptionsPerWorker);
-        }
-      }
-    });
-
-    return groupByWorker;
-  }
-
-  /**
-   * Retrieves a map of adapters to recover by comparing the provided 
groupings of adapter instances
-   * with the instances supposed to run according to the storage. For every 
adapter instance it is
-   * verified that it actually runs on a worker node. If this is not the case, 
it is added to the
-   * output of adapters to recover.
-   *
-   * @param adapterInstancesGroupedByWorker A map grouping adapter instances 
by worker.
-   * @param adapterInstancesSupposedToRun The map containing all adapter 
instances supposed to be
-   *        running.
-   * @return A new map containing adapter instances to recover, filtered based 
on running instances.
-   */
-  public Map<String, AdapterDescription> getAdaptersToRecover(Map<String, 
List<AdapterDescription>> adapterInstancesGroupedByWorker,
-                                                              Map<String, 
AdapterDescription> adapterInstancesSupposedToRun) {
-
-    // NOTE: This line is added to prevent modifying the existing map of 
instances supposed to run
-    // It looks like the parameter `adapterInstancesSupposedToRun` is not 
required at all,
-    // but this should be checked more carefully.
-    Map<String, AdapterDescription> adaptersToRecover =
-        new HashMap<>(adapterInstancesSupposedToRun);
-
-    adapterInstancesGroupedByWorker.keySet().forEach(adapterEndpointUrl -> {
-      try {
-        List<AdapterDescription> allRunningInstancesOfOneWorker =
-            
WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl
-                + WorkerPaths.getRunningAdaptersPath());
-
-        // only keep adapters where there is no running adapter instance
-        // therefore, all others are removed
-        allRunningInstancesOfOneWorker.forEach(adapterDescription -> 
adaptersToRecover
-            .remove(adapterDescription.getElementId()));
-      } catch (AdapterException e) {
-        LOG.info("Could not recover adapter at endpoint {}  - "
-            + "marking it as requested to recover (reason: {})", 
adapterEndpointUrl,
-                 e.getMessage());
-      }
-    });
-
-    return adaptersToRecover;
-  }
-
-
-  public void recoverAdapters(Map<String, AdapterDescription> 
adaptersToRecover) {
-    for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
-      // Invoke all adapters that were running when the adapter container was 
stopped
-      try {
-        if (adapterDescription.isRunning()) {
-          LOG.info("Start recovering adapter {} ", 
adapterDescription.getElementId());
-          
this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
-          LOG.info("Adapter {} is recovered", 
adapterDescription.getElementId());
-
-        }
-      } catch (AdapterException e) {
-        LOG.warn("Could not start adapter {} ({})", 
adapterDescription.getName(), e.getMessage());
-      } catch (Exception e) {
-        LOG.error(
-            "Unexpected error while recovering adapter {} ({})",
-            adapterDescription.getName(),
-            e.getMessage()
-        );
-      }
-    }
-
-  }
-}
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
index 6cdad54c6b..32e5b7f813 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
@@ -20,7 +20,6 @@ package org.apache.streampipes.connect.management.management;
 
 import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
-import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
 import org.apache.streampipes.connect.management.health.AdapterOperationLock;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -47,7 +46,7 @@ public class WorkerAdministrationManagement {
 
   private final IAdapterStorage adapterDescriptionStorage;
 
-  private final AdapterHealthCheck adapterHealthCheck;
+  //private final AdapterHealthCheck adapterHealthCheck;
 
   public WorkerAdministrationManagement(
       IAdapterStorage adapterStorage,
@@ -55,15 +54,15 @@ public class WorkerAdministrationManagement {
       AdapterResourceManager adapterResourceManager,
       DataStreamResourceManager dataStreamResourceManager
   ) {
-    this.adapterHealthCheck = new AdapterHealthCheck(
-        adapterStorage,
-        new AdapterMasterManagement(
-            adapterStorage,
-            adapterResourceManager,
-            dataStreamResourceManager,
-            adapterMetrics
-        )
-    );
+//    this.adapterHealthCheck = new AdapterHealthCheck(
+//        adapterStorage,
+//        new AdapterMasterManagement(
+//            adapterStorage,
+//            adapterResourceManager,
+//            dataStreamResourceManager,
+//            adapterMetrics
+//        )
+//    );
     this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
   }
 
@@ -81,11 +80,11 @@ public class WorkerAdministrationManagement {
       } else {
         LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts...");
         AdapterOperationLock.INSTANCE.unlock();
-        this.adapterHealthCheck.checkAndRestoreAdapters();
+        //this.adapterHealthCheck.checkAndRestoreAdapters();
       }
     } else {
       AdapterOperationLock.INSTANCE.lock();
-      this.adapterHealthCheck.checkAndRestoreAdapters();
+      //this.adapterHealthCheck.checkAndRestoreAdapters();
       AdapterOperationLock.INSTANCE.unlock();
     }
   }
diff --git 
a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
 
b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
deleted file mode 100644
index 47612f1361..0000000000
--- 
a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.management.health;
-
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.resource.management.SpResourceManager;
-import org.apache.streampipes.storage.api.IAdapterStorage;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AdapterHealthCheckTest {
-
-  private IAdapterStorage adapterInstanceStorageMock;
-
-  @BeforeEach
-  public void setUp() {
-    adapterInstanceStorageMock = mock(IAdapterStorage.class);
-  }
-
-  @Test
-  public void getAllRunningInstancesAdapterDescriptionsEmpty() {
-    when(adapterInstanceStorageMock.findAll()).thenReturn(List.of());
-
-    var healthCheck = new AdapterHealthCheck(
-        adapterInstanceStorageMock,
-        new AdapterMasterManagement(
-            adapterInstanceStorageMock,
-            new SpResourceManager().manageAdapters(),
-            new SpResourceManager().manageDataStreams(),
-            AdapterMetricsManager.INSTANCE.getAdapterMetrics()
-        )
-    );
-    var result = healthCheck.getAllAdaptersSupposedToRun();
-
-    Assertions.assertTrue(result.isEmpty());
-  }
-
-  @Test
-  public void getAllRunningInstancesAdapterDescriptionsMixed() {
-
-    var nameRunningAdapter = "running-adapter";
-    var nameStoppedAdapter = "stopped-adapter";
-
-    var stoppedAdapter = new AdapterDescription();
-    stoppedAdapter.setElementId(nameStoppedAdapter);
-    stoppedAdapter.setRunning(false);
-
-    var runningAdapter = new AdapterDescription();
-    runningAdapter.setElementId(nameRunningAdapter);
-    runningAdapter.setRunning(true);
-
-    
when(adapterInstanceStorageMock.findAll()).thenReturn(List.of(stoppedAdapter, 
runningAdapter));
-
-    var healthCheck = new AdapterHealthCheck(
-        adapterInstanceStorageMock,
-        new AdapterMasterManagement(
-            adapterInstanceStorageMock,
-            new SpResourceManager().manageAdapters(),
-            new SpResourceManager().manageDataStreams(),
-            AdapterMetricsManager.INSTANCE.getAdapterMetrics()
-        )
-    );
-    var result = healthCheck.getAllAdaptersSupposedToRun();
-
-    Assertions.assertEquals(1, result.size());
-    Assertions.assertTrue(result.containsKey(nameRunningAdapter));
-    Assertions.assertEquals(runningAdapter, result.get(nameRunningAdapter));
-
-  }
-
-}
diff --git 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
index f4e1df6095..fbb715be88 100644
--- 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
+++ 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -73,7 +73,7 @@ public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Q
    * The influx client always returns a double for timestamp values. This method converts them to long values.
    */
   private void convertTimestampValueToLong(List<Object> row) {
-    if (!row.isEmpty()) {
+    if (!row.isEmpty() && row.get(0) instanceof Number) {
       row.set(0, ((Number) row.get(0)).longValue());
     }
   }
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
index 538043d7f9..f14a94de00 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public enum RunningInstances {
   INSTANCE;
@@ -65,6 +67,14 @@ public enum RunningInstances {
     return runningInstances.size();
   }
 
+  public Set<String> getRunningInstanceIds() {
+    return runningInstances
+        .entrySet()
+        .stream()
+        .map((entry -> entry.getValue().getDescription().getElementId()))
+        .collect(Collectors.toSet());
+  }
+
   public List<String> getRunningInstanceIdsForElement(String appId) {
     // TODO change this to appId for STREAMPIPES-319
     List<String> instanceIds = new ArrayList<>();
diff --git a/streampipes-health-monitoring/pom.xml b/streampipes-health-monitoring/pom.xml
new file mode 100644
index 0000000000..a3347edfe6
--- /dev/null
+++ b/streampipes-health-monitoring/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampipes</groupId>
+        <artifactId>streampipes-parent</artifactId>
+        <version>0.99.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampipes-health-monitoring</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-pipeline-management</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-commons</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-connect-management</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-load-balancer</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-model</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-storage-api</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
new file mode 100644
index 0000000000..f742b23f57
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
+import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class AdapterHealthCheck {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AdapterHealthCheck.class);
+
+  private final HealthCheckData healthCheckData;
+
+  /**
+   * Creates an adapter health check that operates on the given health check data.
+   *
+   * @param healthCheckData active resources, extension instance states and resource provider
+   *        consulted by this check
+   */
+  public AdapterHealthCheck(HealthCheckData healthCheckData) {
+    this.healthCheckData = healthCheckData;
+  }
+
+  /**
+   * Checks all adapters that are marked as running. Adapters that no extension instance
+   * reports as running are logged and handed to {@link #recoverAdapters(List)}; for all
+   * remaining adapters the monitoring metrics are refreshed (in line with
+   * {@link PipelineHealthCheck}). Does nothing when no adapter is marked as running.
+   */
+  public void runCheck() {
+    LOG.debug("Adapter health check started");
+
+    try {
+      var runningAdapters = healthCheckData.activeResources().runningAdapters();
+      if (runningAdapters.isEmpty()) {
+        return;
+      }
+
+      var allAdaptersToRecover = this.getAdaptersToRecover();
+      allAdaptersToRecover
+          .forEach(adapter ->
+              LOG.info("Adapter instance with id {} needs to be recovered", adapter.getElementId()));
+
+      // Collect the ids once so that the filter below is a set lookup instead of a rescan of
+      // the recovery list for every running adapter
+      var idsToRecover = allAdaptersToRecover.stream()
+          .map(AdapterDescription::getElementId)
+          .collect(Collectors.toSet());
+
+      // Only healthy and running adapters are updated in the metrics endpoint
+      var adaptersToMonitor = runningAdapters.stream()
+          .filter(adapter -> !idsToRecover.contains(adapter.getElementId()))
+          .toList();
+
+      if (!adaptersToMonitor.isEmpty()) {
+        updateMonitoringMetrics(adaptersToMonitor);
+        // Only report an update when metrics were actually written
+        LOG.debug("Monitoring metrics updated for running adapters.");
+      } else {
+        LOG.debug("No running adapter instances to monitor.");
+      }
+
+      // Recover Adapters
+      this.recoverAdapters(allAdaptersToRecover);
+    } catch (NoSuchElementException e) {
+      LOG.error("Could not update adapter metrics due to an invalid state. ({})", e.getMessage());
+    }
+  }
+
+  /**
+   * Refreshes the published-events metric of every given adapter.
+   *
+   * @param runningAdapterDescriptions descriptions of the running adapters whose metrics
+   *        should be updated
+   */
+  protected void updateMonitoringMetrics(List<AdapterDescription> runningAdapterDescriptions) {
+    var metrics = AdapterMetricsManager.getInstance().getAdapterMetrics();
+
+    for (AdapterDescription description : runningAdapterDescriptions) {
+      updateTotalEventsPublished(metrics, description.getElementId(), description.getName());
+    }
+
+    LOG.debug("Monitoring {} adapter instances", metrics.size());
+  }
+
+  /**
+   * Writes the current outgoing message counter of an adapter to the adapter metrics,
+   * registering the adapter with the metrics first if it is not known there yet.
+   */
+  private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String adapterId,
+                                          String adapterName) {
+
+    // An adapter may be unknown to the metrics at this point, e.g. after a restart of the
+    // StreamPipes Core service while adapters kept running. Registration is usually handled
+    // during the initial start of the adapter, so it is only caught up on here.
+    boolean alreadyRegistered = adapterMetrics.contains(adapterId);
+    if (!alreadyRegistered) {
+      adapterMetrics.register(adapterId, adapterName);
+    }
+
+    var publishedEvents = ExtensionsLogProvider.INSTANCE
+        .getMetricInfosForResource(adapterId)
+        .getMessagesOut()
+        .getCounter();
+    adapterMetrics.updateTotalEventsPublished(adapterId, adapterName, publishedEvents);
+  }
+
+
+  /**
+   * Determines which adapters need to be recovered. An adapter is included when it is
+   * marked as running in the active resources but its element id is not contained in the
+   * running adapter instance ids reported by any extension instance. Entries that are
+   * {@code null} or have no element id are skipped.
+   *
+   * @return the adapters that are expected to run but are not reported as running
+   */
+  public List<AdapterDescription> getAdaptersToRecover() {
+
+    var reportedInstances = healthCheckData.activeExtensionInstances().values();
+    var idsRunningOnWorkers = reportedInstances.stream()
+        .flatMap(health -> health.runningAdapterInstanceIds().stream())
+        .collect(Collectors.toSet());
+
+    var adaptersExpectedToRun = healthCheckData.activeResources().runningAdapters();
+    return adaptersExpectedToRun.stream()
+        .filter(Objects::nonNull)
+        .filter(adapter -> {
+          var elementId = adapter.getElementId();
+          return elementId != null && !idsRunningOnWorkers.contains(elementId);
+        })
+        .toList();
+  }
+
+  /**
+   * Restarts the given adapters. Only adapters that are flagged as running are started; a
+   * failure to start one adapter is logged and does not prevent the remaining adapters from
+   * being processed.
+   *
+   * @param adaptersToRecover adapters that should be started again
+   */
+  public void recoverAdapters(List<AdapterDescription> adaptersToRecover) {
+    for (AdapterDescription adapterDescription : adaptersToRecover) {
+      // Invoke all adapters that were running when the adapter container was stopped
+      try {
+        if (adapterDescription.isRunning()) {
+          var adapterId = adapterDescription.getElementId();
+          LOG.debug("Start recovering adapter {} ", adapterId);
+          this.healthCheckData.resourceProvider()
+              .adapterMasterManagement()
+              .startStreamAdapter(adapterId);
+          LOG.info("Adapter {} is recovered", adapterId);
+        }
+      } catch (AdapterException e) {
+        LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage());
+      } catch (Exception e) {
+        // Unexpected failures are logged with the throwable itself so the stack trace is kept
+        LOG.error("Unexpected error while recovering adapter {}", adapterDescription.getName(), e);
+      }
+    }
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
new file mode 100644
index 0000000000..e7d19bae28
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+/**
+ * Health check entry point: loads the active resources, asks every endpoint URL that hosts
+ * one of them for its running instances and then runs the pipeline and adapter health checks
+ * on the collected data.
+ */
+public class ExtensionHealthCheck implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExtensionHealthCheck.class);
+
+  private final ResourceProvider resourceProvider;
+
+  public ExtensionHealthCheck(ResourceProvider resourceProvider) {
+    this.resourceProvider = resourceProvider;
+  }
+
+  /**
+   * Executes one health check cycle. Any exception raised during the cycle is logged and not
+   * propagated.
+   */
+  @Override
+  public void run() {
+    try {
+      var activeResources = resourceProvider.loadActiveResources();
+      var activeCoreInstances = resourceProvider.loadActiveInstances(activeResources);
+      var activeExtensionInstances = new HashMap<String, ExtensionInstanceHealth>();
+
+      // The keys of the active core instances are the base URLs of the extension services
+      for (var serviceBaseUrl : activeCoreInstances.keySet()) {
+        var endpointCheck = new PipelineElementEndpointHealthCheck(serviceBaseUrl);
+        activeExtensionInstances.put(serviceBaseUrl, endpointCheck.checkRunningInstances());
+      }
+
+      var healthCheckData = new HealthCheckData(
+          resourceProvider,
+          activeResources,
+          activeCoreInstances,
+          activeExtensionInstances
+      );
+      new PipelineHealthCheck(healthCheckData).runCheck();
+      new AdapterHealthCheck(healthCheckData).runCheck();
+    } catch (Exception e) {
+      LOG.warn("An unhandled error occurred while running health check.", e);
+    }
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineElementEndpointHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineElementEndpointHealthCheck.java
new file mode 100644
index 0000000000..779735b297
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineElementEndpointHealthCheck.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+
+/**
+ * Queries the health endpoint of a single extension service for the adapter and pipeline
+ * element instances it currently runs.
+ */
+public class PipelineElementEndpointHealthCheck {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineElementEndpointHealthCheck.class);
+  private static final String INSTANCE_PATH = "/health";
+
+  private final String serviceBaseUrl;
+
+  public PipelineElementEndpointHealthCheck(String serviceBaseUrl) {
+    this.serviceBaseUrl = serviceBaseUrl;
+  }
+
+  /**
+   * Requests the running instances from the extension service.
+   *
+   * @return the deserialized response of the service, or a result without any running
+   *         instances when the service does not answer with status 200, cannot be reached or
+   *         returns a body that cannot be parsed
+   */
+  public ExtensionInstanceHealth checkRunningInstances() {
+    try {
+      var request = ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl());
+      var response = request.execute().returnResponse();
+      if (response.getStatusLine().getStatusCode() != 200) {
+        return noRunningInstances();
+      }
+      String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+      return deserialize(body);
+
+    } catch (JsonProcessingException e) {
+      // Must precede the IOException handler: JsonProcessingException is a subtype of it and
+      // would otherwise be reported as an unavailable service
+      LOG.error("Extension service {} returned an invalid health response", serviceBaseUrl, e);
+      return noRunningInstances();
+    } catch (IOException e) {
+      LOG.error("Extension service {} is unavailable ({})", serviceBaseUrl, e.getMessage());
+      return noRunningInstances();
+    }
+  }
+
+  // Fallback result used whenever the service state cannot be determined
+  private ExtensionInstanceHealth noRunningInstances() {
+    return new ExtensionInstanceHealth(Set.of(), Set.of());
+  }
+
+  private ExtensionInstanceHealth deserialize(String json) throws JsonProcessingException {
+    return JacksonSerializer.getObjectMapper().readValue(json, ExtensionInstanceHealth.class);
+  }
+
+  private String makeRequestUrl() {
+    return serviceBaseUrl + INSTANCE_PATH;
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
new file mode 100644
index 0000000000..fcb44ac9e9
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.health.monitoring.utils.HealthCheckUtils;
+import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
+import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;
+
+public class PipelineHealthCheck {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PipelineHealthCheck.class);
+  private static final int MAX_FAILED_ATTEMPTS = 10;
+
+  private static final Map<String, Integer> failedRestartAttempts = new 
HashMap<>();
+  private static final PipelinesStats pipelinesStats = new PipelinesStats();
+
+  private final HealthCheckData healthCheckData;
+
+  /**
+   * Creates a pipeline health check that operates on the given health check data.
+   *
+   * @param healthCheckData active resources, extension instance states and resource provider
+   *        consulted by this check
+   */
+  public PipelineHealthCheck(HealthCheckData healthCheckData) {
+    this.healthCheckData = healthCheckData;
+  }
+
+  /**
+   * Rebuilds the pipeline metrics and, if at least one pipeline is marked as running, checks
+   * the pipeline elements and tries to restore missing ones. Any exception is logged and not
+   * propagated to the caller.
+   */
+  public void runCheck() {
+    try {
+      initPipelineMetrics();
+
+      boolean hasRunningPipelines = !healthCheckData.activeResources().runningPipelines().isEmpty();
+      if (hasRunningPipelines) {
+        checkAndRestorePipelineElements();
+      }
+
+      pipelinesStats.metrics();
+    } catch (Exception e) {
+      LOG.error("Error while checking and restoring pipeline elements", e);
+    }
+  }
+
+  /**
+   * Clears the pipeline statistics and fills them from the active resources: total, running
+   * and stopped pipeline counts as well as the health and running state of every pipeline.
+   */
+  private void initPipelineMetrics() {
+    pipelinesStats.clear();
+
+    var resources = healthCheckData.activeResources();
+    var allPipelines = resources.allPipelines();
+    pipelinesStats.setAllPipelines(allPipelines.size());
+    pipelinesStats.setRunningPipelines(resources.runningPipelines().size());
+
+    // Stopped pipelines are everything that is not running
+    var stoppedPipelines = pipelinesStats.getAllPipelines() - pipelinesStats.getRunningPipelines();
+    pipelinesStats.setStoppedPipelines(stoppedPipelines);
+
+    allPipelines.forEach(pipeline -> {
+      var id = pipeline.getElementId();
+      var name = pipeline.getName();
+      pipelinesStats.updatePipelineHealthState(id, name, pipeline.getHealthStatus().toString());
+      pipelinesStats.updatePipelineRunningState(id, name, pipeline.isRunning());
+    });
+  }
+
+  /**
+   * Walks over the pipeline elements of every running pipeline and re-invokes those that no
+   * extension instance reports as running, as long as the retry limit of the element is not
+   * exhausted. Pipelines with at least one restore attempt get their health status and
+   * notifications updated in the pipeline storage; afterwards the healthy pipeline count and
+   * the element count of the pipeline statistics are refreshed.
+   */
+  private void checkAndRestorePipelineElements() {
+    var resources = healthCheckData.activeResources();
+
+    resources.runningPipelines().forEach(pipeline -> {
+      var restoreAttempted = new AtomicBoolean(false);
+      List<String> failedInstances = new ArrayList<>();
+      List<String> recoveredInstances = new ArrayList<>();
+      List<String> notifications = new ArrayList<>();
+      List<InvocableStreamPipesEntity> runningElements =
+          RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipeline.getPipelineId());
+
+      if (Objects.nonNull(runningElements)) {
+        runningElements.forEach(element -> {
+          String instanceId = HealthCheckUtils.extractInstanceId(element);
+          // Nothing to do for elements that run somewhere or that exceeded their retry limit
+          if (!isNowhereRunning(instanceId) || !shouldRetry(instanceId)) {
+            return;
+          }
+
+          restoreAttempted.set(true);
+          var serviceBaseUrl = element.getSelectedEndpointUrl();
+          boolean restored;
+          try {
+            serviceBaseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
+                element.getAppId(),
+                ExtensionsServiceEndpointUtils.getPipelineElementType(element.getAppId()),
+                Collections.emptySet()
+            );
+            var invocationUrl = getInvocationUrl(element, serviceBaseUrl);
+            restored = new InvokeHttpRequest()
+                .execute(element, invocationUrl, pipeline.getPipelineId())
+                .isSuccess();
+          } catch (NoServiceEndpointsAvailableException e) {
+            restored = false;
+          }
+
+          if (restored) {
+            recoveredInstances.add(instanceId);
+            HealthCheckUtils.addSuccessfulRestoreNotification(notifications, element);
+            resetFailedAttempts(instanceId);
+            // Remember the endpoint the element was re-invoked on
+            element.setSelectedEndpointUrl(serviceBaseUrl);
+            LOG.info("Successfully restored pipeline element {} of pipeline {}",
+                element.getName(), pipeline.getName());
+          } else {
+            failedInstances.add(instanceId);
+            HealthCheckUtils.addFailedAttemptNotification(notifications, element);
+            increaseFailedAttempt(instanceId);
+            LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})",
+                element.getName(), pipeline.getName(), failedRestartAttempts.get(instanceId),
+                MAX_FAILED_ATTEMPTS);
+          }
+        });
+      }
+
+      // Pipelines without any restore attempt are left untouched
+      if (!restoreAttempted.get()) {
+        return;
+      }
+
+      var currentPipeline = getPipeline(pipeline.getPipelineId());
+      if (!failedInstances.isEmpty()) {
+        currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
+        pipelinesStats.failedIncrease();
+      } else if (!recoveredInstances.isEmpty()) {
+        currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+        pipelinesStats.attentionRequiredIncrease();
+      }
+      currentPipeline.setPipelineNotifications(notifications);
+      healthCheckData.resourceProvider().pipelineStorage().updateElement(currentPipeline);
+      pipelinesStats.updatePipelineHealthState(
+          currentPipeline.getElementId(),
+          currentPipeline.getName(),
+          currentPipeline.getHealthStatus().toString());
+    });
+
+    int healthyPipelines = pipelinesStats.getRunningPipelines()
+        - pipelinesStats.getFailedPipelines()
+        - pipelinesStats.getAttentionRequiredPipelines();
+    pipelinesStats.setHealthyPipelines(healthyPipelines);
+    pipelinesStats.setElementCount(getElementsCount(resources.allPipelines()));
+  }
+
+  /**
+   * Checks whether no extension instance reports the given pipeline element instance as
+   * running.
+   */
+  private boolean isNowhereRunning(String instanceId) {
+    var reportedStates = healthCheckData.activeExtensionInstances().values();
+    return reportedStates.stream()
+        .noneMatch(state -> state.runningPipelineElementInstanceIds().contains(instanceId));
+  }
+
+  /**
+   * Tells whether another restore attempt is allowed for the given instance, i.e. whether
+   * its number of recorded failed attempts is still below {@code MAX_FAILED_ATTEMPTS}.
+   * Instances without a recorded attempt may always be retried.
+   */
+  private boolean shouldRetry(String instanceId) {
+    int failedAttempts = failedRestartAttempts.getOrDefault(instanceId, 0);
+    return failedAttempts < MAX_FAILED_ATTEMPTS;
+  }
+
+  /**
+   * Sets the failed restart attempt counter of the given instance to zero.
+   */
+  private void resetFailedAttempts(String instanceId) {
+    failedRestartAttempts.put(instanceId, 0);
+  }
+
+  /**
+   * Increments the failed restart attempt counter of the given instance, starting at one
+   * when no attempt has been recorded yet.
+   */
+  private void increaseFailedAttempt(String instanceId) {
+    failedRestartAttempts.merge(instanceId, 1, Integer::sum);
+  }
+
+  /**
+   * Sums up the number of actions of all given pipelines.
+   */
+  private int getElementsCount(List<Pipeline> allPipelines) {
+    int elementCount = 0;
+    for (Pipeline pipeline : allPipelines) {
+      elementCount += pipeline.getActions().size();
+    }
+    return elementCount;
+  }
+
+  /**
+   * Builds the invocation URL of a pipeline element for the given service base URL, based on
+   * the element's type and app id.
+   */
+  private String getInvocationUrl(InvocableStreamPipesEntity pipelineElement,
+                                  String baseUrl) {
+    var elementType = ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement);
+    return elementType.getInvocationUrl(baseUrl, pipelineElement.getAppId());
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
new file mode 100644
index 0000000000..aa878b06be
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
+import org.apache.streampipes.health.monitoring.model.ActiveCoreInstances;
+import org.apache.streampipes.health.monitoring.model.ActiveResources;
+import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Bundles the pipeline storage, the adapter storage and the adapter management used by the
+ * health checks and derives the currently active resources from them.
+ */
+public record ResourceProvider(IPipelineStorage pipelineStorage,
+                               IAdapterStorage adapterInstanceStorage,
+                               AdapterMasterManagement adapterMasterManagement) {
+
+  /**
+   * Loads all pipelines and adapters from the storages together with the subsets that are
+   * flagged as running.
+   */
+  public ActiveResources loadActiveResources() {
+    var pipelines = pipelineStorage.findAll();
+    var adapters = adapterInstanceStorage.findAll();
+
+    return new ActiveResources(
+        pipelines,
+        pipelines.stream().filter(Pipeline::isRunning).toList(),
+        adapters,
+        adapters.stream().filter(AdapterDescription::isRunning).toList()
+    );
+  }
+
+  /**
+   * Groups the running adapters and the running pipeline elements by their selected endpoint
+   * URL. Resources without a selected endpoint URL are ignored. For each endpoint URL at most
+   * one pipeline element per pipeline id is retained (the first one encountered).
+   *
+   * @param activeResources the resources to group
+   * @return the active instances keyed by endpoint URL
+   */
+  public Map<String, ActiveCoreInstances> loadActiveInstances(ActiveResources activeResources) {
+    var adaptersByUrl = groupAdaptersByEndpoint(activeResources);
+    var elementsByUrl = groupElementsByEndpoint(activeResources);
+
+    Set<String> endpointUrls = new HashSet<>(adaptersByUrl.keySet());
+    endpointUrls.addAll(elementsByUrl.keySet());
+
+    Map<String, ActiveCoreInstances> instancesByUrl = new HashMap<>();
+    endpointUrls.forEach(url -> instancesByUrl.put(url, new ActiveCoreInstances(
+        adaptersByUrl.getOrDefault(url, List.of()),
+        elementsByUrl.getOrDefault(url, Map.of())
+    )));
+
+    return instancesByUrl;
+  }
+
+  // Running adapters with a selected endpoint URL, grouped by that URL
+  private static Map<String, List<AdapterDescription>> groupAdaptersByEndpoint(
+      ActiveResources activeResources) {
+    return activeResources.runningAdapters()
+        .stream()
+        .filter(adapter -> adapter.getSelectedEndpointUrl() != null)
+        .collect(Collectors.groupingBy(AdapterDescription::getSelectedEndpointUrl));
+  }
+
+  // Running pipeline elements with a selected endpoint URL, grouped by that URL and keyed by
+  // the id of the pipeline they belong to
+  private static Map<String, Map<String, InvocableStreamPipesEntity>> groupElementsByEndpoint(
+      ActiveResources activeResources) {
+    return activeResources.runningPipelines()
+        .stream()
+        .flatMap(pipeline -> {
+          String pipelineId = pipeline.getPipelineId();
+          List<InvocableStreamPipesEntity> elements = Optional
+              .ofNullable(RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipelineId))
+              .orElseGet(List::of);
+          return elements.stream()
+              .filter(Objects::nonNull)
+              .map(element -> new AbstractMap.SimpleEntry<>(pipelineId, element));
+        })
+        .filter(entry -> entry.getValue().getSelectedEndpointUrl() != null)
+        .collect(Collectors.groupingBy(
+            entry -> entry.getValue().getSelectedEndpointUrl(),
+            Collectors.toMap(
+                Map.Entry::getKey,
+                AbstractMap.SimpleEntry::getValue,
+                (first, second) -> first)
+        ));
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
similarity index 83%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
rename to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
index 949b65fa0a..e252efd755 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.health;
+package org.apache.streampipes.health.monitoring;
 
 
 import org.apache.streampipes.commons.environment.Environment;
@@ -24,7 +24,7 @@ import org.apache.streampipes.loadbalance.LoadManager;
 import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
-import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.storage.api.CRUDStorage;
 
 import org.apache.http.HttpStatus;
 import org.slf4j.Logger;
@@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class ServiceHealthCheck implements Runnable {
 
@@ -45,8 +43,7 @@ public class ServiceHealthCheck implements Runnable {
 
   private final List<SpServiceRegistration> needDeletedServices = new 
ArrayList<>();
 
-  public ServiceHealthCheck() {
-    var storage = 
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage();
+  public ServiceHealthCheck(CRUDStorage<SpServiceRegistration> storage) {
     this.serviceRegistrationManager = new ServiceRegistrationManager(storage);
     this.maxUnhealthyDurationBeforeRemovalMs = Environments.getEnvironment()
         .getUnhealthyTimeBeforeServiceDeletionInMillis().getValueOrDefault();
@@ -68,10 +65,8 @@ public class ServiceHealthCheck implements Runnable {
     } finally {
       needDeletedServices.clear();
     }
-    new PipelineHealthCheck().run();
   }
 
-
   private void checkServiceHealth(SpServiceRegistration service) {
     String healthCheckUrl = makeHealthCheckUrl(service);
 
@@ -117,15 +112,4 @@ public class ServiceHealthCheck implements Runnable {
   private List<SpServiceRegistration> getRegisteredServices() {
     return serviceRegistrationManager.getAllServices();
   }
-
-  public static List<SpServiceRegistration> getService(String tag,
-                                                       
List<SpServiceRegistration> activeServices) {
-    return activeServices.stream().filter(service -> filtersSupported(service, 
tag))
-        .filter(service -> service.getStatus() != SpServiceStatus.HEALTHY)
-        .collect(Collectors.toList());
-  }
-
-  private static boolean filtersSupported(SpServiceRegistration service, 
String tag) {
-    return new HashSet<>(service.getTags()).stream().anyMatch(t -> 
t.asString().equals(tag));
-  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
similarity index 95%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
rename to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
index 3bbee91f74..e3fb6271e1 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.health;
+package org.apache.streampipes.health.monitoring;
 
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
@@ -81,10 +81,6 @@ public class ServiceRegistrationManager {
              serviceRegistration.getSvcId());
   }
 
-  public SpServiceStatus getServiceStatus(String serviceId) {
-    return storage.getElementById(serviceId).getStatus();
-  }
-
   private void logService(SpServiceRegistration serviceRegistration) {
     LOG.info("Service {} (id={}) is now in {} state", serviceRegistration.getSvcGroup(),
              serviceRegistration.getSvcId(), serviceRegistration.getStatus());
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
similarity index 50%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
copy to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
index 16b8772ca4..272ed4a0c4 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
@@ -16,33 +16,14 @@
  *
  */
 
-package org.apache.streampipes.manager.execution.http;
+package org.apache.streampipes.health.monitoring.model;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 
 import java.util.List;
+import java.util.Map;
 
-public class DetachPipelineElementSubmitter extends PipelineElementSubmitter {
-
-  public DetachPipelineElementSubmitter(Pipeline pipeline) {
-    super(pipeline);
-  }
-
-  @Override
-  protected PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement) {
-    return performDetach(pipelineElement);
-  }
-
-  @Override
-  protected void onSuccess() {
-    status.setTitle("Pipeline " + pipelineName + " successfully stopped");
-  }
-
-  @Override
-  protected void onFailure(List<InvocableStreamPipesEntity> 
processorsAndSinks) {
-    status.setTitle("Could not stop all pipeline elements of pipeline " + 
pipelineName + ".");
-  }
+public record ActiveCoreInstances(List<AdapterDescription> adapters,
+                                  Map<String, InvocableStreamPipesEntity> elementsPerPipeline) {
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
similarity index 50%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
copy to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
index 16b8772ca4..862ef6d64a 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
@@ -16,33 +16,15 @@
  *
  */
 
-package org.apache.streampipes.manager.execution.http;
+package org.apache.streampipes.health.monitoring.model;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 
 import java.util.List;
 
-public class DetachPipelineElementSubmitter extends PipelineElementSubmitter {
-
-  public DetachPipelineElementSubmitter(Pipeline pipeline) {
-    super(pipeline);
-  }
-
-  @Override
-  protected PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement) {
-    return performDetach(pipelineElement);
-  }
-
-  @Override
-  protected void onSuccess() {
-    status.setTitle("Pipeline " + pipelineName + " successfully stopped");
-  }
-
-  @Override
-  protected void onFailure(List<InvocableStreamPipesEntity> 
processorsAndSinks) {
-    status.setTitle("Could not stop all pipeline elements of pipeline " + 
pipelineName + ".");
-  }
+public record ActiveResources(List<Pipeline> allPipelines,
+                              List<Pipeline> runningPipelines,
+                              List<AdapterDescription> allAdapters,
+                              List<AdapterDescription> runningAdapters) {
 }
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
new file mode 100644
index 0000000000..6a0744d10f
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring.model;
+
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+
+import java.util.Map;
+
+public record HealthCheckData(ResourceProvider resourceProvider,
+                              ActiveResources activeResources,
+                              Map<String, ActiveCoreInstances> activeCoreInstances,
+                              Map<String, ExtensionInstanceHealth> activeExtensionInstances) {
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
new file mode 100644
index 0000000000..c94e80a8e7
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring.utils;
+
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+public class HealthCheckUtils {
+
+  public static void addSuccessfulRestoreNotification(List<String> pipelineNotifications,
+                                                InvocableStreamPipesEntity pipelineElement) {
+    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + pipelineElement.getName()
+        + "' was not available and was successfully restored.");
+  }
+
+  public static void addFailedAttemptNotification(List<String> pipelineNotifications,
+                                            InvocableStreamPipesEntity pipelineElement) {
+    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + pipelineElement.getName()
+        + "' was not available and could not be restored.");
+  }
+
+  private static String getCurrentDatetime() {
+    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");
+    LocalDateTime now = LocalDateTime.now();
+    return "[" + dtf.format(now) + "] ";
+  }
+
+  public static String extractInstanceId(InvocableStreamPipesEntity pipelineElement) {
+    return InstanceIdExtractor.extractId(pipelineElement.getElementId());
+  }
+}
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
new file mode 100644
index 0000000000..2adf8a0839
--- /dev/null
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.health;
+
+import java.util.Set;
+
+public record ExtensionInstanceHealth(Set<String> runningAdapterInstanceIds,
+                                      Set<String> runningPipelineElementInstanceIds) {
+}
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
index 5cf7cba8dd..028982f953 100644
--- 
a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
@@ -30,10 +30,6 @@ import java.util.Map;
 
 public class PropertyUtils {
 
-  public static Map<String, Object> getRuntimeFormat(EventProperty 
eventProperty) {
-    return getUntypedRuntimeFormat(eventProperty);
-  }
-
   public static Map<String, Object> getUntypedRuntimeFormat(EventProperty ep) {
     if (ep instanceof EventPropertyPrimitive) {
       Map<String, Object> result = new HashMap<>();
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
index 70bd2e8cef..e4aca6b6c0 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
@@ -35,7 +35,6 @@ public class ExtensionServiceExecutions {
         .socketTimeout(10000);
   }
 
-
   private static String getServiceAdminSid() {
     return new SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId();
   }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
index 6451131d65..49d3ff72d4 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
@@ -67,7 +67,7 @@ public class PipelineExecutionInfo {
     return processorsAndSinks;
   }
 
-  public void applyPipelineOperationStatus(PipelineOperationStatus status) {
+  public void setPipelineOperationStatus(PipelineOperationStatus status) {
     this.pipelineOperationStatus = status;
   }
 
@@ -80,6 +80,6 @@ public class PipelineExecutionInfo {
   }
 
   public boolean isOperationSuccessful() {
-    return failedServices.size() == 0 && pipelineOperationStatus.isSuccess();
+    return failedServices.isEmpty() && pipelineOperationStatus.isSuccess();
   }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
index 5987e769c3..1b2d8ea647 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
@@ -44,20 +44,23 @@ public class ExtensionsServiceEndpointGenerator implements IExtensionsServiceEnd
 
   public ExtensionsServiceEndpointGenerator() {}
 
-  public String getEndpointResourceUrl(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  public String getEndpointResourceUrl(String appId,
+                                       SpServiceUrlProvider spServiceUrlProvider,
                                        Set<SpServiceTag> customServiceTags)
       throws NoServiceEndpointsAvailableException {
     return spServiceUrlProvider
         .getInvocationUrl(selectService(appId, spServiceUrlProvider, customServiceTags), appId);
   }
 
-  public String getEndpointBaseUrl(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  public String getEndpointBaseUrl(String appId,
+                                   SpServiceUrlProvider spServiceUrlProvider,
                                    Set<SpServiceTag> customServiceTags)
       throws NoServiceEndpointsAvailableException {
     return selectService(appId, spServiceUrlProvider, customServiceTags);
   }
 
-  private String selectService(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  private String selectService(String appId,
+                               SpServiceUrlProvider spServiceUrlProvider,
                                Set<SpServiceTag> customServiceTags)
       throws NoServiceEndpointsAvailableException {
     Environment env = Environments.getEnvironment();
@@ -84,7 +87,8 @@ public class ExtensionsServiceEndpointGenerator implements 
IExtensionsServiceEnd
         "Could not find any matching service endpoints - are all software 
components running?");
   }
 
-  private List<String> getServiceEndpoints(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  private List<String> getServiceEndpoints(String appId,
+                                           SpServiceUrlProvider spServiceUrlProvider,
                                            Set<SpServiceTag> customServiceTags) {
     return SpServiceDiscovery.getServiceDiscovery()
         .getServiceEndpoints(DefaultSpServiceTypes.EXT, true,
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
similarity index 71%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
rename to 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
index 14937b8e56..7462e6f4d9 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
@@ -26,14 +26,14 @@ import 
org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
 
-public abstract class PipelineElementSubmitter {
+public abstract class BasePipelineElementSubmitter {
 
   protected final String pipelineId;
   protected final String pipelineName;
 
   protected final PipelineOperationStatus status;
 
-  public PipelineElementSubmitter(Pipeline pipeline) {
+  public BasePipelineElementSubmitter(Pipeline pipeline) {
     this.pipelineId = pipeline.getPipelineId();
     this.pipelineName = pipeline.getName();
     this.status = new PipelineOperationStatus(pipelineId, pipelineName);
@@ -41,7 +41,10 @@ public abstract class PipelineElementSubmitter {
 
   public PipelineOperationStatus submit(List<InvocableStreamPipesEntity> processorsAndSinks) {
     // First, try handling all data processors and sinks
-    processorsAndSinks.forEach(g -> 
status.addPipelineElementStatus(submitElement(g)));
+    processorsAndSinks.forEach(g -> {
+      var response = submitElement(g);
+      status.addPipelineElementStatus(response);
+    });
 
     applySuccess(processorsAndSinks);
     return status;
@@ -60,12 +63,18 @@ public abstract class PipelineElementSubmitter {
     }
   }
 
-  protected PipelineElementStatus performDetach(EndpointSelectable 
pipelineElement) {
-    String endpointUrl = pipelineElement.getSelectedEndpointUrl() + 
pipelineElement.getDetachPath();
+  protected String getInvocationUrl(InvocableStreamPipesEntity pipelineElement) {
+    return ExtensionsServiceEndpointUtils
+        .getPipelineElementType(pipelineElement)
+        .getInvocationUrl(pipelineElement.getSelectedEndpointUrl(), pipelineElement.getAppId());
+  }
+
+  protected PipelineElementStatus performDetach(InvocableStreamPipesEntity pipelineElement) {
+    String endpointUrl = getInvocationUrl(pipelineElement) + pipelineElement.getDetachPath();
     return new DetachHttpRequest().execute(pipelineElement, endpointUrl, this.pipelineId);
   }
 
-  protected abstract PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement);
+  protected abstract PipelineElementStatus submitElement(InvocableStreamPipesEntity pipelineElement);
 
   protected abstract void onSuccess();
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
index 16b8772ca4..816a043c2f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
@@ -18,21 +18,20 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 
 import java.util.List;
 
-public class DetachPipelineElementSubmitter extends PipelineElementSubmitter {
+public class DetachPipelineElementSubmitter extends BasePipelineElementSubmitter {
 
   public DetachPipelineElementSubmitter(Pipeline pipeline) {
     super(pipeline);
   }
 
   @Override
-  protected PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement) {
+  protected PipelineElementStatus submitElement(InvocableStreamPipesEntity pipelineElement) {
     return performDetach(pipelineElement);
   }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
index 986bfe6ead..238214a20f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Optional;
 
-public class InvokePipelineElementSubmitter extends PipelineElementSubmitter {
+public class InvokePipelineElementSubmitter extends BasePipelineElementSubmitter {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InvokePipelineElementSubmitter.class);
 
@@ -39,9 +38,9 @@ public class InvokePipelineElementSubmitter extends 
PipelineElementSubmitter {
   }
 
   @Override
-  protected PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement) {
-    String endpointUrl = pipelineElement.getSelectedEndpointUrl();
-    return new InvokeHttpRequest().execute(pipelineElement, endpointUrl, 
this.pipelineId);
+  protected PipelineElementStatus submitElement(InvocableStreamPipesEntity pipelineElement) {
+    var invocationUrl = getInvocationUrl(pipelineElement);
+    return new InvokeHttpRequest().execute(pipelineElement, invocationUrl, this.pipelineId);
   }
 
   @Override
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
index 78ca9ee3ce..a3b6cb5141 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
@@ -18,20 +18,17 @@
 
 package org.apache.streampipes.manager.execution.task;
 
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.loadbalance.LoadManager;
 import org.apache.streampipes.loadbalance.unit.PipelineElementPartitioner;
 import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
-import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
 import org.apache.streampipes.model.api.EndpointSelectable;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class DiscoverEndpointsTask implements PipelineExecutionTask {
@@ -40,14 +37,13 @@ public class DiscoverEndpointsTask implements 
PipelineExecutionTask {
                           PipelineExecutionInfo executionInfo) {
     for (PipelineElementPartitioner.ResourceUnitWithServices unit : 
PipelineElementPartitioner.partitionPipeline(pipeline).getResourceUnits()){
       SpServiceRegistration registration = 
LoadManager.allocation(unit.getCompatibleServices(), pipeline.getLabels());
-      String url = registration.getServiceUrl();
-      for (InvocableStreamPipesEntity el : 
unit.getResourceUnit().getElements()) {
-        try {
-          var endpointUrl = getSelectedEndpoint(el, url);
-          applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointUrl);
-        } catch (NoServiceEndpointsAvailableException en) {
-          executionInfo.addFailedPipelineElement(el);
-        }
+      if (Objects.nonNull(registration)) {
+        String endpointBaseUrl = registration.getServiceUrl();
+        unit.getResourceUnit().getElements().forEach(el -> {
+          applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointBaseUrl);
+        });
+      } else {
+        unit.getResourceUnit().getElements().forEach(executionInfo::addFailedPipelineElement);
       }
     }
 
@@ -62,7 +58,7 @@ public class DiscoverEndpointsTask implements 
PipelineExecutionTask {
           pipeline.getName(),
           "Could not start pipeline " + pipeline.getName() + ".",
           pe);
-      executionInfo.applyPipelineOperationStatus(status);
+      executionInfo.setPipelineOperationStatus(status);
     }
   }
 
@@ -72,19 +68,4 @@ public class DiscoverEndpointsTask implements 
PipelineExecutionTask {
     pipelineElement.setSelectedEndpointUrl(endpointUrl);
     pipelineElement.setCorrespondingPipeline(pipelineId);
   }
-
-  private String findSelectedEndpoint(InvocableStreamPipesEntity 
pipelineElement)
-      throws NoServiceEndpointsAvailableException {
-    return new ExtensionsServiceEndpointGenerator()
-        .getEndpointResourceUrl(
-            pipelineElement.getAppId(),
-            
ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement)
-        );
-  }
-  private String getSelectedEndpoint(InvocableStreamPipesEntity 
pipelineElement, String url)
-          throws NoServiceEndpointsAvailableException {
-    return ExtensionsServiceEndpointUtils
-            .getPipelineElementType(pipelineElement)
-            .getInvocationUrl(url,pipelineElement.getAppId());
-  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
index 4805fe9b47..26c2b71075 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
@@ -19,16 +19,16 @@
 package org.apache.streampipes.manager.execution.task;
 
 import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import org.apache.streampipes.manager.execution.http.PipelineElementSubmitter;
+import org.apache.streampipes.manager.execution.http.BasePipelineElementSubmitter;
 import 
org.apache.streampipes.manager.execution.provider.PipelineElementProvider;
 import org.apache.streampipes.model.pipeline.Pipeline;
 
 public class SubmitRequestTask implements PipelineExecutionTask {
 
   private final PipelineElementProvider elementProvider;
-  private final PipelineElementSubmitter submitter;
+  private final BasePipelineElementSubmitter submitter;
 
-  public SubmitRequestTask(PipelineElementSubmitter submitter,
+  public SubmitRequestTask(BasePipelineElementSubmitter submitter,
                            PipelineElementProvider elementProvider) {
     this.elementProvider = elementProvider;
     this.submitter = submitter;
@@ -36,7 +36,7 @@ public class SubmitRequestTask implements 
PipelineExecutionTask {
 
   @Override
   public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
-    return executionInfo.getFailedServices().size() == 0;
+    return executionInfo.getFailedServices().isEmpty();
   }
 
   @Override
@@ -45,6 +45,6 @@ public class SubmitRequestTask implements 
PipelineExecutionTask {
 
     var status = submitter.submit(processorsAndSinks);
 
-    executionInfo.applyPipelineOperationStatus(status);
+    executionInfo.setPipelineOperationStatus(status);
   }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
deleted file mode 100644
index 9bec880a89..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.health;
-
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-public class PipelineElementEndpointHealthCheck {
-
-  private static final String InstancePath = "/instances";
-
-  private final String endpointUrl;
-
-  public PipelineElementEndpointHealthCheck(String endpointUrl) {
-    this.endpointUrl = endpointUrl;
-  }
-
-  public List<String> checkRunningInstances() throws IOException {
-    var request = 
ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl());
-    return asList(request.execute().returnContent().toString());
-  }
-
-  private List<String> asList(String json) throws JsonProcessingException {
-    return Arrays.asList(JacksonSerializer.getObjectMapper().readValue(json, 
String[].class));
-  }
-
-  private String makeRequestUrl() {
-    return endpointUrl + InstancePath;
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
deleted file mode 100644
index 707b6cce09..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.health;
-
-import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
-import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
-import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static 
org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;
-
-public class PipelineHealthCheck implements Runnable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(PipelineHealthCheck.class);
-  private static final int MAX_FAILED_ATTEMPTS = 10;
-
-  private static final Map<String, Integer> failedRestartAttempts = new 
HashMap<>();
-
-  private static final PipelinesStats pipelinesStats = new PipelinesStats();
-
-  public PipelineHealthCheck() {
-
-  }
-
-  public void checkAndRestorePipelineElements() {
-    List<Pipeline> allPipelines = getAllPipelines();
-    List<Pipeline> runningPipelines = getRunningPipelines(allPipelines);
-
-    pipelinesStats.clear();
-    pipelinesStats.setAllPipelines(allPipelines.size());
-    pipelinesStats.setRunningPipelines(runningPipelines.size());
-    pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines()
-        - pipelinesStats.getRunningPipelines());
-
-    for (Pipeline p : allPipelines) {
-      pipelinesStats.updatePipelineHealthState(
-          p.getElementId(),
-          p.getName(),
-          p.getHealthStatus().toString());
-
-      pipelinesStats.updatePipelineRunningState(
-          p.getElementId(),
-          p.getName(),
-          p.isRunning());
-    }
-
-    if (!runningPipelines.isEmpty()) {
-      Map<String, List<InvocableStreamPipesEntity>> endpointMap = 
generateEndpointMap();
-      List<String> allRunningInstances = 
findRunningInstances(endpointMap.keySet());
-
-      runningPipelines.forEach(pipeline -> {
-        AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
-        List<String> failedInstances = new ArrayList<>();
-        List<String> recoveredInstances = new ArrayList<>();
-        List<String> pipelineNotifications = new ArrayList<>();
-        List<InvocableStreamPipesEntity> graphs = 
RunningPipelineElementStorage.runningProcessorsAndSinks
-            .get(pipeline.getPipelineId());
-
-        if (Objects.nonNull(graphs)) {
-          graphs.forEach(graph -> {
-            String instanceId = extractInstanceId(graph);
-            if (allRunningInstances.stream()
-                .noneMatch(runningInstanceId -> 
runningInstanceId.equals(instanceId))) {
-              if (shouldRetry(instanceId)) {
-                String endpointUrl = graph.getSelectedEndpointUrl();
-                shouldUpdatePipeline.set(true);
-                boolean success;
-                try {
-                  endpointUrl = findEndpointUrl(graph);
-                  success = new InvokeHttpRequest()
-                      .execute(graph, endpointUrl, 
pipeline.getPipelineId()).isSuccess();
-                } catch (NoServiceEndpointsAvailableException e) {
-                  success = false;
-                }
-                if (!success) {
-                  failedInstances.add(instanceId);
-                  addFailedAttemptNotification(pipelineNotifications, graph);
-                  increaseFailedAttempt(instanceId);
-                  LOG.info("Could not restore pipeline element {} of pipeline 
{} ({}/{})",
-                      graph.getName(), pipeline.getName(), 
failedRestartAttempts.get(instanceId),
-                      MAX_FAILED_ATTEMPTS);
-                } else {
-                  recoveredInstances.add(instanceId);
-                  addSuccessfulRestoreNotification(pipelineNotifications, 
graph);
-                  resetFailedAttempts(instanceId);
-                  graph.setSelectedEndpointUrl(endpointUrl);
-                  LOG.info("Successfully restored pipeline element {} of 
pipeline {}",
-                      graph.getName(), pipeline.getName());
-                }
-              }
-            }
-          });
-        }
-        if (shouldUpdatePipeline.get()) {
-          var currentPipeline = getPipeline(pipeline.getPipelineId());
-          if (!failedInstances.isEmpty()) {
-            currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
-            pipelinesStats.failedIncrease();
-          } else if (!recoveredInstances.isEmpty()) {
-            
currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
-            pipelinesStats.attentionRequiredIncrease();
-          }
-          currentPipeline.setPipelineNotifications(pipelineNotifications);
-          StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI()
-              .updateElement(currentPipeline);
-          
pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(), 
currentPipeline.getName(),
-              currentPipeline.getHealthStatus().toString());
-        }
-
-      });
-      int healthNum = pipelinesStats.getRunningPipelines() - 
pipelinesStats.getFailedPipelines()
-          - pipelinesStats.getAttentionRequiredPipelines();
-      pipelinesStats.setHealthyPipelines(healthNum);
-      pipelinesStats.setElementCount(getElementsCount(allPipelines));
-    }
-    pipelinesStats.metrics();
-  }
-
-  private String findEndpointUrl(InvocableStreamPipesEntity graph)
-      throws NoServiceEndpointsAvailableException {
-    SpServiceUrlProvider serviceUrlProvider = 
ExtensionsServiceEndpointUtils.getPipelineElementType(graph);
-    return serviceUrlProvider.getInvocationUrl(graph.getSelectedEndpointUrl(), 
graph.getAppId());
-  }
-
-  private boolean shouldRetry(String instanceId) {
-    if (!failedRestartAttempts.containsKey(instanceId)) {
-      return true;
-    } else {
-      return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS;
-    }
-  }
-
-  private void resetFailedAttempts(String instanceId) {
-    failedRestartAttempts.put(instanceId, 0);
-  }
-
-  private void increaseFailedAttempt(String instanceId) {
-    if (!failedRestartAttempts.containsKey(instanceId)) {
-      failedRestartAttempts.put(instanceId, 1);
-    } else {
-      Integer currentAttempt = failedRestartAttempts.get(instanceId) + 1;
-      failedRestartAttempts.put(instanceId, currentAttempt);
-    }
-  }
-
-  private void addSuccessfulRestoreNotification(List<String> 
pipelineNotifications,
-      InvocableStreamPipesEntity graph) {
-    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
graph.getName()
-        + "' was not available and was successfully restored.");
-  }
-
-  private void addFailedAttemptNotification(List<String> pipelineNotifications,
-      InvocableStreamPipesEntity graph) {
-    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
graph.getName()
-        + "' was not available and could not be restored.");
-  }
-
-  private String getCurrentDatetime() {
-    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");
-    LocalDateTime now = LocalDateTime.now();
-    return "[" + dtf.format(now) + "] ";
-  }
-
-  private String extractInstanceId(InvocableStreamPipesEntity graph) {
-    return InstanceIdExtractor.extractId(graph.getElementId());
-  }
-
-  private List<String> findRunningInstances(Set<String> endpoints) {
-    List<String> allRunningInstances = new ArrayList<>();
-    endpoints.forEach(endpoint -> {
-      try {
-        allRunningInstances
-            .addAll(new 
PipelineElementEndpointHealthCheck(endpoint).checkRunningInstances());
-      } catch (IOException e) {
-        LOG.error("Pipeline element endpoint {} is unavailable", endpoint);
-      }
-    });
-
-    return allRunningInstances;
-  }
-
-  private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() {
-    Map<String, List<InvocableStreamPipesEntity>> endpointMap = new 
HashMap<>();
-    RunningPipelineElementStorage.runningProcessorsAndSinks
-        .forEach((pipelineId, graphs) -> graphs.forEach(graph -> 
addEndpoint(endpointMap, graph)));
-
-    return endpointMap;
-  }
-
-  private void addEndpoint(Map<String, List<InvocableStreamPipesEntity>> 
endpointMap,
-      InvocableStreamPipesEntity graph) {
-    String selectedEndpoint = graph.getSelectedEndpointUrl();
-    if (!endpointMap.containsKey(selectedEndpoint)) {
-      endpointMap.put(selectedEndpoint, new ArrayList<>());
-    }
-    List<InvocableStreamPipesEntity> existingGraphs = 
endpointMap.get(selectedEndpoint);
-    existingGraphs.add(graph);
-    endpointMap.put(selectedEndpoint, existingGraphs);
-  }
-
-  @Override
-  public void run() {
-    try {
-      this.checkAndRestorePipelineElements();
-    } catch (Exception e) {
-      LOG.error("Error while checking and restoring pipeline elements", e);
-    }
-
-  }
-
-  private List<Pipeline> getRunningPipelines(List<Pipeline> allPipelines) {
-    return 
allPipelines.stream().filter(Pipeline::isRunning).collect(Collectors.toList());
-
-  }
-
-  private List<Pipeline> getAllPipelines() {
-    return 
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
-  }
-
-  private int getElementsCount(List<Pipeline> allPipelines) {
-    return allPipelines.stream().mapToInt(pipeline -> 
pipeline.getActions().size()).sum();
-
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
index ebd3d99601..09709fac7b 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
@@ -68,10 +68,6 @@ public class PipelineStorageService {
     SecretProvider.getEncryptionService().apply(graphs);
   }
 
-  private void encryptSecrets(Pipeline pipeline) {
-    SecretProvider.getEncryptionService().apply(pipeline);
-  }
-
   private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> 
clazz) {
     return graphs
         .stream()
diff --git 
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
new file mode 100644
index 0000000000..959c869a08
--- /dev/null
+++ 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.extensions.monitoring;
+
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import 
org.apache.streampipes.extensions.management.connect.AdapterWorkerManagement;
+import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
+import 
org.apache.streampipes.extensions.management.init.RunningAdapterInstances;
+import org.apache.streampipes.extensions.management.init.RunningInstances;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+import org.apache.streampipes.rest.extensions.AbstractExtensionsResource;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.stream.Collectors;
+
+@RestController
+@RequestMapping("health")
+public class HealthCheckResource extends AbstractExtensionsResource { // running-instance health report
+
+  private final AdapterWorkerManagement adapterManagement = new 
AdapterWorkerManagement(
+      RunningAdapterInstances.INSTANCE,
+      DeclarersSingleton.getInstance()
+  );
+  /** Returns the adapter and pipeline element instance ids this service reports as running, as JSON. */
+  @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<ExtensionInstanceHealth> getRunningInstances() {
+    // Collect the element id of every adapter instance returned by the worker management.
+    var runningAdapterInstances = 
adapterManagement.getAllRunningAdapterInstances()
+        .stream()
+        .map(NamedStreamPipesEntity::getElementId)
+        .collect(Collectors.toSet());
+    // Map each running instance id through InstanceIdExtractor.extractId; the set drops duplicates.
+    var runningPipelineElementInstances = 
RunningInstances.INSTANCE.getRunningInstanceIds()
+        .stream()
+        .map(InstanceIdExtractor::extractId)
+        .collect(Collectors.toSet());
+    // Constructor argument order: adapter ids first, pipeline element ids second.
+    var instanceHealth = new ExtensionInstanceHealth(
+        runningAdapterInstances,
+        runningPipelineElementInstances
+    );
+    // NOTE(review): ok(...) is not defined here; presumably inherited and yields HTTP 200 - confirm.
+    return ok(instanceHealth);
+  }
+}
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index ae360fffc6..6650568127 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -55,6 +55,11 @@
             <artifactId>streampipes-data-export</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-health-monitoring</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-measurement-units</artifactId>
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index 58baea3101..d35057c9da 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.rest.impl.admin;
 import 
org.apache.streampipes.connect.management.management.AdapterMigrationManager;
 import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
 import org.apache.streampipes.manager.health.CoreServiceStatusManager;
-import org.apache.streampipes.manager.health.ServiceRegistrationManager;
+import org.apache.streampipes.health.monitoring.ServiceRegistrationManager;
 import 
org.apache.streampipes.manager.migration.PipelineElementMigrationManager;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
index 69b6e3d5ec..fc243d5d56 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.rest.impl.admin;
 
-import org.apache.streampipes.manager.health.ServiceRegistrationManager;
+import org.apache.streampipes.health.monitoring.ServiceRegistrationManager;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
 import org.apache.streampipes.model.message.Notifications;
diff --git a/streampipes-service-core/pom.xml b/streampipes-service-core/pom.xml
index 25830676dc..b898e78458 100644
--- a/streampipes-service-core/pom.xml
+++ b/streampipes-service-core/pom.xml
@@ -54,6 +54,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-health-monitoring</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-messaging-jms</artifactId>
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
index 06c7257bf0..ce886d620e 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.service.core;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
-import org.apache.streampipes.manager.health.ServiceHealthCheck;
+import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
@@ -66,7 +66,7 @@ public class PostStartupTask implements Runnable {
 
   @Override
   public void run() {
-    new ServiceHealthCheck().run();
+    new 
ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()).run();
     performAdapterAssetUpdate();
     startAllPreviouslyStoppedPipelines();
     startAdapters();
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index d1888420a0..6a275b1ea1 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -19,14 +19,14 @@ package org.apache.streampipes.service.core;
 
 import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
+import org.apache.streampipes.health.monitoring.ExtensionHealthCheck;
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
 import org.apache.streampipes.loadbalance.LoadManager;
 import 
org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor;
 import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
 import org.apache.streampipes.manager.health.CoreServiceStatusManager;
-import org.apache.streampipes.manager.health.PipelineHealthCheck;
-import org.apache.streampipes.manager.health.ServiceHealthCheck;
 import org.apache.streampipes.manager.pipeline.PipelineManager;
 import org.apache.streampipes.manager.setup.AutoInstallation;
 import org.apache.streampipes.manager.setup.StreamPipesEnvChecker;
@@ -54,6 +54,7 @@ import 
org.apache.streampipes.storage.management.StorageDispatcher;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
@@ -138,14 +139,19 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
                              TimeUnit.MILLISECONDS);
 
     
scheduleHealthChecks(env.getHealthCheckIntervalInMillis().getValueOrDefault(), 
List
-        .of(new ServiceHealthCheck(), new PipelineHealthCheck(),
-            new AdapterHealthCheck(
-                
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
-                new AdapterMasterManagement(
-                    
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
-                    new SpResourceManager().manageAdapters(),
-                    new SpResourceManager().manageDataStreams(),
-                    AdapterMetricsManager.INSTANCE.getAdapterMetrics()))));
+        .of(new 
ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()),
+            new ExtensionHealthCheck(
+                new ResourceProvider(
+                  
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(),
+                  
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+                  new AdapterMasterManagement(
+                      
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+                      new SpResourceManager().manageAdapters(),
+                      new SpResourceManager().manageDataStreams(),
+                      AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+                  )
+                )
+            )));
 
     var logFetchInterval = 
env.getLogFetchIntervalInMillis().getValueOrDefault();
     LOG.info("Extensions logs will be fetched every {} milliseconds", 
logFetchInterval);

Reply via email to