This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 63dd142d4a refactor: Improve health checks (#4096)
63dd142d4a is described below

commit 63dd142d4a658fa3ad40fbea2ba12c57758d6f83
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 15 15:19:02 2026 +0100

    refactor: Improve health checks (#4096)
---
 pom.xml                                            |   1 +
 .../apache/streampipes/commons/constants/Envs.java |  30 +--
 .../commons/environment/DefaultEnvironment.java    |  24 +-
 .../management/health/AdapterHealthCheck.java      | 245 -------------------
 .../management/WorkerAdministrationManagement.java |  70 ++----
 .../management/health/AdapterHealthCheckTest.java  |  96 --------
 .../influx/DataExplorerInfluxQueryExecutor.java    |   2 +-
 .../management/init/RunningInstances.java          |  10 +
 streampipes-health-monitoring/pom.xml              |  76 ++++++
 .../health/monitoring/AdapterHealthCheck.java      | 168 +++++++++++++
 .../health/monitoring/ExtensionHealthCheck.java    |  57 +++++
 .../ExtensionInstanceAvailabilityCheck.java        |  67 ++++++
 .../health/monitoring/PipelineHealthCheck.java     | 210 +++++++++++++++++
 .../health/monitoring/PostStartupRecovery.java     |  62 +++++
 .../health/monitoring/ResourceProvider.java        | 104 ++++++++
 .../health/monitoring}/ServiceHealthCheck.java     |  22 +-
 .../monitoring}/ServiceRegistrationManager.java    |   6 +-
 .../monitoring/model/ActiveCoreInstances.java      |  10 +-
 .../health/monitoring/model/ActiveResources.java   |  15 +-
 .../health/monitoring/model/HealthCheckData.java   |  15 +-
 .../health/monitoring/utils/HealthCheckUtils.java  |  53 +++++
 .../model/health/ExtensionInstanceHealth.java      |  13 +-
 .../streampipes/model/util/PropertyUtils.java      |   4 -
 .../execution/ExtensionServiceExecutions.java      |   1 -
 .../manager/execution/PipelineExecutionInfo.java   |   4 +-
 .../execution/PipelineExecutionTaskFactory.java    |   6 +-
 .../ExtensionsServiceEndpointGenerator.java        |  12 +-
 ...tter.java => BasePipelineElementSubmitter.java} |  23 +-
 .../http/DetachPipelineElementSubmitter.java       |   5 +-
 .../http/InvokePipelineElementSubmitter.java       |   9 +-
 .../provider/StoredPipelineElementProvider.java    |  42 ----
 .../execution/task/AfterInvocationTask.java        |  13 +-
 .../execution/task/DiscoverEndpointsTask.java      |  37 +--
 .../manager/execution/task/SubmitRequestTask.java  |  16 +-
 .../health/PipelineElementEndpointHealthCheck.java |  51 ----
 .../manager/health/PipelineHealthCheck.java        | 261 ---------------------
 .../manager/storage/PipelineStorageService.java    |   4 -
 .../PipelineElementUtils.java}                     |  23 +-
 .../extensions/monitoring/HealthCheckResource.java |  67 ++++++
 streampipes-rest/pom.xml                           |   5 +
 .../rest/impl/admin/MigrationResource.java         |   2 +-
 .../impl/admin/ServiceRegistrationResource.java    |   2 +-
 .../rest/impl/connect/AbstractAdapterResource.java |   4 +
 .../impl/connect/RuntimeResolvableResource.java    |  14 +-
 streampipes-service-core/pom.xml                   |   5 +
 .../streampipes/service/core/PostStartupTask.java  |  61 +++--
 .../service/core/StreamPipesCoreApplication.java   |  28 ++-
 47 files changed, 1088 insertions(+), 967 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3c5025b797..bb4369da70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1134,6 +1134,7 @@
         <module>streampipes-connect-transformer-api</module>
         <module>streampipes-connect-transformer-groovy</module>
         <module>streampipes-connect-transformer-js</module>
+        <module>streampipes-health-monitoring</module>
     </modules>
 
     <profiles>
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 148fa66abd..990629b0f1 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -102,21 +102,21 @@ public enum Envs {
 
   SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"),
 
-  CPU_RESOURCE_WEIGHT("CPU_RESOURCE_WEIGHT", "1.0"),
-  MEMORY_RESOURCE_WEIGHT("MEMORY_RESOURCE_WEIGHT", "1.0"),
-
-  DIR_MEMORY_RESOURCE_WEIGHT("DIR_MEMORY_RESOURCE_WEIGHT", "1.0"),
-  BANDWIDTH_IN_RESOURCE_WEIGHT("BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"),
-  BANDWIDTH_OUT_RESOURCE_WEIGHT("BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"),
-  THRESHOLD_MIGRATOR_PERCENTAGE("THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"),
-  MIN_MIGRATOR_PERCENTAGE("MIN_MIGRATOR_PERCENTAGE", "20.0"),
-  OVERLOADED_THRESHOLD_PERCENTAGE("OVERLOADED_THRESHOLD_PERCENTAGE", "85"),
-  HISTORY_RESOURCE_PERCENTAGE("HISTORY_RESOURCE_PERCENTAGE", "0.9"),
-  
MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD",
 "85"),
-  LOAD_TARGET_STD("LOAD_TARGET_STD", "25.0"),
-  SELECTOR("SELECTOR", "WeightedRandomSelector"),
-  MIGRATOR("MIGRATOR", "ThresholdMigrator"),
-  LOAD_MANAGER_ENABLE("LOAD_MANAGER_ENABLE", "true"),
+  CPU_RESOURCE_WEIGHT("SP_CPU_RESOURCE_WEIGHT", "1.0"),
+  MEMORY_RESOURCE_WEIGHT("SP_MEMORY_RESOURCE_WEIGHT", "1.0"),
+
+  SP_DIR_MEMORY_RESOURCE_WEIGHT("SP_DIR_MEMORY_RESOURCE_WEIGHT", "1.0"),
+  SP_BANDWIDTH_IN_RESOURCE_WEIGHT("SP_BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"),
+  SP_BANDWIDTH_OUT_RESOURCE_WEIGHT("SP_BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"),
+  SP_THRESHOLD_MIGRATOR_PERCENTAGE("SP_THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"),
+  SP_MIN_MIGRATOR_PERCENTAGE("SP_MIN_MIGRATOR_PERCENTAGE", "20.0"),
+  SP_OVERLOADED_THRESHOLD_PERCENTAGE("SP_OVERLOADED_THRESHOLD_PERCENTAGE", 
"85"),
+  SP_HISTORY_RESOURCE_PERCENTAGE("SP_HISTORY_RESOURCE_PERCENTAGE", "0.9"),
+  
SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD",
 "85"),
+  SP_LOAD_TARGET_STD("SP_LOAD_TARGET_STD", "25.0"),
+  SP_SELECTOR("SP_SELECTOR", "WeightedRandomSelector"),
+  SP_MIGRATOR("SP_MIGRATOR", "ThresholdMigrator"),
+  SP_LOAD_MANAGER_ENABLE("SP_LOAD_MANAGER_ENABLE", "false"),
 
   // expects a comma separated string of service names
   SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 2e93ea7f80..9f5a2d997a 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -444,62 +444,62 @@ public class DefaultEnvironment implements Environment {
 
   @Override
   public DoubleEnvironmentVariable getDirMemoryResourceWeight() {
-    return new DoubleEnvironmentVariable(Envs.DIR_MEMORY_RESOURCE_WEIGHT);
+    return new DoubleEnvironmentVariable(Envs.SP_DIR_MEMORY_RESOURCE_WEIGHT);
   }
 
   @Override
   public DoubleEnvironmentVariable getBandwidthInResourceWeight() {
-    return new DoubleEnvironmentVariable(Envs.BANDWIDTH_IN_RESOURCE_WEIGHT);
+    return new DoubleEnvironmentVariable(Envs.SP_BANDWIDTH_IN_RESOURCE_WEIGHT);
   }
 
   @Override
   public DoubleEnvironmentVariable getBandwidthOutResourceWeight() {
-    return new DoubleEnvironmentVariable(Envs.BANDWIDTH_OUT_RESOURCE_WEIGHT);
+    return new 
DoubleEnvironmentVariable(Envs.SP_BANDWIDTH_OUT_RESOURCE_WEIGHT);
   }
 
   @Override
   public FloatEnvironmentVariable getThresholdMigratorPercentage() {
-    return new FloatEnvironmentVariable(Envs.THRESHOLD_MIGRATOR_PERCENTAGE);
+    return new FloatEnvironmentVariable(Envs.SP_THRESHOLD_MIGRATOR_PERCENTAGE);
   }
 
   @Override
   public FloatEnvironmentVariable getMinMigratorPercentage() {
-    return new FloatEnvironmentVariable(Envs.MIN_MIGRATOR_PERCENTAGE);
+    return new FloatEnvironmentVariable(Envs.SP_MIN_MIGRATOR_PERCENTAGE);
   }
 
   @Override
   public FloatEnvironmentVariable getOverloadedThresholdPercentage() {
-    return new FloatEnvironmentVariable(Envs.OVERLOADED_THRESHOLD_PERCENTAGE);
+    return new 
FloatEnvironmentVariable(Envs.SP_OVERLOADED_THRESHOLD_PERCENTAGE);
   }
 
   @Override
   public FloatEnvironmentVariable getHistoryResourcePercentage() {
-    return new FloatEnvironmentVariable(Envs.HISTORY_RESOURCE_PERCENTAGE);
+    return new FloatEnvironmentVariable(Envs.SP_HISTORY_RESOURCE_PERCENTAGE);
   }
 
   @Override
   public IntEnvironmentVariable getMsgRateDifferenceMigratorThreshold() {
-    return new 
IntEnvironmentVariable(Envs.MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD);
+    return new 
IntEnvironmentVariable(Envs.SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD);
   }
 
   @Override
   public FloatEnvironmentVariable getLoadTargetStd() {
-    return new FloatEnvironmentVariable(Envs.LOAD_TARGET_STD);
+    return new FloatEnvironmentVariable(Envs.SP_LOAD_TARGET_STD);
   }
 
   @Override
   public StringEnvironmentVariable getSelector() {
-    return new StringEnvironmentVariable(Envs.SELECTOR);
+    return new StringEnvironmentVariable(Envs.SP_SELECTOR);
   }
 
   @Override
   public StringEnvironmentVariable getMigrator() {
-    return new StringEnvironmentVariable(Envs.MIGRATOR);
+    return new StringEnvironmentVariable(Envs.SP_MIGRATOR);
   }
 
   @Override
   public BooleanEnvironmentVariable getLoadManagerEnable() {
-    return new BooleanEnvironmentVariable(Envs.LOAD_MANAGER_ENABLE);
+    return new BooleanEnvironmentVariable(Envs.SP_LOAD_MANAGER_ENABLE);
   }
 
   @Override
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
deleted file mode 100644
index 5142d5f4fe..0000000000
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.connect.management.health;
-
-import org.apache.streampipes.commons.exceptions.connect.AdapterException;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.connect.management.management.WorkerRestClient;
-import org.apache.streampipes.connect.management.util.WorkerPaths;
-import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.storage.api.IAdapterStorage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-
-public class AdapterHealthCheck implements Runnable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(AdapterHealthCheck.class);
-
-  private final IAdapterStorage adapterStorage;
-  private final AdapterMasterManagement adapterMasterManagement;
-
-  public AdapterHealthCheck(IAdapterStorage adapterStorage,
-                            AdapterMasterManagement adapterMasterManagement) {
-    this.adapterStorage = adapterStorage;
-    this.adapterMasterManagement = adapterMasterManagement;
-  }
-
-  @Override
-  public void run() {
-    this.checkAndRestoreAdapters();
-  }
-
-  /**
-   * In this method it is checked which adapters are currently running. Then 
it calls all workers to
-   * validate if the adapter instance is still running as expected. If the 
adapter is not running
-   * anymore a new worker instance is invoked. In addition, it publishes 
monitoring metrics for all
-   * running adapters (in line with
-   * {@link org.apache.streampipes.manager.health.PipelineHealthCheck}).
-   */
-  public void checkAndRestoreAdapters() {
-    LOG.info("Adapter health check started");
-    // Get all adapters that are supposed to run according to the backend 
storage
-    Map<String, AdapterDescription> adapterInstancesSupposedToRun =
-        this.getAllAdaptersSupposedToRun();
-
-    // group all adapter instances supposed to run by their worker service URL
-    Map<String, List<AdapterDescription>> groupByWorker =
-        this.getAllWorkersWithAdapters(adapterInstancesSupposedToRun);
-
-    // Get adapters that are not running anymore
-    Map<String, AdapterDescription> allAdaptersToRecover =
-        this.getAdaptersToRecover(groupByWorker, 
adapterInstancesSupposedToRun);
-
-    allAdaptersToRecover
-        .keySet()
-        .forEach(adapterId ->
-                     LOG.info("Adapter instance with id {} needs to be 
recovered", adapterId));
-
-    try {
-      if (!adapterInstancesSupposedToRun.isEmpty()) {
-        // Filter adapters so that only healthy and running adapters are 
updated in the metrics
-        // endpoint
-        var adaptersToMonitor = 
adapterInstancesSupposedToRun.entrySet().stream()
-            .filter((entry -> 
!allAdaptersToRecover.containsKey(entry.getKey())))
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-        if (!adaptersToMonitor.isEmpty()) {
-          updateMonitoringMetrics(adaptersToMonitor);
-        } else {
-          LOG.info("No running adapter instances to monitor.");
-        }
-      }
-    } catch (NoSuchElementException e) {
-      LOG.error("Could not update adapter metrics due to an invalid state. 
({})", e.getMessage());
-    }
-
-    LOG.info("Monitoring metrics updated for running adapters.");
-
-    // Recover Adapters
-    this.recoverAdapters(allAdaptersToRecover);
-  }
-
-  /**
-   * Updates the monitoring metrics based on the descriptions of running 
adapters.
-   *
-   * @param runningAdapterDescriptions A map containing the descriptions of 
running adapters, where
-   *        the key is the adapter's element ID and the value is the 
corresponding adapter
-   *        description.
-   */
-  protected void updateMonitoringMetrics(Map<String, AdapterDescription> 
runningAdapterDescriptions) {
-
-    var adapterMetrics = 
AdapterMetricsManager.getInstance().getAdapterMetrics();
-    runningAdapterDescriptions.values()
-        .forEach(adapterDescription -> 
updateTotalEventsPublished(adapterMetrics,
-                                                                  
adapterDescription.getElementId(),
-                                                                  
adapterDescription.getName()));
-    LOG.debug("Monitoring {} adapter instances", adapterMetrics.size());
-  }
-
-  private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, 
String adapterId,
-                                          String adapterName) {
-
-    // Check if the adapter is already registered; if not, register it first.
-    // This step is crucial, especially when the StreamPipes Core service is 
restarted,
-    // and there are existing running adapters that need proper registration.
-    // Note: Proper registration is usually handled during the initial start 
of the adapter.
-    if (!adapterMetrics.contains(adapterId)) {
-      adapterMetrics.register(adapterId, adapterName);
-    }
-
-    adapterMetrics.updateTotalEventsPublished(adapterId, adapterName, 
ExtensionsLogProvider.INSTANCE
-        .getMetricInfosForResource(adapterId).getMessagesOut().getCounter());
-  }
-
-
-  /**
-   * Retrieves a map of all adapter instances that are supposed to be running 
according to the
-   * backend storage.
-   * <p>
-   * This method queries the adapter storage to obtain information about all 
adapters and filters
-   * the running instances. The resulting map is keyed by the element ID of 
each running adapter,
-   * and the corresponding values are the respective {@link 
AdapterDescription} objects.
-   *
-   * @return A map containing all adapter instances supposed to be running 
according to the backend
-   *         storage. The keys are element IDs, and the values are the 
corresponding adapter
-   *         descriptions.
-   */
-  public Map<String, AdapterDescription> getAllAdaptersSupposedToRun() {
-    Map<String, AdapterDescription> result = new HashMap<>();
-    List<AdapterDescription> allRunningInstancesAdapterDescription = 
this.adapterStorage.findAll();
-    
allRunningInstancesAdapterDescription.stream().filter(AdapterDescription::isRunning)
-        .forEach(adapterDescription -> 
result.put(adapterDescription.getElementId(),
-                                                  adapterDescription));
-
-    return result;
-  }
-
-  public Map<String, List<AdapterDescription>> 
getAllWorkersWithAdapters(Map<String, AdapterDescription> 
adapterInstancesSupposedToRun) {
-
-    Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
-    adapterInstancesSupposedToRun.values().forEach(ad -> {
-      String selectedEndpointUrl = ad.getSelectedEndpointUrl();
-      if (selectedEndpointUrl != null) {
-        if (groupByWorker.containsKey(selectedEndpointUrl)) {
-          groupByWorker.get(selectedEndpointUrl).add(ad);
-        } else {
-          List<AdapterDescription> adapterDescriptionsPerWorker = new 
ArrayList<>();
-          adapterDescriptionsPerWorker.add(ad);
-          groupByWorker.put(selectedEndpointUrl, adapterDescriptionsPerWorker);
-        }
-      }
-    });
-
-    return groupByWorker;
-  }
-
-  /**
-   * Retrieves a map of adapters to recover by comparing the provided 
groupings of adapter instances
-   * with the instances supposed to run according to the storage. For every 
adapter instance it is
-   * verified that it actually runs on a worker node. If this is not the case, 
it is added to the
-   * output of adapters to recover.
-   *
-   * @param adapterInstancesGroupedByWorker A map grouping adapter instances 
by worker.
-   * @param adapterInstancesSupposedToRun The map containing all adapter 
instances supposed to be
-   *        running.
-   * @return A new map containing adapter instances to recover, filtered based 
on running instances.
-   */
-  public Map<String, AdapterDescription> getAdaptersToRecover(Map<String, 
List<AdapterDescription>> adapterInstancesGroupedByWorker,
-                                                              Map<String, 
AdapterDescription> adapterInstancesSupposedToRun) {
-
-    // NOTE: This line is added to prevent modifying the existing map of 
instances supposed to run
-    // It looks like the parameter `adapterInstancesSupposedToRun` is not 
required at all,
-    // but this should be checked more carefully.
-    Map<String, AdapterDescription> adaptersToRecover =
-        new HashMap<>(adapterInstancesSupposedToRun);
-
-    adapterInstancesGroupedByWorker.keySet().forEach(adapterEndpointUrl -> {
-      try {
-        List<AdapterDescription> allRunningInstancesOfOneWorker =
-            
WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl
-                + WorkerPaths.getRunningAdaptersPath());
-
-        // only keep adapters where there is no running adapter instance
-        // therefore, all others are removed
-        allRunningInstancesOfOneWorker.forEach(adapterDescription -> 
adaptersToRecover
-            .remove(adapterDescription.getElementId()));
-      } catch (AdapterException e) {
-        LOG.info("Could not recover adapter at endpoint {}  - "
-            + "marking it as requested to recover (reason: {})", 
adapterEndpointUrl,
-                 e.getMessage());
-      }
-    });
-
-    return adaptersToRecover;
-  }
-
-
-  public void recoverAdapters(Map<String, AdapterDescription> 
adaptersToRecover) {
-    for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
-      // Invoke all adapters that were running when the adapter container was 
stopped
-      try {
-        if (adapterDescription.isRunning()) {
-          LOG.info("Start recovering adapter {} ", 
adapterDescription.getElementId());
-          
this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
-          LOG.info("Adapter {} is recovered", 
adapterDescription.getElementId());
-
-        }
-      } catch (AdapterException e) {
-        LOG.warn("Could not start adapter {} ({})", 
adapterDescription.getName(), e.getMessage());
-      } catch (Exception e) {
-        LOG.error(
-            "Unexpected error while recovering adapter {} ({})",
-            adapterDescription.getName(),
-            e.getMessage()
-        );
-      }
-    }
-
-  }
-}
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
index 6cdad54c6b..cc5e6154e7 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
@@ -19,18 +19,13 @@
 package org.apache.streampipes.connect.management.management;
 
 import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
-import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
-import org.apache.streampipes.connect.management.health.AdapterOperationLock;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
-import org.apache.streampipes.resource.management.AdapterResourceManager;
-import org.apache.streampipes.resource.management.DataStreamResourceManager;
 import org.apache.streampipes.resource.management.PermissionResourceManager;
-import org.apache.streampipes.resource.management.SpResourceManager;
+import org.apache.streampipes.resource.management.UserResourceManager;
 import org.apache.streampipes.storage.api.IAdapterStorage;
-import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+import org.apache.streampipes.storage.api.IPermissionStorage;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import org.slf4j.Logger;
@@ -38,61 +33,30 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 public class WorkerAdministrationManagement {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(WorkerAdministrationManagement.class);
-  private static final int MAX_RETRIES = 7;
 
   private final IAdapterStorage adapterDescriptionStorage;
-
-  private final AdapterHealthCheck adapterHealthCheck;
+  private final IPermissionStorage permissionStorage;
+  private final PermissionResourceManager permissionResourceManager;
+  private final UserResourceManager userResourceManager;
 
   public WorkerAdministrationManagement(
-      IAdapterStorage adapterStorage,
-      AdapterMetrics adapterMetrics,
-      AdapterResourceManager adapterResourceManager,
-      DataStreamResourceManager dataStreamResourceManager
-  ) {
-    this.adapterHealthCheck = new AdapterHealthCheck(
-        adapterStorage,
-        new AdapterMasterManagement(
-            adapterStorage,
-            adapterResourceManager,
-            dataStreamResourceManager,
-            adapterMetrics
-        )
-    );
-    this.adapterDescriptionStorage = 
CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
-  }
-
-  public void checkAndRestore(int retryCount) {
-    if (AdapterOperationLock.INSTANCE.isLocked()) {
-      LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 
1), MAX_RETRIES);
-      if (retryCount <= MAX_RETRIES) {
-        try {
-          TimeUnit.MILLISECONDS.sleep(3000);
-          retryCount++;
-          checkAndRestore(retryCount);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      } else {
-        LOG.info("Max retries for running adapter operations reached, will do 
unlock which might cause conflicts...");
-        AdapterOperationLock.INSTANCE.unlock();
-        this.adapterHealthCheck.checkAndRestoreAdapters();
-      }
-    } else {
-      AdapterOperationLock.INSTANCE.lock();
-      this.adapterHealthCheck.checkAndRestoreAdapters();
-      AdapterOperationLock.INSTANCE.unlock();
-    }
+      IAdapterStorage adapterDescriptionStorage,
+      IPermissionStorage permissionStorage,
+      UserResourceManager userResourceManager,
+      PermissionResourceManager permissionResourceManager) {
+    this.adapterDescriptionStorage = adapterDescriptionStorage;
+    this.userResourceManager = userResourceManager;
+    this.permissionStorage = permissionStorage;
+    this.permissionResourceManager = permissionResourceManager;
   }
 
   public void performAdapterMigrations(List<SpServiceTag> tags) {
-    var installedAdapters = 
CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage().findAll();
-    var adminSid = new 
SpResourceManager().manageUsers().getAdminUser().getPrincipalId();
+    var installedAdapters = adapterDescriptionStorage.findAll();
+    var adminSid = userResourceManager.getAdminUser().getPrincipalId();
     installedAdapters.stream()
         .filter(adapter -> tags.stream().anyMatch(tag -> 
tag.getValue().equals(adapter.getAppId())))
         .forEach(adapter -> {
@@ -107,13 +71,11 @@ public class WorkerAdministrationManagement {
                   e);
             }
           }
-          var permissionStorage = 
CouchDbStorageManager.INSTANCE.getPermissionStorage();
           var elementId = adapter.getElementId();
           var permissions = 
permissionStorage.getUserPermissionsForObject(elementId);
           if (permissions.isEmpty()) {
             LOG.info("Adding default permission for adapter {}", 
adapter.getAppId());
-            new PermissionResourceManager()
-                .createDefault(elementId, AdapterDescription.class, adminSid, 
true);
+            permissionResourceManager.createDefault(elementId, 
AdapterDescription.class, adminSid, true);
           }
         });
   }
diff --git 
a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
 
b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
deleted file mode 100644
index 47612f1361..0000000000
--- 
a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.management.health;
-
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.resource.management.SpResourceManager;
-import org.apache.streampipes.storage.api.IAdapterStorage;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AdapterHealthCheckTest {
-
-  private IAdapterStorage adapterInstanceStorageMock;
-
-  @BeforeEach
-  public void setUp() {
-    adapterInstanceStorageMock = mock(IAdapterStorage.class);
-  }
-
-  @Test
-  public void getAllRunningInstancesAdapterDescriptionsEmpty() {
-    when(adapterInstanceStorageMock.findAll()).thenReturn(List.of());
-
-    var healthCheck = new AdapterHealthCheck(
-        adapterInstanceStorageMock,
-        new AdapterMasterManagement(
-            adapterInstanceStorageMock,
-            new SpResourceManager().manageAdapters(),
-            new SpResourceManager().manageDataStreams(),
-            AdapterMetricsManager.INSTANCE.getAdapterMetrics()
-        )
-    );
-    var result = healthCheck.getAllAdaptersSupposedToRun();
-
-    Assertions.assertTrue(result.isEmpty());
-  }
-
-  @Test
-  public void getAllRunningInstancesAdapterDescriptionsMixed() {
-
-    var nameRunningAdapter = "running-adapter";
-    var nameStoppedAdapter = "stopped-adapter";
-
-    var stoppedAdapter = new AdapterDescription();
-    stoppedAdapter.setElementId(nameStoppedAdapter);
-    stoppedAdapter.setRunning(false);
-
-    var runningAdapter = new AdapterDescription();
-    runningAdapter.setElementId(nameRunningAdapter);
-    runningAdapter.setRunning(true);
-
-    
when(adapterInstanceStorageMock.findAll()).thenReturn(List.of(stoppedAdapter, 
runningAdapter));
-
-    var healthCheck = new AdapterHealthCheck(
-        adapterInstanceStorageMock,
-        new AdapterMasterManagement(
-            adapterInstanceStorageMock,
-            new SpResourceManager().manageAdapters(),
-            new SpResourceManager().manageDataStreams(),
-            AdapterMetricsManager.INSTANCE.getAdapterMetrics()
-        )
-    );
-    var result = healthCheck.getAllAdaptersSupposedToRun();
-
-    Assertions.assertEquals(1, result.size());
-    Assertions.assertTrue(result.containsKey(nameRunningAdapter));
-    Assertions.assertEquals(runningAdapter, result.get(nameRunningAdapter));
-
-  }
-
-}
diff --git 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
index f4e1df6095..fbb715be88 100644
--- 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
+++ 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -73,7 +73,7 @@ public class DataExplorerInfluxQueryExecutor extends 
DataExplorerQueryExecutor<Q
    * The influx client always returns a double for timestamp values. This 
method converts them to long values.
    */
   private void convertTimestampValueToLong(List<Object> row) {
-    if (!row.isEmpty()) {
+    if (!row.isEmpty() && row.get(0) instanceof Number) {
       row.set(0, ((Number) row.get(0)).longValue());
     }
   }
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
index 538043d7f9..f14a94de00 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public enum RunningInstances {
   INSTANCE;
@@ -65,6 +67,14 @@ public enum RunningInstances {
     return runningInstances.size();
   }
 
+  public Set<String> getRunningInstanceIds() {
+    return runningInstances
+        .entrySet()
+        .stream()
+        .map((entry -> entry.getValue().getDescription().getElementId()))
+        .collect(Collectors.toSet());
+  }
+
   public List<String> getRunningInstanceIdsForElement(String appId) {
     // TODO change this to appId for STREAMPIPES-319
     List<String> instanceIds = new ArrayList<>();
diff --git a/streampipes-health-monitoring/pom.xml 
b/streampipes-health-monitoring/pom.xml
new file mode 100644
index 0000000000..40e3992cde
--- /dev/null
+++ b/streampipes-health-monitoring/pom.xml
@@ -0,0 +1,76 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampipes</groupId>
+        <artifactId>streampipes-parent</artifactId>
+        <version>0.99.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampipes-health-monitoring</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-pipeline-management</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-commons</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-connect-management</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-load-balancer</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-model</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-storage-api</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
new file mode 100644
index 0000000000..f742b23f57
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
+import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class AdapterHealthCheck {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AdapterHealthCheck.class);
+
+  private final HealthCheckData healthCheckData;
+
+  public AdapterHealthCheck(HealthCheckData healthCheckData) {
+    this.healthCheckData = healthCheckData;
+  }
+
+  /**
+   * In this method it is checked which adapters are currently running. Then 
it calls all workers to
+   * validate if the adapter instance is still running as expected. If the 
adapter is not running
+   * anymore a new worker instance is invoked. In addition, it publishes 
monitoring metrics for all
+   * running adapters (in line with
+   * {@link PipelineHealthCheck}).
+   */
+  public void runCheck() {
+    LOG.debug("Adapter health check started");
+
+    try {
+      if (!healthCheckData.activeResources().runningAdapters().isEmpty()) {
+        var allAdaptersToRecover = this.getAdaptersToRecover();
+
+        allAdaptersToRecover
+            .forEach(adapter ->
+                LOG.info("Adapter instance with id {} needs to be recovered", 
adapter.getElementId()));
+        // Filter adapters so that only healthy and running adapters are 
updated in the metrics
+        // endpoint
+        var adaptersToMonitor = 
healthCheckData.activeResources().runningAdapters().stream()
+            .filter(entry -> allAdaptersToRecover
+                .stream()
+                .noneMatch(r -> r.getElementId().equals(entry.getElementId()))
+            )
+            .toList();
+
+        if (!adaptersToMonitor.isEmpty()) {
+          updateMonitoringMetrics(adaptersToMonitor);
+        } else {
+          LOG.debug("No running adapter instances to monitor.");
+        }
+
+        LOG.debug("Monitoring metrics updated for running adapters.");
+
+        // Recover Adapters
+        this.recoverAdapters(allAdaptersToRecover);
+      }
+    } catch (NoSuchElementException e) {
+      LOG.error("Could not update adapter metrics due to an invalid state. 
({})", e.getMessage());
+    }
+  }
+
+  /**
+   * Updates the monitoring metrics based on the descriptions of running 
adapters.
+   *
+   * @param runningAdapterDescriptions A map containing the descriptions of 
running adapters, where
+   *        the key is the adapter's element ID and the value is the 
corresponding adapter
+   *        description.
+   */
+  protected void updateMonitoringMetrics(List<AdapterDescription> 
runningAdapterDescriptions) {
+
+    var adapterMetrics = 
AdapterMetricsManager.getInstance().getAdapterMetrics();
+    runningAdapterDescriptions
+        .forEach(adapterDescription -> 
updateTotalEventsPublished(adapterMetrics,
+                                                                  
adapterDescription.getElementId(),
+                                                                  
adapterDescription.getName()));
+    LOG.debug("Monitoring {} adapter instances", adapterMetrics.size());
+  }
+
+  private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, 
String adapterId,
+                                          String adapterName) {
+
+    // Check if the adapter is already registered; if not, register it first.
+    // This step is crucial, especially when the StreamPipes Core service is 
restarted,
+    // and there are existing running adapters that need proper registration.
+    // Note: Proper registration is usually handled during the initial start 
of the adapter.
+    if (!adapterMetrics.contains(adapterId)) {
+      adapterMetrics.register(adapterId, adapterName);
+    }
+
+    adapterMetrics.updateTotalEventsPublished(adapterId, adapterName, 
ExtensionsLogProvider.INSTANCE
+        .getMetricInfosForResource(adapterId).getMessagesOut().getCounter());
+  }
+
+
+  /**
+   * Retrieves a list of adapters to recover by comparing the provided 
groupings of adapter instances
+   * with the instances supposed to run according to the storage. For every 
adapter instance it is
+   * verified that it actually runs on a worker node. If this is not the case, 
it is added to the
+   * output of adapters to recover.
+   *
+   *        running.
+   * @return A new map containing adapter instances to recover, filtered based 
on running instances.
+   */
+  public List<AdapterDescription> getAdaptersToRecover() {
+
+    var runningAdapterIds =
+        healthCheckData.activeExtensionInstances()
+            .values()
+            .stream()
+            .flatMap(h -> h.runningAdapterInstanceIds().stream())
+            .collect(Collectors.toSet());
+
+    return healthCheckData.activeResources()
+        .runningAdapters()
+        .stream()
+        .filter(Objects::nonNull)
+        .filter(a -> a.getElementId() != null)
+        .filter(a -> !runningAdapterIds.contains(a.getElementId()))
+        .toList();
+  }
+
+  public void recoverAdapters(List<AdapterDescription> adaptersToRecover) {
+    for (AdapterDescription adapterDescription : adaptersToRecover) {
+      // Invoke all adapters that were running when the adapter container was 
stopped
+      try {
+        if (adapterDescription.isRunning()) {
+          LOG.debug("Start recovering adapter {} ", 
adapterDescription.getElementId());
+          
this.healthCheckData.resourceProvider().adapterMasterManagement().startStreamAdapter(adapterDescription.getElementId());
+          LOG.info("Adapter {} is recovered", 
adapterDescription.getElementId());
+
+        }
+      } catch (AdapterException e) {
+        LOG.warn("Could not start adapter {} ({})", 
adapterDescription.getName(), e.getMessage());
+      } catch (Exception e) {
+        LOG.error(
+            "Unexpected error while recovering adapter {} ({})",
+            adapterDescription.getName(),
+            e.getMessage()
+        );
+      }
+    }
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
new file mode 100644
index 0000000000..256c1cc57b
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+public class ExtensionHealthCheck implements Runnable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExtensionHealthCheck.class);
+
+  private final ResourceProvider resourceProvider;
+
+  public ExtensionHealthCheck(ResourceProvider resourceProvider) {
+    this.resourceProvider = resourceProvider;
+  }
+
+  @Override
+  public void run() {
+    try {
+      var activeResources = resourceProvider.loadActiveResources();
+      var activeCoreInstances = 
resourceProvider.loadActiveInstances(activeResources);
+
+      var activeExtensionInstances = new HashMap<String, 
ExtensionInstanceHealth>();
+      activeCoreInstances.keySet().forEach(k -> {
+        activeExtensionInstances.put(k, new 
ExtensionInstanceAvailabilityCheck(k).checkRunningInstances());
+      });
+
+      var healthCheckData = new HealthCheckData(resourceProvider, 
activeResources, activeCoreInstances, activeExtensionInstances);
+      new PipelineHealthCheck(healthCheckData).runCheck();
+      new AdapterHealthCheck(healthCheckData).runCheck();
+    } catch (Exception e) {
+      LOG.warn("An unhandled error occurred while running health check.", e);
+    }
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
new file mode 100644
index 0000000000..46236d90da
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+
+public class ExtensionInstanceAvailabilityCheck {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExtensionInstanceAvailabilityCheck.class);
+  private static final String InstancePath = "/health";
+
+  private final String serviceBaseUrl;
+
+  public ExtensionInstanceAvailabilityCheck(String serviceBaseUrl) {
+    this.serviceBaseUrl = serviceBaseUrl;
+  }
+
+  public ExtensionInstanceHealth checkRunningInstances() {
+    try {
+      var request = 
ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl());
+      var response = request.execute().returnResponse();
+      if (response.getStatusLine().getStatusCode() != 200) {
+        return new ExtensionInstanceHealth(Set.of(), Set.of());
+      }
+      String body = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+      return deserialize(body);
+
+    } catch (IOException e) {
+      LOG.error("Extension service {} is unavailable", serviceBaseUrl);
+      return new ExtensionInstanceHealth(Set.of(), Set.of());
+    }
+  }
+
+  private ExtensionInstanceHealth deserialize(String json) throws 
JsonProcessingException {
+    return JacksonSerializer.getObjectMapper().readValue(json, 
ExtensionInstanceHealth.class);
+  }
+
+  private String makeRequestUrl() {
+    return serviceBaseUrl + InstancePath;
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
new file mode 100644
index 0000000000..5447f7044b
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.health.monitoring.utils.HealthCheckUtils;
+import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
+import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
+import org.apache.streampipes.manager.util.PipelineElementUtils;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
+import org.apache.streampipes.resource.management.secret.SecretDecrypter;
+import org.apache.streampipes.resource.management.secret.SecretEncrypter;
+import org.apache.streampipes.resource.management.secret.SecretService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
+import static 
org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;
+
+public class PipelineHealthCheck {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PipelineHealthCheck.class);
+  private static final int MAX_FAILED_ATTEMPTS = 10;
+
+  private static final Map<String, Integer> failedRestartAttempts = new 
HashMap<>();
+  private static final PipelinesStats pipelinesStats = new PipelinesStats();
+
+  private final HealthCheckData healthCheckData;
+
+  public PipelineHealthCheck(HealthCheckData healthCheckData) {
+    this.healthCheckData = healthCheckData;
+  }
+
+  public void runCheck() {
+    try {
+      initPipelineMetrics();
+
+      if (!healthCheckData.activeResources().runningPipelines().isEmpty()) {
+        checkAndRestorePipelineElements();
+      }
+      pipelinesStats.metrics();
+    } catch (Exception e) {
+      LOG.error("Error while checking and restoring pipeline elements", e);
+    }
+  }
+
+  private void initPipelineMetrics() {
+    pipelinesStats.clear();
+    
pipelinesStats.setAllPipelines(healthCheckData.activeResources().allPipelines().size());
+    
pipelinesStats.setRunningPipelines(healthCheckData.activeResources().runningPipelines().size());
+    pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines()
+        - pipelinesStats.getRunningPipelines());
+
+    for (Pipeline p : healthCheckData.activeResources().allPipelines()) {
+      pipelinesStats.updatePipelineHealthState(
+          p.getElementId(),
+          p.getName(),
+          p.getHealthStatus().toString());
+
+      pipelinesStats.updatePipelineRunningState(
+          p.getElementId(),
+          p.getName(),
+          p.isRunning());
+    }
+  }
+
+  private void checkAndRestorePipelineElements() {
+    healthCheckData.activeResources().runningPipelines().forEach(pipeline -> {
+      AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
+      List<String> failedInstances = new ArrayList<>();
+      List<String> recoveredInstances = new ArrayList<>();
+      List<String> pipelineNotifications = new ArrayList<>();
+      List<InvocableStreamPipesEntity> runningPipelineElements = Stream.concat(
+          pipeline.getSepas().stream(),
+          pipeline.getActions().stream()
+      ).toList();
+
+      runningPipelineElements.forEach(pipelineElement -> {
+        String instanceId = 
HealthCheckUtils.extractInstanceId(pipelineElement);
+        if (isNowhereRunning(instanceId)) {
+          if (shouldRetry(instanceId)) {
+            var serviceBaseUrl = pipelineElement.getSelectedEndpointUrl();
+            shouldUpdatePipeline.set(true);
+            boolean success;
+            try {
+              serviceBaseUrl = new 
ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
+                  pipelineElement.getAppId(),
+                  
ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement.getAppId()),
+                  Collections.emptySet()
+              );
+              new SecretService(new SecretDecrypter()).apply(pipelineElement);
+              var invocationUrl = getInvocationUrl(pipelineElement, 
serviceBaseUrl);
+              success = new InvokeHttpRequest()
+                  .execute(pipelineElement, invocationUrl, 
pipeline.getPipelineId()).isSuccess();
+              new SecretService(new SecretEncrypter()).apply(pipelineElement);
+            } catch (NoServiceEndpointsAvailableException e) {
+              success = false;
+            }
+            if (!success) {
+              failedInstances.add(instanceId);
+              
HealthCheckUtils.addFailedAttemptNotification(pipelineNotifications, 
pipelineElement);
+              increaseFailedAttempt(instanceId);
+              LOG.info("Could not restore pipeline element {} of pipeline {} 
({}/{})",
+                  pipelineElement.getName(), pipeline.getName(), 
failedRestartAttempts.get(instanceId),
+                  MAX_FAILED_ATTEMPTS);
+            } else {
+              recoveredInstances.add(instanceId);
+              
HealthCheckUtils.addSuccessfulRestoreNotification(pipelineNotifications, 
pipelineElement);
+              resetFailedAttempts(instanceId);
+              pipelineElement.setSelectedEndpointUrl(serviceBaseUrl);
+              LOG.info("Successfully restored pipeline element {} of pipeline 
{}",
+                  pipelineElement.getName(), pipeline.getName());
+            }
+          }
+        }
+      });
+      if (shouldUpdatePipeline.get()) {
+        var currentPipeline = getPipeline(pipeline.getPipelineId());
+        if (!failedInstances.isEmpty()) {
+          currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
+          pipelinesStats.failedIncrease();
+        } else if (!recoveredInstances.isEmpty()) {
+          
currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+          pipelinesStats.attentionRequiredIncrease();
+        }
+        currentPipeline.setSepas(
+            PipelineElementUtils.filterInvocation(runningPipelineElements, 
DataProcessorInvocation.class)
+        );
+        currentPipeline.setActions(
+            PipelineElementUtils.filterInvocation(runningPipelineElements, 
DataSinkInvocation.class)
+        );
+        currentPipeline.setPipelineNotifications(pipelineNotifications);
+        
healthCheckData.resourceProvider().pipelineStorage().updateElement(currentPipeline);
+        
pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(), 
currentPipeline.getName(),
+            currentPipeline.getHealthStatus().toString());
+      }
+    });
+    int healthNum = pipelinesStats.getRunningPipelines() - 
pipelinesStats.getFailedPipelines()
+        - pipelinesStats.getAttentionRequiredPipelines();
+    pipelinesStats.setHealthyPipelines(healthNum);
+    
pipelinesStats.setElementCount(getElementsCount(healthCheckData.activeResources().allPipelines()));
+  }
+
+  private boolean isNowhereRunning(String instanceId) {
+    return (healthCheckData.activeExtensionInstances().entrySet().stream()
+        .noneMatch(entry -> 
entry.getValue().runningPipelineElementInstanceIds().contains(instanceId)));
+  }
+
+  private boolean shouldRetry(String instanceId) {
+    if (!failedRestartAttempts.containsKey(instanceId)) {
+      return true;
+    } else {
+      return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS;
+    }
+  }
+
+  private void resetFailedAttempts(String instanceId) {
+    failedRestartAttempts.put(instanceId, 0);
+  }
+
+  private void increaseFailedAttempt(String instanceId) {
+    if (!failedRestartAttempts.containsKey(instanceId)) {
+      failedRestartAttempts.put(instanceId, 1);
+    } else {
+      Integer currentAttempt = failedRestartAttempts.get(instanceId) + 1;
+      failedRestartAttempts.put(instanceId, currentAttempt);
+    }
+  }
+
+  private int getElementsCount(List<Pipeline> allPipelines) {
+    return allPipelines.stream().mapToInt(pipeline -> 
pipeline.getActions().size()).sum();
+  }
+
+  private String getInvocationUrl(InvocableStreamPipesEntity pipelineElement,
+                                  String baseUrl) {
+    return ExtensionsServiceEndpointUtils
+        .getPipelineElementType(pipelineElement)
+        .getInvocationUrl(baseUrl, pipelineElement.getAppId());
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PostStartupRecovery.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PostStartupRecovery.java
new file mode 100644
index 0000000000..73c918e83a
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PostStartupRecovery.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.connect.management.health.AdapterOperationLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class PostStartupRecovery {
+
+  private final ExtensionHealthCheck extensionHealthCheck;
+
+  public PostStartupRecovery(ExtensionHealthCheck extensionHealthCheck) {
+    this.extensionHealthCheck = extensionHealthCheck;
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PostStartupRecovery.class);
+
+  private static final int MAX_RETRIES = 7;
+
+  public void checkAndRestore(int retryCount) {
+    if (AdapterOperationLock.INSTANCE.isLocked()) {
+      LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 
1), MAX_RETRIES);
+      if (retryCount <= MAX_RETRIES) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(3000);
+          retryCount++;
+          checkAndRestore(retryCount);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      } else {
+        LOG.info("Max retries for running adapter operations reached, will do 
unlock which might cause conflicts...");
+        AdapterOperationLock.INSTANCE.unlock();
+        this.extensionHealthCheck.run();
+      }
+    } else {
+      AdapterOperationLock.INSTANCE.lock();
+      this.extensionHealthCheck.run();
+      AdapterOperationLock.INSTANCE.unlock();
+    }
+  }
+}
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
new file mode 100644
index 0000000000..9136778e09
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
+import org.apache.streampipes.health.monitoring.model.ActiveCoreInstances;
+import org.apache.streampipes.health.monitoring.model.ActiveResources;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public record ResourceProvider(IPipelineStorage pipelineStorage,
+                               IAdapterStorage adapterInstanceStorage,
+                               AdapterMasterManagement 
adapterMasterManagement) {
+
+  public ActiveResources loadActiveResources() {
+    var allPipelines = pipelineStorage.findAll();
+    var runningPipelines = 
allPipelines.stream().filter(Pipeline::isRunning).toList();
+    var allAdapters = adapterInstanceStorage.findAll();
+    var runningAdapters = 
allAdapters.stream().filter(AdapterDescription::isRunning).toList();
+
+    return new ActiveResources(
+        allPipelines,
+        runningPipelines,
+        allAdapters,
+        runningAdapters
+    );
+  }
+
+  public Map<String, ActiveCoreInstances> loadActiveInstances(ActiveResources 
activeResources) {
+    Map<String, List<AdapterDescription>> adaptersByUrl =
+        activeResources.runningAdapters()
+            .stream()
+            .filter(a -> a.getSelectedEndpointUrl() != null)
+            
.collect(Collectors.groupingBy(AdapterDescription::getSelectedEndpointUrl));
+
+    Map<String, Map<String, InvocableStreamPipesEntity>> elementsByUrl =
+        activeResources.runningPipelines()
+            .stream()
+            .flatMap(p -> {
+              String pipelineId = p.getPipelineId();
+              return Optional.of(
+                      Stream.concat(
+                          p.getSepas().stream(),
+                          p.getActions().stream()
+                      ).toList()
+                  )
+                  .orElseGet(List::of)
+                  .stream()
+                  .filter(Objects::nonNull)
+                  .map(e -> new AbstractMap.SimpleEntry<>(pipelineId, e));
+            })
+            .filter(entry -> entry.getValue().getSelectedEndpointUrl() != null)
+            .collect(Collectors.groupingBy(
+                entry -> entry.getValue().getSelectedEndpointUrl(),
+                Collectors.toMap(
+                    Map.Entry::getKey,
+                    AbstractMap.SimpleEntry::getValue,
+                    (left, right) -> left)
+            ));
+
+    Set<String> allUrls = new HashSet<>();
+    allUrls.addAll(adaptersByUrl.keySet());
+    allUrls.addAll(elementsByUrl.keySet());
+
+    Map<String, ActiveCoreInstances> result = new HashMap<>();
+    for (String url : allUrls) {
+      List<AdapterDescription> adapters = adaptersByUrl.getOrDefault(url, 
List.of());
+      Map<String, InvocableStreamPipesEntity> elementsPerPipeline = 
elementsByUrl.getOrDefault(url, Map.of());
+      result.put(url, new ActiveCoreInstances(adapters, elementsPerPipeline));
+    }
+
+    return result;
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
similarity index 83%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
rename to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
index 949b65fa0a..e252efd755 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.health;
+package org.apache.streampipes.health.monitoring;
 
 
 import org.apache.streampipes.commons.environment.Environment;
@@ -24,7 +24,7 @@ import org.apache.streampipes.loadbalance.LoadManager;
 import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
-import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.storage.api.CRUDStorage;
 
 import org.apache.http.HttpStatus;
 import org.slf4j.Logger;
@@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class ServiceHealthCheck implements Runnable {
 
@@ -45,8 +43,7 @@ public class ServiceHealthCheck implements Runnable {
 
   private final List<SpServiceRegistration> needDeletedServices = new 
ArrayList<>();
 
-  public ServiceHealthCheck() {
-    var storage = 
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage();
+  public ServiceHealthCheck(CRUDStorage<SpServiceRegistration> storage) {
     this.serviceRegistrationManager = new ServiceRegistrationManager(storage);
     this.maxUnhealthyDurationBeforeRemovalMs = Environments.getEnvironment()
         .getUnhealthyTimeBeforeServiceDeletionInMillis().getValueOrDefault();
@@ -68,10 +65,8 @@ public class ServiceHealthCheck implements Runnable {
     } finally {
       needDeletedServices.clear();
     }
-    new PipelineHealthCheck().run();
   }
 
-
   private void checkServiceHealth(SpServiceRegistration service) {
     String healthCheckUrl = makeHealthCheckUrl(service);
 
@@ -117,15 +112,4 @@ public class ServiceHealthCheck implements Runnable {
   private List<SpServiceRegistration> getRegisteredServices() {
     return serviceRegistrationManager.getAllServices();
   }
-
-  public static List<SpServiceRegistration> getService(String tag,
-                                                       
List<SpServiceRegistration> activeServices) {
-    return activeServices.stream().filter(service -> filtersSupported(service, 
tag))
-        .filter(service -> service.getStatus() != SpServiceStatus.HEALTHY)
-        .collect(Collectors.toList());
-  }
-
-  private static boolean filtersSupported(SpServiceRegistration service, 
String tag) {
-    return new HashSet<>(service.getTags()).stream().anyMatch(t -> 
t.asString().equals(tag));
-  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
similarity index 95%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
rename to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
index 3bbee91f74..e3fb6271e1 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.health;
+package org.apache.streampipes.health.monitoring;
 
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
@@ -81,10 +81,6 @@ public class ServiceRegistrationManager {
              serviceRegistration.getSvcId());
   }
 
-  public SpServiceStatus getServiceStatus(String serviceId) {
-    return storage.getElementById(serviceId).getStatus();
-  }
-
   private void logService(SpServiceRegistration serviceRegistration) {
     LOG.info("Service {} (id={}) is now in {} state", 
serviceRegistration.getSvcGroup(),
              serviceRegistration.getSvcId(), serviceRegistration.getStatus());
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
similarity index 75%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
copy to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
index ba1734b844..272ed4a0c4 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
@@ -16,16 +16,14 @@
  *
  */
 
-package org.apache.streampipes.manager.storage;
+package org.apache.streampipes.health.monitoring.model;
 
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class RunningPipelineElementStorage {
-
-  public static Map<String, List<InvocableStreamPipesEntity>> 
runningProcessorsAndSinks = new HashMap<>();
-
+public record ActiveCoreInstances(List<AdapterDescription> adapters,
+                                  Map<String, InvocableStreamPipesEntity> 
elementsPerPipeline) {
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
similarity index 65%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
copy to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
index ba1734b844..862ef6d64a 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
@@ -16,16 +16,15 @@
  *
  */
 
-package org.apache.streampipes.manager.storage;
+package org.apache.streampipes.health.monitoring.model;
 
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.pipeline.Pipeline;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
-public class RunningPipelineElementStorage {
-
-  public static Map<String, List<InvocableStreamPipesEntity>> 
runningProcessorsAndSinks = new HashMap<>();
 
+public record ActiveResources(List<Pipeline> allPipelines,
+                              List<Pipeline> runningPipelines,
+                              List<AdapterDescription> allAdapters,
+                              List<AdapterDescription> runningAdapters) {
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
similarity index 62%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
rename to 
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
index ba1734b844..6a0744d10f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
@@ -16,16 +16,15 @@
  *
  */
 
-package org.apache.streampipes.manager.storage;
+package org.apache.streampipes.health.monitoring.model;
 
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
 
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-public class RunningPipelineElementStorage {
-
-  public static Map<String, List<InvocableStreamPipesEntity>> 
runningProcessorsAndSinks = new HashMap<>();
-
+public record HealthCheckData(ResourceProvider resourceProvider,
+                              ActiveResources activeResources,
+                              Map<String, ActiveCoreInstances> 
activeCoreInstances,
+                              Map<String, ExtensionInstanceHealth> 
activeExtensionInstances) {
 }
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
new file mode 100644
index 0000000000..fdec9191bc
--- /dev/null
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring.utils;
+
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+public class HealthCheckUtils {
+
+  public static void addSuccessfulRestoreNotification(List<String> 
pipelineNotifications,
+                                                      
InvocableStreamPipesEntity pipelineElement) {
+    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
pipelineElement.getName()
+        + "' was not available and was successfully restored.");
+  }
+
+  public static void addFailedAttemptNotification(List<String> 
pipelineNotifications,
+                                                  InvocableStreamPipesEntity 
pipelineElement) {
+    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
pipelineElement.getName()
+        + "' was not available and could not be restored.");
+  }
+
+  private static String getCurrentDatetime() {
+    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");
+    LocalDateTime now = LocalDateTime.now();
+    return "[" + dtf.format(now) + "] ";
+  }
+
+  public static String extractInstanceId(InvocableStreamPipesEntity 
pipelineElement) {
+    return InstanceIdExtractor.extractId(pipelineElement.getElementId());
+  }
+
+
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
similarity index 69%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
rename to 
streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
index 08c37c6742..2adf8a0839 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
@@ -16,15 +16,10 @@
  *
  */
 
-package org.apache.streampipes.manager.execution.provider;
+package org.apache.streampipes.model.health;
 
-import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-
-import java.util.List;
-
-public interface PipelineElementProvider {
-
-  List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo 
executionInfo);
+import java.util.Set;
 
+public record ExtensionInstanceHealth(Set<String> runningAdapterInstanceIds,
+                                      Set<String> 
runningPipelineElementInstanceIds) {
 }
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
index 5cf7cba8dd..028982f953 100644
--- 
a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
@@ -30,10 +30,6 @@ import java.util.Map;
 
 public class PropertyUtils {
 
-  public static Map<String, Object> getRuntimeFormat(EventProperty 
eventProperty) {
-    return getUntypedRuntimeFormat(eventProperty);
-  }
-
   public static Map<String, Object> getUntypedRuntimeFormat(EventProperty ep) {
     if (ep instanceof EventPropertyPrimitive) {
       Map<String, Object> result = new HashMap<>();
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
index 70bd2e8cef..e4aca6b6c0 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
@@ -35,7 +35,6 @@ public class ExtensionServiceExecutions {
         .socketTimeout(10000);
   }
 
-
   private static String getServiceAdminSid() {
     return new 
SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId();
   }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
index 6451131d65..49d3ff72d4 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
@@ -67,7 +67,7 @@ public class PipelineExecutionInfo {
     return processorsAndSinks;
   }
 
-  public void applyPipelineOperationStatus(PipelineOperationStatus status) {
+  public void setPipelineOperationStatus(PipelineOperationStatus status) {
     this.pipelineOperationStatus = status;
   }
 
@@ -80,6 +80,6 @@ public class PipelineExecutionInfo {
   }
 
   public boolean isOperationSuccessful() {
-    return failedServices.size() == 0 && pipelineOperationStatus.isSuccess();
+    return failedServices.isEmpty() && pipelineOperationStatus.isSuccess();
   }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
index 3fab1d07e6..48a7ba2969 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
@@ -20,8 +20,6 @@ package org.apache.streampipes.manager.execution;
 
 import 
org.apache.streampipes.manager.execution.http.DetachPipelineElementSubmitter;
 import 
org.apache.streampipes.manager.execution.http.InvokePipelineElementSubmitter;
-import 
org.apache.streampipes.manager.execution.provider.CurrentPipelineElementProvider;
-import 
org.apache.streampipes.manager.execution.provider.StoredPipelineElementProvider;
 import org.apache.streampipes.manager.execution.task.AfterInvocationTask;
 import org.apache.streampipes.manager.execution.task.DiscoverEndpointsTask;
 import org.apache.streampipes.manager.execution.task.PipelineExecutionTask;
@@ -42,7 +40,7 @@ public class PipelineExecutionTaskFactory {
         new UpdateGroupIdTask(),
         new SecretEncryptionTask(SecretProvider.getDecryptionService()),
         new DiscoverEndpointsTask(),
-        new SubmitRequestTask(new InvokePipelineElementSubmitter(pipeline), 
new CurrentPipelineElementProvider()),
+        new SubmitRequestTask(new InvokePipelineElementSubmitter(pipeline)),
         new SecretEncryptionTask(SecretProvider.getEncryptionService()),
         new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STARTED),
         new StorePipelineStatusTask(true, false)
@@ -52,7 +50,7 @@ public class PipelineExecutionTaskFactory {
   public static List<PipelineExecutionTask> makeStopPipelineTasks(Pipeline 
pipeline,
                                                                   boolean 
forceStop) {
     return List.of(
-        new SubmitRequestTask(new DetachPipelineElementSubmitter(pipeline), 
new StoredPipelineElementProvider()),
+        new SubmitRequestTask(new DetachPipelineElementSubmitter(pipeline)),
         new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STOPPED),
         new StorePipelineStatusTask(false, forceStop)
     );
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
index 1a59bc8888..337e19c35d 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
@@ -43,20 +43,23 @@ public class ExtensionsServiceEndpointGenerator implements 
IExtensionsServiceEnd
 
   public ExtensionsServiceEndpointGenerator() {}
 
-  public String getEndpointResourceUrl(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  public String getEndpointResourceUrl(String appId,
+                                       SpServiceUrlProvider 
spServiceUrlProvider,
                                        Set<SpServiceTag> customServiceTags)
       throws NoServiceEndpointsAvailableException {
     return spServiceUrlProvider
         .getInvocationUrl(selectService(appId, spServiceUrlProvider, 
customServiceTags), appId);
   }
 
-  public String getEndpointBaseUrl(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  public String getEndpointBaseUrl(String appId,
+                                   SpServiceUrlProvider spServiceUrlProvider,
                                    Set<SpServiceTag> customServiceTags)
       throws NoServiceEndpointsAvailableException {
     return selectService(appId, spServiceUrlProvider, customServiceTags);
   }
 
-  private String selectService(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  private String selectService(String appId,
+                               SpServiceUrlProvider spServiceUrlProvider,
                                Set<SpServiceTag> customServiceTags)
       throws NoServiceEndpointsAvailableException {
     Environment env = Environments.getEnvironment();
@@ -83,7 +86,8 @@ public class ExtensionsServiceEndpointGenerator implements 
IExtensionsServiceEnd
         "Could not find any matching service endpoints - are all software 
components running?");
   }
 
-  private List<String> getServiceEndpoints(String appId, SpServiceUrlProvider 
spServiceUrlProvider,
+  private List<String> getServiceEndpoints(String appId,
+                                           SpServiceUrlProvider 
spServiceUrlProvider,
                                            Set<SpServiceTag> 
customServiceTags) {
     return SpServiceDiscovery.getServiceDiscovery()
         .getServiceEndpoints(DefaultSpServiceTypes.EXT, true,
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
similarity index 71%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
rename to 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
index 14937b8e56..7462e6f4d9 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
+import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
@@ -26,14 +26,14 @@ import 
org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
 
-public abstract class PipelineElementSubmitter {
+public abstract class BasePipelineElementSubmitter {
 
   protected final String pipelineId;
   protected final String pipelineName;
 
   protected final PipelineOperationStatus status;
 
-  public PipelineElementSubmitter(Pipeline pipeline) {
+  public BasePipelineElementSubmitter(Pipeline pipeline) {
     this.pipelineId = pipeline.getPipelineId();
     this.pipelineName = pipeline.getName();
     this.status = new PipelineOperationStatus(pipelineId, pipelineName);
@@ -41,7 +41,10 @@ public abstract class PipelineElementSubmitter {
 
   public PipelineOperationStatus submit(List<InvocableStreamPipesEntity> 
processorsAndSinks) {
     // First, try handling all data processors and sinks
-    processorsAndSinks.forEach(g -> 
status.addPipelineElementStatus(submitElement(g)));
+    processorsAndSinks.forEach(g -> {
+      var response = submitElement(g);
+      status.addPipelineElementStatus(response);
+    });
 
     applySuccess(processorsAndSinks);
     return status;
@@ -60,12 +63,18 @@ public abstract class PipelineElementSubmitter {
     }
   }
 
-  protected PipelineElementStatus performDetach(EndpointSelectable 
pipelineElement) {
-    String endpointUrl = pipelineElement.getSelectedEndpointUrl() + 
pipelineElement.getDetachPath();
+  protected String getInvocationUrl(InvocableStreamPipesEntity 
pipelineElement) {
+    return ExtensionsServiceEndpointUtils
+        .getPipelineElementType(pipelineElement)
+        .getInvocationUrl(pipelineElement.getSelectedEndpointUrl(), 
pipelineElement.getAppId());
+  }
+
+  protected PipelineElementStatus performDetach(InvocableStreamPipesEntity 
pipelineElement) {
+    String endpointUrl = getInvocationUrl(pipelineElement) + 
pipelineElement.getDetachPath();
     return new DetachHttpRequest().execute(pipelineElement, endpointUrl, 
this.pipelineId);
   }
 
-  protected abstract PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement);
+  protected abstract PipelineElementStatus 
submitElement(InvocableStreamPipesEntity pipelineElement);
 
   protected abstract void onSuccess();
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
index 16b8772ca4..816a043c2f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
@@ -18,21 +18,20 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 
 import java.util.List;
 
-public class DetachPipelineElementSubmitter extends PipelineElementSubmitter {
+public class DetachPipelineElementSubmitter extends 
BasePipelineElementSubmitter {
 
   public DetachPipelineElementSubmitter(Pipeline pipeline) {
     super(pipeline);
   }
 
   @Override
-  protected PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement) {
+  protected PipelineElementStatus submitElement(InvocableStreamPipesEntity 
pipelineElement) {
     return performDetach(pipelineElement);
   }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
index 986bfe6ead..238214a20f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Optional;
 
-public class InvokePipelineElementSubmitter extends PipelineElementSubmitter {
+public class InvokePipelineElementSubmitter extends 
BasePipelineElementSubmitter {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InvokePipelineElementSubmitter.class);
 
@@ -39,9 +38,9 @@ public class InvokePipelineElementSubmitter extends 
PipelineElementSubmitter {
   }
 
   @Override
-  protected PipelineElementStatus submitElement(EndpointSelectable 
pipelineElement) {
-    String endpointUrl = pipelineElement.getSelectedEndpointUrl();
-    return new InvokeHttpRequest().execute(pipelineElement, endpointUrl, 
this.pipelineId);
+  protected PipelineElementStatus submitElement(InvocableStreamPipesEntity 
pipelineElement) {
+    var invocationUrl = getInvocationUrl(pipelineElement);
+    return new InvokeHttpRequest().execute(pipelineElement, invocationUrl, 
this.pipelineId);
   }
 
   @Override
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
deleted file mode 100644
index d3cdda7f98..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.provider;
-
-import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides pipeline elements from the cache (for stop actions)
- */
-
-public class StoredPipelineElementProvider implements PipelineElementProvider {
-  @Override
-  public List<InvocableStreamPipesEntity> 
getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
-    if 
(RunningPipelineElementStorage.runningProcessorsAndSinks.containsKey(executionInfo.getPipelineId()))
 {
-      return 
RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
-    } else {
-      return new ArrayList<>();
-    }
-  }
-
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
index 4bd6c2279b..690e09ebde 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
@@ -20,8 +20,10 @@ package org.apache.streampipes.manager.execution.task;
 
 import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
+import org.apache.streampipes.manager.util.PipelineElementUtils;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.message.PipelineStatusMessage;
 import org.apache.streampipes.model.message.PipelineStatusMessageType;
 import org.apache.streampipes.model.pipeline.Pipeline;
@@ -45,13 +47,14 @@ public class AfterInvocationTask implements 
PipelineExecutionTask {
   public void executeTask(Pipeline pipeline,
                           PipelineExecutionInfo executionInfo) {
     var graphs = executionInfo.getProcessorsAndSinks();
-    storeInvocationGraphs(pipeline.getPipelineId(), graphs);
+    storeInvocationGraphs(pipeline, graphs);
     addPipelineStatus(pipeline);
   }
 
-  private void storeInvocationGraphs(String pipelineId,
-                                     List<InvocableStreamPipesEntity> graphs) {
-    RunningPipelineElementStorage.runningProcessorsAndSinks.put(pipelineId, 
graphs);
+  private void storeInvocationGraphs(Pipeline pipeline,
+                                     List<InvocableStreamPipesEntity> 
pipelineElements) {
+    
pipeline.setActions(PipelineElementUtils.filterInvocation(pipelineElements, 
DataSinkInvocation.class));
+    pipeline.setSepas(PipelineElementUtils.filterInvocation(pipelineElements, 
DataProcessorInvocation.class));
   }
 
   private void addPipelineStatus(Pipeline pipeline) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
index 78ca9ee3ce..a3b6cb5141 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
@@ -18,20 +18,17 @@
 
 package org.apache.streampipes.manager.execution.task;
 
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.loadbalance.LoadManager;
 import org.apache.streampipes.loadbalance.unit.PipelineElementPartitioner;
 import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
-import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
 import org.apache.streampipes.model.api.EndpointSelectable;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class DiscoverEndpointsTask implements PipelineExecutionTask {
@@ -40,14 +37,13 @@ public class DiscoverEndpointsTask implements 
PipelineExecutionTask {
                           PipelineExecutionInfo executionInfo) {
     for (PipelineElementPartitioner.ResourceUnitWithServices unit : 
PipelineElementPartitioner.partitionPipeline(pipeline).getResourceUnits()){
       SpServiceRegistration registration = 
LoadManager.allocation(unit.getCompatibleServices(), pipeline.getLabels());
-      String url = registration.getServiceUrl();
-      for (InvocableStreamPipesEntity el : 
unit.getResourceUnit().getElements()) {
-        try {
-          var endpointUrl = getSelectedEndpoint(el, url);
-          applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointUrl);
-        } catch (NoServiceEndpointsAvailableException en) {
-          executionInfo.addFailedPipelineElement(el);
-        }
+      if (Objects.nonNull(registration)) {
+        String endpointBaseUrl = registration.getServiceUrl();
+        unit.getResourceUnit().getElements().forEach(el -> {
+          applyEndpointAndPipeline(pipeline.getPipelineId(), el, 
endpointBaseUrl);
+        });
+      } else {
+        
unit.getResourceUnit().getElements().forEach(executionInfo::addFailedPipelineElement);
       }
     }
 
@@ -62,7 +58,7 @@ public class DiscoverEndpointsTask implements 
PipelineExecutionTask {
           pipeline.getName(),
           "Could not start pipeline " + pipeline.getName() + ".",
           pe);
-      executionInfo.applyPipelineOperationStatus(status);
+      executionInfo.setPipelineOperationStatus(status);
     }
   }
 
@@ -72,19 +68,4 @@ public class DiscoverEndpointsTask implements 
PipelineExecutionTask {
     pipelineElement.setSelectedEndpointUrl(endpointUrl);
     pipelineElement.setCorrespondingPipeline(pipelineId);
   }
-
-  private String findSelectedEndpoint(InvocableStreamPipesEntity 
pipelineElement)
-      throws NoServiceEndpointsAvailableException {
-    return new ExtensionsServiceEndpointGenerator()
-        .getEndpointResourceUrl(
-            pipelineElement.getAppId(),
-            
ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement)
-        );
-  }
-  private String getSelectedEndpoint(InvocableStreamPipesEntity 
pipelineElement, String url)
-          throws NoServiceEndpointsAvailableException {
-    return ExtensionsServiceEndpointUtils
-            .getPipelineElementType(pipelineElement)
-            .getInvocationUrl(url,pipelineElement.getAppId());
-  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
index 4805fe9b47..c78747f1f3 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
@@ -19,32 +19,28 @@
 package org.apache.streampipes.manager.execution.task;
 
 import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import org.apache.streampipes.manager.execution.http.PipelineElementSubmitter;
-import 
org.apache.streampipes.manager.execution.provider.PipelineElementProvider;
+import 
org.apache.streampipes.manager.execution.http.BasePipelineElementSubmitter;
 import org.apache.streampipes.model.pipeline.Pipeline;
 
 public class SubmitRequestTask implements PipelineExecutionTask {
 
-  private final PipelineElementProvider elementProvider;
-  private final PipelineElementSubmitter submitter;
+  private final BasePipelineElementSubmitter submitter;
 
-  public SubmitRequestTask(PipelineElementSubmitter submitter,
-                           PipelineElementProvider elementProvider) {
-    this.elementProvider = elementProvider;
+  public SubmitRequestTask(BasePipelineElementSubmitter submitter) {
     this.submitter = submitter;
   }
 
   @Override
   public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
-    return executionInfo.getFailedServices().size() == 0;
+    return executionInfo.getFailedServices().isEmpty();
   }
 
   @Override
   public void executeTask(Pipeline pipeline, PipelineExecutionInfo 
executionInfo) {
-    var processorsAndSinks = 
elementProvider.getProcessorsAndSinks(executionInfo);
+    var processorsAndSinks = executionInfo.getProcessorsAndSinks();
 
     var status = submitter.submit(processorsAndSinks);
 
-    executionInfo.applyPipelineOperationStatus(status);
+    executionInfo.setPipelineOperationStatus(status);
   }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
deleted file mode 100644
index 9bec880a89..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.health;
-
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-public class PipelineElementEndpointHealthCheck {
-
-  private static final String InstancePath = "/instances";
-
-  private final String endpointUrl;
-
-  public PipelineElementEndpointHealthCheck(String endpointUrl) {
-    this.endpointUrl = endpointUrl;
-  }
-
-  public List<String> checkRunningInstances() throws IOException {
-    var request = 
ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl());
-    return asList(request.execute().returnContent().toString());
-  }
-
-  private List<String> asList(String json) throws JsonProcessingException {
-    return Arrays.asList(JacksonSerializer.getObjectMapper().readValue(json, 
String[].class));
-  }
-
-  private String makeRequestUrl() {
-    return endpointUrl + InstancePath;
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
deleted file mode 100644
index 707b6cce09..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.health;
-
-import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
-import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
-import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static 
org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;
-
-public class PipelineHealthCheck implements Runnable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(PipelineHealthCheck.class);
-  private static final int MAX_FAILED_ATTEMPTS = 10;
-
-  private static final Map<String, Integer> failedRestartAttempts = new 
HashMap<>();
-
-  private static final PipelinesStats pipelinesStats = new PipelinesStats();
-
-  public PipelineHealthCheck() {
-
-  }
-
-  public void checkAndRestorePipelineElements() {
-    List<Pipeline> allPipelines = getAllPipelines();
-    List<Pipeline> runningPipelines = getRunningPipelines(allPipelines);
-
-    pipelinesStats.clear();
-    pipelinesStats.setAllPipelines(allPipelines.size());
-    pipelinesStats.setRunningPipelines(runningPipelines.size());
-    pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines()
-        - pipelinesStats.getRunningPipelines());
-
-    for (Pipeline p : allPipelines) {
-      pipelinesStats.updatePipelineHealthState(
-          p.getElementId(),
-          p.getName(),
-          p.getHealthStatus().toString());
-
-      pipelinesStats.updatePipelineRunningState(
-          p.getElementId(),
-          p.getName(),
-          p.isRunning());
-    }
-
-    if (!runningPipelines.isEmpty()) {
-      Map<String, List<InvocableStreamPipesEntity>> endpointMap = 
generateEndpointMap();
-      List<String> allRunningInstances = 
findRunningInstances(endpointMap.keySet());
-
-      runningPipelines.forEach(pipeline -> {
-        AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
-        List<String> failedInstances = new ArrayList<>();
-        List<String> recoveredInstances = new ArrayList<>();
-        List<String> pipelineNotifications = new ArrayList<>();
-        List<InvocableStreamPipesEntity> graphs = 
RunningPipelineElementStorage.runningProcessorsAndSinks
-            .get(pipeline.getPipelineId());
-
-        if (Objects.nonNull(graphs)) {
-          graphs.forEach(graph -> {
-            String instanceId = extractInstanceId(graph);
-            if (allRunningInstances.stream()
-                .noneMatch(runningInstanceId -> 
runningInstanceId.equals(instanceId))) {
-              if (shouldRetry(instanceId)) {
-                String endpointUrl = graph.getSelectedEndpointUrl();
-                shouldUpdatePipeline.set(true);
-                boolean success;
-                try {
-                  endpointUrl = findEndpointUrl(graph);
-                  success = new InvokeHttpRequest()
-                      .execute(graph, endpointUrl, 
pipeline.getPipelineId()).isSuccess();
-                } catch (NoServiceEndpointsAvailableException e) {
-                  success = false;
-                }
-                if (!success) {
-                  failedInstances.add(instanceId);
-                  addFailedAttemptNotification(pipelineNotifications, graph);
-                  increaseFailedAttempt(instanceId);
-                  LOG.info("Could not restore pipeline element {} of pipeline 
{} ({}/{})",
-                      graph.getName(), pipeline.getName(), 
failedRestartAttempts.get(instanceId),
-                      MAX_FAILED_ATTEMPTS);
-                } else {
-                  recoveredInstances.add(instanceId);
-                  addSuccessfulRestoreNotification(pipelineNotifications, 
graph);
-                  resetFailedAttempts(instanceId);
-                  graph.setSelectedEndpointUrl(endpointUrl);
-                  LOG.info("Successfully restored pipeline element {} of 
pipeline {}",
-                      graph.getName(), pipeline.getName());
-                }
-              }
-            }
-          });
-        }
-        if (shouldUpdatePipeline.get()) {
-          var currentPipeline = getPipeline(pipeline.getPipelineId());
-          if (!failedInstances.isEmpty()) {
-            currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
-            pipelinesStats.failedIncrease();
-          } else if (!recoveredInstances.isEmpty()) {
-            
currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
-            pipelinesStats.attentionRequiredIncrease();
-          }
-          currentPipeline.setPipelineNotifications(pipelineNotifications);
-          StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI()
-              .updateElement(currentPipeline);
-          
pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(), 
currentPipeline.getName(),
-              currentPipeline.getHealthStatus().toString());
-        }
-
-      });
-      int healthNum = pipelinesStats.getRunningPipelines() - 
pipelinesStats.getFailedPipelines()
-          - pipelinesStats.getAttentionRequiredPipelines();
-      pipelinesStats.setHealthyPipelines(healthNum);
-      pipelinesStats.setElementCount(getElementsCount(allPipelines));
-    }
-    pipelinesStats.metrics();
-  }
-
-  private String findEndpointUrl(InvocableStreamPipesEntity graph)
-      throws NoServiceEndpointsAvailableException {
-    SpServiceUrlProvider serviceUrlProvider = 
ExtensionsServiceEndpointUtils.getPipelineElementType(graph);
-    return serviceUrlProvider.getInvocationUrl(graph.getSelectedEndpointUrl(), 
graph.getAppId());
-  }
-
-  private boolean shouldRetry(String instanceId) {
-    if (!failedRestartAttempts.containsKey(instanceId)) {
-      return true;
-    } else {
-      return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS;
-    }
-  }
-
-  private void resetFailedAttempts(String instanceId) {
-    failedRestartAttempts.put(instanceId, 0);
-  }
-
-  private void increaseFailedAttempt(String instanceId) {
-    if (!failedRestartAttempts.containsKey(instanceId)) {
-      failedRestartAttempts.put(instanceId, 1);
-    } else {
-      Integer currentAttempt = failedRestartAttempts.get(instanceId) + 1;
-      failedRestartAttempts.put(instanceId, currentAttempt);
-    }
-  }
-
-  private void addSuccessfulRestoreNotification(List<String> 
pipelineNotifications,
-      InvocableStreamPipesEntity graph) {
-    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
graph.getName()
-        + "' was not available and was successfully restored.");
-  }
-
-  private void addFailedAttemptNotification(List<String> pipelineNotifications,
-      InvocableStreamPipesEntity graph) {
-    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + 
graph.getName()
-        + "' was not available and could not be restored.");
-  }
-
-  private String getCurrentDatetime() {
-    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");
-    LocalDateTime now = LocalDateTime.now();
-    return "[" + dtf.format(now) + "] ";
-  }
-
-  private String extractInstanceId(InvocableStreamPipesEntity graph) {
-    return InstanceIdExtractor.extractId(graph.getElementId());
-  }
-
-  private List<String> findRunningInstances(Set<String> endpoints) {
-    List<String> allRunningInstances = new ArrayList<>();
-    endpoints.forEach(endpoint -> {
-      try {
-        allRunningInstances
-            .addAll(new 
PipelineElementEndpointHealthCheck(endpoint).checkRunningInstances());
-      } catch (IOException e) {
-        LOG.error("Pipeline element endpoint {} is unavailable", endpoint);
-      }
-    });
-
-    return allRunningInstances;
-  }
-
-  private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() {
-    Map<String, List<InvocableStreamPipesEntity>> endpointMap = new 
HashMap<>();
-    RunningPipelineElementStorage.runningProcessorsAndSinks
-        .forEach((pipelineId, graphs) -> graphs.forEach(graph -> 
addEndpoint(endpointMap, graph)));
-
-    return endpointMap;
-  }
-
-  private void addEndpoint(Map<String, List<InvocableStreamPipesEntity>> 
endpointMap,
-      InvocableStreamPipesEntity graph) {
-    String selectedEndpoint = graph.getSelectedEndpointUrl();
-    if (!endpointMap.containsKey(selectedEndpoint)) {
-      endpointMap.put(selectedEndpoint, new ArrayList<>());
-    }
-    List<InvocableStreamPipesEntity> existingGraphs = 
endpointMap.get(selectedEndpoint);
-    existingGraphs.add(graph);
-    endpointMap.put(selectedEndpoint, existingGraphs);
-  }
-
-  @Override
-  public void run() {
-    try {
-      this.checkAndRestorePipelineElements();
-    } catch (Exception e) {
-      LOG.error("Error while checking and restoring pipeline elements", e);
-    }
-
-  }
-
-  private List<Pipeline> getRunningPipelines(List<Pipeline> allPipelines) {
-    return 
allPipelines.stream().filter(Pipeline::isRunning).collect(Collectors.toList());
-
-  }
-
-  private List<Pipeline> getAllPipelines() {
-    return 
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
-  }
-
-  private int getElementsCount(List<Pipeline> allPipelines) {
-    return allPipelines.stream().mapToInt(pipeline -> 
pipeline.getActions().size()).sum();
-
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
index ebd3d99601..09709fac7b 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
@@ -68,10 +68,6 @@ public class PipelineStorageService {
     SecretProvider.getEncryptionService().apply(graphs);
   }
 
-  private void encryptSecrets(Pipeline pipeline) {
-    SecretProvider.getEncryptionService().apply(pipeline);
-  }
-
   private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> 
clazz) {
     return graphs
         .stream()
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineElementUtils.java
similarity index 66%
rename from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
rename to 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineElementUtils.java
index 9af2aa5f62..de255d8d7f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineElementUtils.java
@@ -16,21 +16,24 @@
  *
  */
 
-package org.apache.streampipes.manager.execution.provider;
+package org.apache.streampipes.manager.util;
 
-import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
 import java.util.List;
 
-/**
- * Provides pipeline elements from the pipeline of interest (for start actions)
- */
+public class PipelineElementUtils {
 
-public class CurrentPipelineElementProvider implements PipelineElementProvider 
{
-  @Override
-  public List<InvocableStreamPipesEntity> 
getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
-    return executionInfo.getProcessorsAndSinks();
-  }
+  public static <T extends InvocableStreamPipesEntity> List<T> 
filterInvocation(
+      List<InvocableStreamPipesEntity> pipelineElements,
+      Class<T> clazz) {
+    if (pipelineElements == null || clazz == null) {
+      return List.of();
+    }
 
+    return pipelineElements.stream()
+        .filter(clazz::isInstance)
+        .map(clazz::cast)
+        .toList();
+  }
 }
diff --git 
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
new file mode 100644
index 0000000000..959c869a08
--- /dev/null
+++ 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.extensions.monitoring;
+
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import 
org.apache.streampipes.extensions.management.connect.AdapterWorkerManagement;
+import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
+import 
org.apache.streampipes.extensions.management.init.RunningAdapterInstances;
+import org.apache.streampipes.extensions.management.init.RunningInstances;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+import org.apache.streampipes.rest.extensions.AbstractExtensionsResource;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.stream.Collectors;
+
+@RestController
+@RequestMapping("health")
+public class HealthCheckResource extends AbstractExtensionsResource {
+
+  private final AdapterWorkerManagement adapterManagement = new 
AdapterWorkerManagement(
+      RunningAdapterInstances.INSTANCE,
+      DeclarersSingleton.getInstance()
+  );
+
+  @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<ExtensionInstanceHealth> getRunningInstances() {
+
+    var runningAdapterInstances = 
adapterManagement.getAllRunningAdapterInstances()
+        .stream()
+        .map(NamedStreamPipesEntity::getElementId)
+        .collect(Collectors.toSet());
+
+    var runningPipelineElementInstances = 
RunningInstances.INSTANCE.getRunningInstanceIds()
+        .stream()
+        .map(InstanceIdExtractor::extractId)
+        .collect(Collectors.toSet());
+
+    var instanceHealth = new ExtensionInstanceHealth(
+        runningAdapterInstances,
+        runningPipelineElementInstances
+    );
+
+    return ok(instanceHealth);
+  }
+}
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index ae360fffc6..6650568127 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -55,6 +55,11 @@
             <artifactId>streampipes-data-export</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-health-monitoring</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-measurement-units</artifactId>
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index 58baea3101..437eeef73f 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -19,9 +19,9 @@
 package org.apache.streampipes.rest.impl.admin;
 
 import 
org.apache.streampipes.connect.management.management.AdapterMigrationManager;
+import org.apache.streampipes.health.monitoring.ServiceRegistrationManager;
 import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
 import org.apache.streampipes.manager.health.CoreServiceStatusManager;
-import org.apache.streampipes.manager.health.ServiceRegistrationManager;
 import 
org.apache.streampipes.manager.migration.PipelineElementMigrationManager;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
index 69b6e3d5ec..fc243d5d56 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.rest.impl.admin;
 
-import org.apache.streampipes.manager.health.ServiceRegistrationManager;
+import org.apache.streampipes.health.monitoring.ServiceRegistrationManager;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
 import org.apache.streampipes.model.message.Notifications;
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java
index dcdd800b6a..36362febe6 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java
@@ -30,6 +30,10 @@ public class AbstractAdapterResource<T> extends 
AbstractAuthGuardedRestResource
     this.managementService = managementServiceSupplier.get();
   }
 
+  // no management service provided
+  public AbstractAdapterResource() {
+  }
+
   /**
    * required by Spring expression
    */
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index cf30a41758..d5d1cd1942 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -21,17 +21,13 @@ package org.apache.streampipes.rest.impl.connect;
 import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import 
org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
 import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import 
org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator;
 import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.model.monitoring.SpLogMessage;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
-import org.apache.streampipes.resource.management.SpResourceManager;
 import org.apache.streampipes.resource.management.secret.SecretProvider;
-import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import org.slf4j.Logger;
@@ -47,20 +43,14 @@ import 
org.springframework.web.bind.annotation.RestController;
 
 @RestController
 @RequestMapping("/api/v2/connect/master/resolvable")
-public class RuntimeResolvableResource extends 
AbstractAdapterResource<WorkerAdministrationManagement> {
+public class RuntimeResolvableResource extends AbstractAdapterResource<Void> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeResolvableResource.class);
 
   private final IExtensionsServiceEndpointGenerator endpointGenerator;
 
   public RuntimeResolvableResource() {
-    super(() -> new WorkerAdministrationManagement(
-        StorageDispatcher.INSTANCE.getNoSqlStore()
-            .getAdapterInstanceStorage(),
-        AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
-        new SpResourceManager().manageAdapters(),
-        new SpResourceManager().manageDataStreams()
-    ));
+    super();
     this.endpointGenerator = new ExtensionsServiceEndpointGenerator();
   }
 
diff --git a/streampipes-service-core/pom.xml b/streampipes-service-core/pom.xml
index 6f4b3b19bc..bdbc44bbaa 100644
--- a/streampipes-service-core/pom.xml
+++ b/streampipes-service-core/pom.xml
@@ -64,6 +64,11 @@
             <artifactId>streampipes-connect-transformer-js</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-health-monitoring</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-messaging-jms</artifactId>
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
index 06c7257bf0..80000abf7f 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
@@ -19,15 +19,19 @@
 package org.apache.streampipes.service.core;
 
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
+import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
 import 
org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
+import org.apache.streampipes.health.monitoring.ExtensionHealthCheck;
+import org.apache.streampipes.health.monitoring.PostStartupRecovery;
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
-import org.apache.streampipes.manager.health.ServiceHealthCheck;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.resource.management.SpResourceManager;
+import org.apache.streampipes.storage.api.INoSqlStorage;
 import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import org.slf4j.Logger;
@@ -51,40 +55,56 @@ public class PostStartupTask implements Runnable {
   private final Map<String, Integer> failedPipelines = new HashMap<>();
   private final ScheduledExecutorService executorService;
   private final WorkerAdministrationManagement workerAdministrationManagement;
+  private final PostStartupRecovery postStartupRecovery;
+
+  private final INoSqlStorage storage = 
StorageDispatcher.INSTANCE.getNoSqlStore();
 
   public PostStartupTask(IPipelineStorage pipelineStorage) {
     this.pipelineStorage = pipelineStorage;
     this.executorService = Executors.newSingleThreadScheduledExecutor();
+    var resourceManager = new SpResourceManager();
     this.workerAdministrationManagement = new WorkerAdministrationManagement(
-        StorageDispatcher.INSTANCE.getNoSqlStore()
-                                  .getAdapterInstanceStorage(),
-        AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
-        new SpResourceManager().manageAdapters(),
-        new SpResourceManager().manageDataStreams()
+        storage.getAdapterDescriptionStorage(),
+        storage.getPermissionStorage(),
+        resourceManager.manageUsers(),
+        resourceManager.managePermissions());
+    this.postStartupRecovery = new PostStartupRecovery(
+        new ExtensionHealthCheck(
+            new ResourceProvider(
+                
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(),
+                
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+                new AdapterMasterManagement(
+                    
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+                    new SpResourceManager().manageAdapters(),
+                    new SpResourceManager().manageDataStreams(),
+                    AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+                )
+            )
+        )
     );
   }
 
   @Override
   public void run() {
-    new ServiceHealthCheck().run();
+    new ServiceHealthCheck(storage.getExtensionsServiceStorage()).run();
     performAdapterAssetUpdate();
     startAllPreviouslyStoppedPipelines();
-    startAdapters();
+    runHealthCheckOnce();
   }
 
   private void performAdapterAssetUpdate() {
-    var installedAppIds = 
CouchDbStorageManager.INSTANCE.getExtensionsServiceStorage()
-                                                        .findAll()
-                                                        .stream()
-                                                        .flatMap(config -> 
config.getTags()
-                                                                               
  .stream())
-                                                        .filter(tag -> 
tag.getPrefix() == SpServiceTagPrefix.ADAPTER)
-                                                        .toList();
+    var installedAppIds = storage.getExtensionsServiceStorage()
+        .findAll()
+        .stream()
+        .flatMap(config -> config.getTags()
+            .stream())
+        .filter(tag -> tag.getPrefix() == SpServiceTagPrefix.ADAPTER)
+        .toList();
     workerAdministrationManagement.performAdapterMigrations(installedAppIds);
   }
 
-  private void startAdapters() {
-    workerAdministrationManagement.checkAndRestore(0);
+  private void runHealthCheckOnce() {
+    postStartupRecovery.checkAndRestore(0);
   }
 
   private void startAllPreviouslyStoppedPipelines() {
@@ -165,9 +185,6 @@ public class PostStartupTask implements Runnable {
   }
 
   private IPipelineStorage getPipelineStorage() {
-    return StorageDispatcher
-        .INSTANCE
-        .getNoSqlStore()
-        .getPipelineStorageAPI();
+    return storage.getPipelineStorageAPI();
   }
 }
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index 03e02193b4..643ba3f7ab 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -19,18 +19,18 @@ package org.apache.streampipes.service.core;
 
 import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
 import org.apache.streampipes.connect.transformer.api.TransformationEngine;
 import org.apache.streampipes.connect.transformer.api.TransformationEngines;
 import org.apache.streampipes.connect.transformer.groovy.GroovyScriptEngine;
 import org.apache.streampipes.connect.transformer.js.GraalJsScriptEngine;
+import org.apache.streampipes.health.monitoring.ExtensionHealthCheck;
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
 import org.apache.streampipes.loadbalance.LoadManager;
 import 
org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor;
 import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
 import org.apache.streampipes.manager.health.CoreServiceStatusManager;
-import org.apache.streampipes.manager.health.PipelineHealthCheck;
-import org.apache.streampipes.manager.health.ServiceHealthCheck;
 import org.apache.streampipes.manager.pipeline.PipelineManager;
 import org.apache.streampipes.manager.setup.AutoInstallation;
 import org.apache.streampipes.manager.setup.StreamPipesEnvChecker;
@@ -56,7 +56,6 @@ import 
org.apache.streampipes.storage.couchdb.impl.UserStorage;
 import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -155,14 +154,19 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
                              TimeUnit.MILLISECONDS);
 
     
scheduleHealthChecks(env.getHealthCheckIntervalInMillis().getValueOrDefault(), 
List
-        .of(new ServiceHealthCheck(), new PipelineHealthCheck(),
-            new AdapterHealthCheck(
-                
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
-                new AdapterMasterManagement(
-                    
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
-                    new SpResourceManager().manageAdapters(),
-                    new SpResourceManager().manageDataStreams(),
-                    AdapterMetricsManager.INSTANCE.getAdapterMetrics()))));
+        .of(new 
ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()),
+            new ExtensionHealthCheck(
+                new ResourceProvider(
+                  
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(),
+                  
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+                  new AdapterMasterManagement(
+                      
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+                      new SpResourceManager().manageAdapters(),
+                      new SpResourceManager().manageDataStreams(),
+                      AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+                  )
+                )
+            )));
 
     var logFetchInterval = 
env.getLogFetchIntervalInMillis().getValueOrDefault();
     LOG.info("Extensions logs will be fetched every {} milliseconds", 
logFetchInterval);


Reply via email to