This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch improve-health-management in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 7dc560b62fa3e98ed3f61ff5a20692dbc20f5904 Author: Dominik Riemer <[email protected]> AuthorDate: Wed Jan 14 16:55:18 2026 +0100 refactor: Improve health checks --- pom.xml | 1 + .../apache/streampipes/commons/constants/Envs.java | 30 +-- .../management/health/AdapterHealthCheck.java | 245 ------------------- .../management/WorkerAdministrationManagement.java | 25 +- .../management/health/AdapterHealthCheckTest.java | 96 -------- .../influx/DataExplorerInfluxQueryExecutor.java | 2 +- .../management/init/RunningInstances.java | 10 + streampipes-health-monitoring/pom.xml | 59 +++++ .../health/monitoring/AdapterHealthCheck.java | 168 +++++++++++++ .../health/monitoring/ExtensionHealthCheck.java | 57 +++++ .../PipelineElementEndpointHealthCheck.java | 67 ++++++ .../health/monitoring/PipelineHealthCheck.java | 197 ++++++++++++++++ .../health/monitoring/ResourceProvider.java | 101 ++++++++ .../health/monitoring}/ServiceHealthCheck.java | 22 +- .../monitoring}/ServiceRegistrationManager.java | 6 +- .../monitoring/model/ActiveCoreInstances.java | 29 +-- .../health/monitoring/model/ActiveResources.java | 30 +-- .../health/monitoring/model/HealthCheckData.java | 30 +++ .../health/monitoring/utils/HealthCheckUtils.java | 51 ++++ .../model/health/ExtensionInstanceHealth.java | 25 ++ .../streampipes/model/util/PropertyUtils.java | 4 - .../execution/ExtensionServiceExecutions.java | 1 - .../manager/execution/PipelineExecutionInfo.java | 4 +- .../ExtensionsServiceEndpointGenerator.java | 12 +- ...tter.java => BasePipelineElementSubmitter.java} | 23 +- .../http/DetachPipelineElementSubmitter.java | 5 +- .../http/InvokePipelineElementSubmitter.java | 9 +- .../execution/task/DiscoverEndpointsTask.java | 37 +-- .../manager/execution/task/SubmitRequestTask.java | 10 +- .../health/PipelineElementEndpointHealthCheck.java | 51 ---- .../manager/health/PipelineHealthCheck.java | 261 --------------------- .../manager/storage/PipelineStorageService.java | 4 - 
.../extensions/monitoring/HealthCheckResource.java | 67 ++++++ streampipes-rest/pom.xml | 5 + .../rest/impl/admin/MigrationResource.java | 2 +- .../impl/admin/ServiceRegistrationResource.java | 2 +- streampipes-service-core/pom.xml | 5 + .../streampipes/service/core/PostStartupTask.java | 4 +- .../service/core/StreamPipesCoreApplication.java | 28 ++- 39 files changed, 953 insertions(+), 832 deletions(-) diff --git a/pom.xml b/pom.xml index af0058acf8..90745684d6 100644 --- a/pom.xml +++ b/pom.xml @@ -1125,6 +1125,7 @@ <module>streampipes-wrapper-kafka-streams</module> <module>streampipes-wrapper-siddhi</module> <module>streampipes-wrapper-standalone</module> + <module>streampipes-health-monitoring</module> </modules> <profiles> diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 148fa66abd..14095fdb23 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -102,21 +102,21 @@ public enum Envs { SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"), - CPU_RESOURCE_WEIGHT("CPU_RESOURCE_WEIGHT", "1.0"), - MEMORY_RESOURCE_WEIGHT("MEMORY_RESOURCE_WEIGHT", "1.0"), - - DIR_MEMORY_RESOURCE_WEIGHT("DIR_MEMORY_RESOURCE_WEIGHT", "1.0"), - BANDWIDTH_IN_RESOURCE_WEIGHT("BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"), - BANDWIDTH_OUT_RESOURCE_WEIGHT("BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"), - THRESHOLD_MIGRATOR_PERCENTAGE("THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"), - MIN_MIGRATOR_PERCENTAGE("MIN_MIGRATOR_PERCENTAGE", "20.0"), - OVERLOADED_THRESHOLD_PERCENTAGE("OVERLOADED_THRESHOLD_PERCENTAGE", "85"), - HISTORY_RESOURCE_PERCENTAGE("HISTORY_RESOURCE_PERCENTAGE", "0.9"), - MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD", "85"), - LOAD_TARGET_STD("LOAD_TARGET_STD", "25.0"), - 
SELECTOR("SELECTOR", "WeightedRandomSelector"), - MIGRATOR("MIGRATOR", "ThresholdMigrator"), - LOAD_MANAGER_ENABLE("LOAD_MANAGER_ENABLE", "true"), + CPU_RESOURCE_WEIGHT("SP_CPU_RESOURCE_WEIGHT", "1.0"), + MEMORY_RESOURCE_WEIGHT("SP_MEMORY_RESOURCE_WEIGHT", "1.0"), + + DIR_MEMORY_RESOURCE_WEIGHT("SP_DIR_MEMORY_RESOURCE_WEIGHT", "1.0"), + BANDWIDTH_IN_RESOURCE_WEIGHT("SP_BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"), + BANDWIDTH_OUT_RESOURCE_WEIGHT("SP_BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"), + THRESHOLD_MIGRATOR_PERCENTAGE("SP_THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"), + MIN_MIGRATOR_PERCENTAGE("SP_MIN_MIGRATOR_PERCENTAGE", "20.0"), + OVERLOADED_THRESHOLD_PERCENTAGE("SP_OVERLOADED_THRESHOLD_PERCENTAGE", "85"), + HISTORY_RESOURCE_PERCENTAGE("SP_HISTORY_RESOURCE_PERCENTAGE", "0.9"), + MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD", "85"), + LOAD_TARGET_STD("SP_LOAD_TARGET_STD", "25.0"), + SELECTOR("SP_SELECTOR", "WeightedRandomSelector"), + MIGRATOR("SP_MIGRATOR", "ThresholdMigrator"), + LOAD_MANAGER_ENABLE("SP_LOAD_MANAGER_ENABLE", "false"), // expects a comma separated string of service names SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""), diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java deleted file mode 100644 index 5142d5f4fe..0000000000 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.connect.management.health; - -import org.apache.streampipes.commons.exceptions.connect.AdapterException; -import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics; -import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; -import org.apache.streampipes.connect.management.management.AdapterMasterManagement; -import org.apache.streampipes.connect.management.management.WorkerRestClient; -import org.apache.streampipes.connect.management.util.WorkerPaths; -import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider; -import org.apache.streampipes.model.connect.adapter.AdapterDescription; -import org.apache.streampipes.storage.api.IAdapterStorage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.stream.Collectors; - -public class AdapterHealthCheck implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(AdapterHealthCheck.class); - - private final IAdapterStorage adapterStorage; - private final AdapterMasterManagement adapterMasterManagement; - - public AdapterHealthCheck(IAdapterStorage adapterStorage, - AdapterMasterManagement adapterMasterManagement) { - this.adapterStorage = adapterStorage; - 
this.adapterMasterManagement = adapterMasterManagement; - } - - @Override - public void run() { - this.checkAndRestoreAdapters(); - } - - /** - * In this method it is checked which adapters are currently running. Then it calls all workers to - * validate if the adapter instance is still running as expected. If the adapter is not running - * anymore a new worker instance is invoked. In addition, it publishes monitoring metrics for all - * running adapters (in line with - * {@link org.apache.streampipes.manager.health.PipelineHealthCheck}). - */ - public void checkAndRestoreAdapters() { - LOG.info("Adapter health check started"); - // Get all adapters that are supposed to run according to the backend storage - Map<String, AdapterDescription> adapterInstancesSupposedToRun = - this.getAllAdaptersSupposedToRun(); - - // group all adapter instances supposed to run by their worker service URL - Map<String, List<AdapterDescription>> groupByWorker = - this.getAllWorkersWithAdapters(adapterInstancesSupposedToRun); - - // Get adapters that are not running anymore - Map<String, AdapterDescription> allAdaptersToRecover = - this.getAdaptersToRecover(groupByWorker, adapterInstancesSupposedToRun); - - allAdaptersToRecover - .keySet() - .forEach(adapterId -> - LOG.info("Adapter instance with id {} needs to be recovered", adapterId)); - - try { - if (!adapterInstancesSupposedToRun.isEmpty()) { - // Filter adapters so that only healthy and running adapters are updated in the metrics - // endpoint - var adaptersToMonitor = adapterInstancesSupposedToRun.entrySet().stream() - .filter((entry -> !allAdaptersToRecover.containsKey(entry.getKey()))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - if (!adaptersToMonitor.isEmpty()) { - updateMonitoringMetrics(adaptersToMonitor); - } else { - LOG.info("No running adapter instances to monitor."); - } - } - } catch (NoSuchElementException e) { - LOG.error("Could not update adapter metrics due to an invalid state. 
({})", e.getMessage()); - } - - LOG.info("Monitoring metrics updated for running adapters."); - - // Recover Adapters - this.recoverAdapters(allAdaptersToRecover); - } - - /** - * Updates the monitoring metrics based on the descriptions of running adapters. - * - * @param runningAdapterDescriptions A map containing the descriptions of running adapters, where - * the key is the adapter's element ID and the value is the corresponding adapter - * description. - */ - protected void updateMonitoringMetrics(Map<String, AdapterDescription> runningAdapterDescriptions) { - - var adapterMetrics = AdapterMetricsManager.getInstance().getAdapterMetrics(); - runningAdapterDescriptions.values() - .forEach(adapterDescription -> updateTotalEventsPublished(adapterMetrics, - adapterDescription.getElementId(), - adapterDescription.getName())); - LOG.debug("Monitoring {} adapter instances", adapterMetrics.size()); - } - - private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String adapterId, - String adapterName) { - - // Check if the adapter is already registered; if not, register it first. - // This step is crucial, especially when the StreamPipes Core service is restarted, - // and there are existing running adapters that need proper registration. - // Note: Proper registration is usually handled during the initial start of the adapter. - if (!adapterMetrics.contains(adapterId)) { - adapterMetrics.register(adapterId, adapterName); - } - - adapterMetrics.updateTotalEventsPublished(adapterId, adapterName, ExtensionsLogProvider.INSTANCE - .getMetricInfosForResource(adapterId).getMessagesOut().getCounter()); - } - - - /** - * Retrieves a map of all adapter instances that are supposed to be running according to the - * backend storage. - * <p> - * This method queries the adapter storage to obtain information about all adapters and filters - * the running instances. 
The resulting map is keyed by the element ID of each running adapter, - * and the corresponding values are the respective {@link AdapterDescription} objects. - * - * @return A map containing all adapter instances supposed to be running according to the backend - * storage. The keys are element IDs, and the values are the corresponding adapter - * descriptions. - */ - public Map<String, AdapterDescription> getAllAdaptersSupposedToRun() { - Map<String, AdapterDescription> result = new HashMap<>(); - List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.findAll(); - allRunningInstancesAdapterDescription.stream().filter(AdapterDescription::isRunning) - .forEach(adapterDescription -> result.put(adapterDescription.getElementId(), - adapterDescription)); - - return result; - } - - public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(Map<String, AdapterDescription> adapterInstancesSupposedToRun) { - - Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>(); - adapterInstancesSupposedToRun.values().forEach(ad -> { - String selectedEndpointUrl = ad.getSelectedEndpointUrl(); - if (selectedEndpointUrl != null) { - if (groupByWorker.containsKey(selectedEndpointUrl)) { - groupByWorker.get(selectedEndpointUrl).add(ad); - } else { - List<AdapterDescription> adapterDescriptionsPerWorker = new ArrayList<>(); - adapterDescriptionsPerWorker.add(ad); - groupByWorker.put(selectedEndpointUrl, adapterDescriptionsPerWorker); - } - } - }); - - return groupByWorker; - } - - /** - * Retrieves a map of adapters to recover by comparing the provided groupings of adapter instances - * with the instances supposed to run according to the storage. For every adapter instance it is - * verified that it actually runs on a worker node. If this is not the case, it is added to the - * output of adapters to recover. - * - * @param adapterInstancesGroupedByWorker A map grouping adapter instances by worker. 
- * @param adapterInstancesSupposedToRun The map containing all adapter instances supposed to be - * running. - * @return A new map containing adapter instances to recover, filtered based on running instances. - */ - public Map<String, AdapterDescription> getAdaptersToRecover(Map<String, List<AdapterDescription>> adapterInstancesGroupedByWorker, - Map<String, AdapterDescription> adapterInstancesSupposedToRun) { - - // NOTE: This line is added to prevent modifying the existing map of instances supposed to run - // It looks like the parameter `adapterInstancesSupposedToRun` is not required at all, - // but this should be checked more carefully. - Map<String, AdapterDescription> adaptersToRecover = - new HashMap<>(adapterInstancesSupposedToRun); - - adapterInstancesGroupedByWorker.keySet().forEach(adapterEndpointUrl -> { - try { - List<AdapterDescription> allRunningInstancesOfOneWorker = - WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl - + WorkerPaths.getRunningAdaptersPath()); - - // only keep adapters where there is no running adapter instance - // therefore, all others are removed - allRunningInstancesOfOneWorker.forEach(adapterDescription -> adaptersToRecover - .remove(adapterDescription.getElementId())); - } catch (AdapterException e) { - LOG.info("Could not recover adapter at endpoint {} - " - + "marking it as requested to recover (reason: {})", adapterEndpointUrl, - e.getMessage()); - } - }); - - return adaptersToRecover; - } - - - public void recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) { - for (AdapterDescription adapterDescription : adaptersToRecover.values()) { - // Invoke all adapters that were running when the adapter container was stopped - try { - if (adapterDescription.isRunning()) { - LOG.info("Start recovering adapter {} ", adapterDescription.getElementId()); - this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId()); - LOG.info("Adapter {} is recovered", 
adapterDescription.getElementId()); - - } - } catch (AdapterException e) { - LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage()); - } catch (Exception e) { - LOG.error( - "Unexpected error while recovering adapter {} ({})", - adapterDescription.getName(), - e.getMessage() - ); - } - } - - } -} diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java index 6cdad54c6b..32e5b7f813 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java @@ -20,7 +20,6 @@ package org.apache.streampipes.connect.management.management; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics; -import org.apache.streampipes.connect.management.health.AdapterHealthCheck; import org.apache.streampipes.connect.management.health.AdapterOperationLock; import org.apache.streampipes.manager.assets.AssetManager; import org.apache.streampipes.model.connect.adapter.AdapterDescription; @@ -47,7 +46,7 @@ public class WorkerAdministrationManagement { private final IAdapterStorage adapterDescriptionStorage; - private final AdapterHealthCheck adapterHealthCheck; + //private final AdapterHealthCheck adapterHealthCheck; public WorkerAdministrationManagement( IAdapterStorage adapterStorage, @@ -55,15 +54,15 @@ public class WorkerAdministrationManagement { AdapterResourceManager adapterResourceManager, DataStreamResourceManager dataStreamResourceManager ) { - this.adapterHealthCheck = new AdapterHealthCheck( - 
adapterStorage, - new AdapterMasterManagement( - adapterStorage, - adapterResourceManager, - dataStreamResourceManager, - adapterMetrics - ) - ); +// this.adapterHealthCheck = new AdapterHealthCheck( +// adapterStorage, +// new AdapterMasterManagement( +// adapterStorage, +// adapterResourceManager, +// dataStreamResourceManager, +// adapterMetrics +// ) +// ); this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage(); } @@ -81,11 +80,11 @@ public class WorkerAdministrationManagement { } else { LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts..."); AdapterOperationLock.INSTANCE.unlock(); - this.adapterHealthCheck.checkAndRestoreAdapters(); + //this.adapterHealthCheck.checkAndRestoreAdapters(); } } else { AdapterOperationLock.INSTANCE.lock(); - this.adapterHealthCheck.checkAndRestoreAdapters(); + //this.adapterHealthCheck.checkAndRestoreAdapters(); AdapterOperationLock.INSTANCE.unlock(); } } diff --git a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java deleted file mode 100644 index 47612f1361..0000000000 --- a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.connect.management.health; - -import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; -import org.apache.streampipes.connect.management.management.AdapterMasterManagement; -import org.apache.streampipes.model.connect.adapter.AdapterDescription; -import org.apache.streampipes.resource.management.SpResourceManager; -import org.apache.streampipes.storage.api.IAdapterStorage; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class AdapterHealthCheckTest { - - private IAdapterStorage adapterInstanceStorageMock; - - @BeforeEach - public void setUp() { - adapterInstanceStorageMock = mock(IAdapterStorage.class); - } - - @Test - public void getAllRunningInstancesAdapterDescriptionsEmpty() { - when(adapterInstanceStorageMock.findAll()).thenReturn(List.of()); - - var healthCheck = new AdapterHealthCheck( - adapterInstanceStorageMock, - new AdapterMasterManagement( - adapterInstanceStorageMock, - new SpResourceManager().manageAdapters(), - new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics() - ) - ); - var result = healthCheck.getAllAdaptersSupposedToRun(); - - Assertions.assertTrue(result.isEmpty()); - } - - @Test - public void getAllRunningInstancesAdapterDescriptionsMixed() { - - var nameRunningAdapter = "running-adapter"; - var nameStoppedAdapter = 
"stopped-adapter"; - - var stoppedAdapter = new AdapterDescription(); - stoppedAdapter.setElementId(nameStoppedAdapter); - stoppedAdapter.setRunning(false); - - var runningAdapter = new AdapterDescription(); - runningAdapter.setElementId(nameRunningAdapter); - runningAdapter.setRunning(true); - - when(adapterInstanceStorageMock.findAll()).thenReturn(List.of(stoppedAdapter, runningAdapter)); - - var healthCheck = new AdapterHealthCheck( - adapterInstanceStorageMock, - new AdapterMasterManagement( - adapterInstanceStorageMock, - new SpResourceManager().manageAdapters(), - new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics() - ) - ); - var result = healthCheck.getAllAdaptersSupposedToRun(); - - Assertions.assertEquals(1, result.size()); - Assertions.assertTrue(result.containsKey(nameRunningAdapter)); - Assertions.assertEquals(runningAdapter, result.get(nameRunningAdapter)); - - } - -} diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java index f4e1df6095..fbb715be88 100644 --- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java +++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java @@ -73,7 +73,7 @@ public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Q * The influx client always returns a double for timestamp values. This method converts them to long values. 
*/ private void convertTimestampValueToLong(List<Object> row) { - if (!row.isEmpty()) { + if (!row.isEmpty() && row.get(0) instanceof Number) { row.set(0, ((Number) row.get(0)).longValue()); } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java index 538043d7f9..f14a94de00 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public enum RunningInstances { INSTANCE; @@ -65,6 +67,14 @@ public enum RunningInstances { return runningInstances.size(); } + public Set<String> getRunningInstanceIds() { + return runningInstances + .entrySet() + .stream() + .map((entry -> entry.getValue().getDescription().getElementId())) + .collect(Collectors.toSet()); + } + public List<String> getRunningInstanceIdsForElement(String appId) { // TODO change this to appId for STREAMPIPES-319 List<String> instanceIds = new ArrayList<>(); diff --git a/streampipes-health-monitoring/pom.xml b/streampipes-health-monitoring/pom.xml new file mode 100644 index 0000000000..a3347edfe6 --- /dev/null +++ b/streampipes-health-monitoring/pom.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.streampipes</groupId> + 
<artifactId>streampipes-parent</artifactId> + <version>0.99.0-SNAPSHOT</version> + </parent> + + <artifactId>streampipes-health-monitoring</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-pipeline-management</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-commons</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-connect-management</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-load-balancer</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-model</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-storage-api</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> + + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java new file mode 100644 index 0000000000..f742b23f57 --- /dev/null +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.streampipes.health.monitoring; + +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics; +import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; +import org.apache.streampipes.health.monitoring.model.HealthCheckData; +import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.stream.Collectors; + +public class AdapterHealthCheck { + + private static final Logger LOG = LoggerFactory.getLogger(AdapterHealthCheck.class); + + private final HealthCheckData healthCheckData; + + public AdapterHealthCheck(HealthCheckData healthCheckData) { + this.healthCheckData = healthCheckData; + } + + /** + * In this method it is checked which adapters are currently running. Then it calls all workers to + * validate if the adapter instance is still running as expected. If the adapter is not running + * anymore a new worker instance is invoked. 
In addition, it publishes monitoring metrics for all + * running adapters (in line with + * {@link PipelineHealthCheck}). + */ + public void runCheck() { + LOG.debug("Adapter health check started"); + + try { + if (!healthCheckData.activeResources().runningAdapters().isEmpty()) { + var allAdaptersToRecover = this.getAdaptersToRecover(); + + allAdaptersToRecover + .forEach(adapter -> + LOG.info("Adapter instance with id {} needs to be recovered", adapter.getElementId())); + // Filter adapters so that only healthy and running adapters are updated in the metrics + // endpoint + var adaptersToMonitor = healthCheckData.activeResources().runningAdapters().stream() + .filter(entry -> allAdaptersToRecover + .stream() + .noneMatch(r -> r.getElementId().equals(entry.getElementId())) + ) + .toList(); + + if (!adaptersToMonitor.isEmpty()) { + updateMonitoringMetrics(adaptersToMonitor); + } else { + LOG.debug("No running adapter instances to monitor."); + } + + LOG.debug("Monitoring metrics updated for running adapters."); + + // Recover Adapters + this.recoverAdapters(allAdaptersToRecover); + } + } catch (NoSuchElementException e) { + LOG.error("Could not update adapter metrics due to an invalid state. ({})", e.getMessage()); + } + } + + /** + * Updates the monitoring metrics based on the descriptions of running adapters. + * + * @param runningAdapterDescriptions A map containing the descriptions of running adapters, where + * the key is the adapter's element ID and the value is the corresponding adapter + * description. 
+ */ + protected void updateMonitoringMetrics(List<AdapterDescription> runningAdapterDescriptions) { + + var adapterMetrics = AdapterMetricsManager.getInstance().getAdapterMetrics(); + runningAdapterDescriptions + .forEach(adapterDescription -> updateTotalEventsPublished(adapterMetrics, + adapterDescription.getElementId(), + adapterDescription.getName())); + LOG.debug("Monitoring {} adapter instances", adapterMetrics.size()); + } + + private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String adapterId, + String adapterName) { + + // Check if the adapter is already registered; if not, register it first. + // This step is crucial, especially when the StreamPipes Core service is restarted, + // and there are existing running adapters that need proper registration. + // Note: Proper registration is usually handled during the initial start of the adapter. + if (!adapterMetrics.contains(adapterId)) { + adapterMetrics.register(adapterId, adapterName); + } + + adapterMetrics.updateTotalEventsPublished(adapterId, adapterName, ExtensionsLogProvider.INSTANCE + .getMetricInfosForResource(adapterId).getMessagesOut().getCounter()); + } + + + /** + * Retrieves a list of adapters to recover by comparing the provided groupings of adapter instances + * with the instances supposed to run according to the storage. For every adapter instance it is + * verified that it actually runs on a worker node. If this is not the case, it is added to the + * output of adapters to recover. + * + * running. + * @return A new map containing adapter instances to recover, filtered based on running instances. 
+ */ + public List<AdapterDescription> getAdaptersToRecover() { + + var runningAdapterIds = + healthCheckData.activeExtensionInstances() + .values() + .stream() + .flatMap(h -> h.runningAdapterInstanceIds().stream()) + .collect(Collectors.toSet()); + + return healthCheckData.activeResources() + .runningAdapters() + .stream() + .filter(Objects::nonNull) + .filter(a -> a.getElementId() != null) + .filter(a -> !runningAdapterIds.contains(a.getElementId())) + .toList(); + } + + public void recoverAdapters(List<AdapterDescription> adaptersToRecover) { + for (AdapterDescription adapterDescription : adaptersToRecover) { + // Invoke all adapters that were running when the adapter container was stopped + try { + if (adapterDescription.isRunning()) { + LOG.debug("Start recovering adapter {} ", adapterDescription.getElementId()); + this.healthCheckData.resourceProvider().adapterMasterManagement().startStreamAdapter(adapterDescription.getElementId()); + LOG.info("Adapter {} is recovered", adapterDescription.getElementId()); + + } + } catch (AdapterException e) { + LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage()); + } catch (Exception e) { + LOG.error( + "Unexpected error while recovering adapter {} ({})", + adapterDescription.getName(), + e.getMessage() + ); + } + } + } +} diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java new file mode 100644 index 0000000000..e7d19bae28 --- /dev/null +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.health.monitoring; + +import org.apache.streampipes.health.monitoring.model.HealthCheckData; +import org.apache.streampipes.model.health.ExtensionInstanceHealth; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; + +/** + * Health check that loads the active pipelines and adapters, queries the health endpoint of every + * extension service hosting one of them, and passes the collected data to the pipeline and adapter + * health checks. + */ +public class ExtensionHealthCheck implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ExtensionHealthCheck.class); + + private final ResourceProvider resourceProvider; + + public ExtensionHealthCheck(ResourceProvider resourceProvider) { + this.resourceProvider = resourceProvider; + } + + /** + * Runs one health check cycle. Exceptions thrown during the cycle are caught and logged as a + * warning. + */ + @Override + public void run() { + try { + var activeResources = resourceProvider.loadActiveResources(); + var activeCoreInstances = resourceProvider.loadActiveInstances(activeResources); + + // Ask each extension service (keyed by its endpoint URL) which instances it actually runs. + var activeExtensionInstances = new HashMap<String, ExtensionInstanceHealth>(); + activeCoreInstances.keySet().forEach(k -> { + activeExtensionInstances.put(k, new PipelineElementEndpointHealthCheck(k).checkRunningInstances()); + }); + + // Both checks operate on the same collected data; pipelines are checked before adapters. + var healthCheckData = new HealthCheckData(resourceProvider, activeResources, activeCoreInstances, activeExtensionInstances); + new PipelineHealthCheck(healthCheckData).runCheck(); + new AdapterHealthCheck(healthCheckData).runCheck(); + } catch (Exception e) { + LOG.warn("An unhandled error occurred while running health check.", e); + } + } +} diff --git
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineElementEndpointHealthCheck.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineElementEndpointHealthCheck.java new file mode 100644 index 0000000000..779735b297 --- /dev/null +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineElementEndpointHealthCheck.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.streampipes.health.monitoring; + +import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.model.health.ExtensionInstanceHealth; +import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Set; + +public class PipelineElementEndpointHealthCheck { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineElementEndpointHealthCheck.class); + private static final String InstancePath = "/health"; + + private final String serviceBaseUrl; + + public PipelineElementEndpointHealthCheck(String serviceBaseUrl) { + this.serviceBaseUrl = serviceBaseUrl; + } + + public ExtensionInstanceHealth checkRunningInstances() { + try { + var request = ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl()); + var response = request.execute().returnResponse(); + if (response.getStatusLine().getStatusCode() != 200) { + return new ExtensionInstanceHealth(Set.of(), Set.of()); + } + String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + return deserialize(body); + + } catch (IOException e) { + LOG.error("Extension service {} is unavailable", serviceBaseUrl); + return new ExtensionInstanceHealth(Set.of(), Set.of()); + } + } + + private ExtensionInstanceHealth deserialize(String json) throws JsonProcessingException { + return JacksonSerializer.getObjectMapper().readValue(json, ExtensionInstanceHealth.class); + } + + private String makeRequestUrl() { + return serviceBaseUrl + InstancePath; + } +} diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java new file mode 100644 index 0000000000..fcb44ac9e9 --- /dev/null +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.streampipes.health.monitoring; + +import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats; +import org.apache.streampipes.health.monitoring.model.HealthCheckData; +import org.apache.streampipes.health.monitoring.utils.HealthCheckUtils; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; +import org.apache.streampipes.manager.execution.http.InvokeHttpRequest; +import org.apache.streampipes.manager.storage.RunningPipelineElementStorage; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineHealthStatus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline; + +public class PipelineHealthCheck { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class); + private static final int MAX_FAILED_ATTEMPTS = 10; + + private static final Map<String, Integer> failedRestartAttempts = new HashMap<>(); + private static final PipelinesStats pipelinesStats = new PipelinesStats(); + + private final HealthCheckData healthCheckData; + + public PipelineHealthCheck(HealthCheckData healthCheckData) { + this.healthCheckData = healthCheckData; + } + + public void runCheck() { + try { + initPipelineMetrics(); + + if (!healthCheckData.activeResources().runningPipelines().isEmpty()) { + checkAndRestorePipelineElements(); + } + pipelinesStats.metrics(); + } catch (Exception e) 
{ + LOG.error("Error while checking and restoring pipeline elements", e); + } + } + + private void initPipelineMetrics() { + pipelinesStats.clear(); + pipelinesStats.setAllPipelines(healthCheckData.activeResources().allPipelines().size()); + pipelinesStats.setRunningPipelines(healthCheckData.activeResources().runningPipelines().size()); + pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines() + - pipelinesStats.getRunningPipelines()); + + for (Pipeline p : healthCheckData.activeResources().allPipelines()) { + pipelinesStats.updatePipelineHealthState( + p.getElementId(), + p.getName(), + p.getHealthStatus().toString()); + + pipelinesStats.updatePipelineRunningState( + p.getElementId(), + p.getName(), + p.isRunning()); + } + } + + private void checkAndRestorePipelineElements() { + healthCheckData.activeResources().runningPipelines().forEach(pipeline -> { + AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false); + List<String> failedInstances = new ArrayList<>(); + List<String> recoveredInstances = new ArrayList<>(); + List<String> pipelineNotifications = new ArrayList<>(); + List<InvocableStreamPipesEntity> runningPipelineElements = RunningPipelineElementStorage.runningProcessorsAndSinks + .get(pipeline.getPipelineId()); + + if (Objects.nonNull(runningPipelineElements)) { + runningPipelineElements.forEach(pipelineElement -> { + String instanceId = HealthCheckUtils.extractInstanceId(pipelineElement); + if (isNowhereRunning(instanceId)) { + if (shouldRetry(instanceId)) { + var serviceBaseUrl = pipelineElement.getSelectedEndpointUrl(); + shouldUpdatePipeline.set(true); + boolean success; + try { + serviceBaseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl( + pipelineElement.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement.getAppId()), + Collections.emptySet() + ); + var invocationUrl = getInvocationUrl(pipelineElement, serviceBaseUrl); + success = new InvokeHttpRequest() + .execute(pipelineElement, 
invocationUrl, pipeline.getPipelineId()).isSuccess(); + } catch (NoServiceEndpointsAvailableException e) { + success = false; + } + if (!success) { + failedInstances.add(instanceId); + HealthCheckUtils.addFailedAttemptNotification(pipelineNotifications, pipelineElement); + increaseFailedAttempt(instanceId); + LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})", + pipelineElement.getName(), pipeline.getName(), failedRestartAttempts.get(instanceId), + MAX_FAILED_ATTEMPTS); + } else { + recoveredInstances.add(instanceId); + HealthCheckUtils.addSuccessfulRestoreNotification(pipelineNotifications, pipelineElement); + resetFailedAttempts(instanceId); + pipelineElement.setSelectedEndpointUrl(serviceBaseUrl); + LOG.info("Successfully restored pipeline element {} of pipeline {}", + pipelineElement.getName(), pipeline.getName()); + } + } + } + }); + } + if (shouldUpdatePipeline.get()) { + var currentPipeline = getPipeline(pipeline.getPipelineId()); + if (!failedInstances.isEmpty()) { + currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE); + pipelinesStats.failedIncrease(); + } else if (!recoveredInstances.isEmpty()) { + currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION); + pipelinesStats.attentionRequiredIncrease(); + } + currentPipeline.setPipelineNotifications(pipelineNotifications); + healthCheckData.resourceProvider().pipelineStorage().updateElement(currentPipeline); + pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(), currentPipeline.getName(), + currentPipeline.getHealthStatus().toString()); + } + }); + int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines() + - pipelinesStats.getAttentionRequiredPipelines(); + pipelinesStats.setHealthyPipelines(healthNum); + pipelinesStats.setElementCount(getElementsCount(healthCheckData.activeResources().allPipelines())); + } + + private boolean isNowhereRunning(String instanceId) { + return 
(healthCheckData.activeExtensionInstances().entrySet().stream() + .noneMatch(entry -> entry.getValue().runningPipelineElementInstanceIds().contains(instanceId))); + } + + private boolean shouldRetry(String instanceId) { + if (!failedRestartAttempts.containsKey(instanceId)) { + return true; + } else { + return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS; + } + } + + private void resetFailedAttempts(String instanceId) { + failedRestartAttempts.put(instanceId, 0); + } + + private void increaseFailedAttempt(String instanceId) { + if (!failedRestartAttempts.containsKey(instanceId)) { + failedRestartAttempts.put(instanceId, 1); + } else { + Integer currentAttempt = failedRestartAttempts.get(instanceId) + 1; + failedRestartAttempts.put(instanceId, currentAttempt); + } + } + + private int getElementsCount(List<Pipeline> allPipelines) { + return allPipelines.stream().mapToInt(pipeline -> pipeline.getActions().size()).sum(); + } + + private String getInvocationUrl(InvocableStreamPipesEntity pipelineElement, + String baseUrl) { + return ExtensionsServiceEndpointUtils + .getPipelineElementType(pipelineElement) + .getInvocationUrl(baseUrl, pipelineElement.getAppId()); + } +} diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java new file mode 100644 index 0000000000..aa878b06be --- /dev/null +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.health.monitoring; + +import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.health.monitoring.model.ActiveCoreInstances; +import org.apache.streampipes.health.monitoring.model.ActiveResources; +import org.apache.streampipes.manager.storage.RunningPipelineElementStorage; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.storage.api.IAdapterStorage; +import org.apache.streampipes.storage.api.IPipelineStorage; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Gives the health checks access to the pipeline storage, the adapter storage and the adapter + * management, and loads the data the health checks operate on. + */ +public record ResourceProvider(IPipelineStorage pipelineStorage, + IAdapterStorage adapterInstanceStorage, + AdapterMasterManagement adapterMasterManagement) { + + /** + * Loads all pipelines and adapters from the storage. + * + * @return All pipelines and adapters, plus the subsets that are marked as running. + */ + public ActiveResources loadActiveResources() { + var allPipelines = pipelineStorage.findAll(); + var runningPipelines = allPipelines.stream().filter(Pipeline::isRunning).toList(); + var allAdapters = adapterInstanceStorage.findAll(); + var runningAdapters =
allAdapters.stream().filter(AdapterDescription::isRunning).toList(); + + return new ActiveResources( + allPipelines, + runningPipelines, + allAdapters, + runningAdapters + ); + } + + /** + * Groups the running adapters and the running pipeline elements by the selected endpoint URL of + * the extension service they were started on. Adapters and pipeline elements without a selected + * endpoint URL are ignored. + * + * @param activeResources The resources loaded by {@link #loadActiveResources()}. + * @return A map from endpoint URL to the adapters and pipeline elements assigned to that URL. + */ + public Map<String, ActiveCoreInstances> loadActiveInstances(ActiveResources activeResources) { + Map<String, List<AdapterDescription>> adaptersByUrl = + activeResources.runningAdapters() + .stream() + .filter(a -> a.getSelectedEndpointUrl() != null) + .collect(Collectors.groupingBy(AdapterDescription::getSelectedEndpointUrl)); + + // NOTE(review): the inner map is keyed by pipeline ID and the merge function keeps the first + // element, so if a pipeline has several elements on the same endpoint only one of them is + // kept in elementsPerPipeline. Confirm that consumers need only one element per pipeline, + // otherwise ActiveCoreInstances should hold a list per pipeline. + Map<String, Map<String, InvocableStreamPipesEntity>> elementsByUrl = + activeResources.runningPipelines() + .stream() + .flatMap(p -> { + String pipelineId = p.getPipelineId(); + return Optional.ofNullable( + RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipelineId) + ) + .orElseGet(List::of) + .stream() + .filter(Objects::nonNull) + .map(e -> new AbstractMap.SimpleEntry<>(pipelineId, e)); + }) + .filter(entry -> entry.getValue().getSelectedEndpointUrl() != null) + .collect(Collectors.groupingBy( + entry -> entry.getValue().getSelectedEndpointUrl(), + Collectors.toMap( + Map.Entry::getKey, + AbstractMap.SimpleEntry::getValue, + (left, right) -> left) + )); + + // Every endpoint URL hosting at least one adapter or pipeline element gets an entry. + Set<String> allUrls = new HashSet<>(); + allUrls.addAll(adaptersByUrl.keySet()); + allUrls.addAll(elementsByUrl.keySet()); + + Map<String, ActiveCoreInstances> result = new HashMap<>(); + for (String url : allUrls) { + List<AdapterDescription> adapters = adaptersByUrl.getOrDefault(url, List.of()); + Map<String, InvocableStreamPipesEntity> elementsPerPipeline = elementsByUrl.getOrDefault(url, Map.of()); + result.put(url, new ActiveCoreInstances(adapters, elementsPerPipeline)); + } + + return result; + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java similarity index 83% rename from
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java rename to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java index 949b65fa0a..e252efd755 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java @@ -15,7 +15,7 @@ * limitations under the License. * */ -package org.apache.streampipes.manager.health; +package org.apache.streampipes.health.monitoring; import org.apache.streampipes.commons.environment.Environment; @@ -24,7 +24,7 @@ import org.apache.streampipes.loadbalance.LoadManager; import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; -import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.http.HttpStatus; import org.slf4j.Logger; @@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.stream.Collectors; public class ServiceHealthCheck implements Runnable { @@ -45,8 +43,7 @@ public class ServiceHealthCheck implements Runnable { private final List<SpServiceRegistration> needDeletedServices = new ArrayList<>(); - public ServiceHealthCheck() { - var storage = StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage(); + public ServiceHealthCheck(CRUDStorage<SpServiceRegistration> storage) { this.serviceRegistrationManager = new ServiceRegistrationManager(storage); this.maxUnhealthyDurationBeforeRemovalMs = Environments.getEnvironment() 
.getUnhealthyTimeBeforeServiceDeletionInMillis().getValueOrDefault(); @@ -68,10 +65,8 @@ public class ServiceHealthCheck implements Runnable { } finally { needDeletedServices.clear(); } - new PipelineHealthCheck().run(); } - private void checkServiceHealth(SpServiceRegistration service) { String healthCheckUrl = makeHealthCheckUrl(service); @@ -117,15 +112,4 @@ public class ServiceHealthCheck implements Runnable { private List<SpServiceRegistration> getRegisteredServices() { return serviceRegistrationManager.getAllServices(); } - - public static List<SpServiceRegistration> getService(String tag, - List<SpServiceRegistration> activeServices) { - return activeServices.stream().filter(service -> filtersSupported(service, tag)) - .filter(service -> service.getStatus() != SpServiceStatus.HEALTHY) - .collect(Collectors.toList()); - } - - private static boolean filtersSupported(SpServiceRegistration service, String tag) { - return new HashSet<>(service.getTags()).stream().anyMatch(t -> t.asString().equals(tag)); - } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java similarity index 95% rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java rename to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java index 3bbee91f74..e3fb6271e1 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java @@ -15,7 +15,7 @@ * limitations under the License. 
* */ -package org.apache.streampipes.manager.health; +package org.apache.streampipes.health.monitoring; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; @@ -81,10 +81,6 @@ public class ServiceRegistrationManager { serviceRegistration.getSvcId()); } - public SpServiceStatus getServiceStatus(String serviceId) { - return storage.getElementById(serviceId).getStatus(); - } - private void logService(SpServiceRegistration serviceRegistration) { LOG.info("Service {} (id={}) is now in {} state", serviceRegistration.getSvcGroup(), serviceRegistration.getSvcId(), serviceRegistration.getStatus()); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java similarity index 50% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java copy to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java index 16b8772ca4..272ed4a0c4 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java @@ -16,33 +16,14 @@ * */ -package org.apache.streampipes.manager.execution.http; +package org.apache.streampipes.health.monitoring.model; -import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.pipeline.PipelineElementStatus; +import 
org.apache.streampipes.model.connect.adapter.AdapterDescription; import java.util.List; +import java.util.Map; -public class DetachPipelineElementSubmitter extends PipelineElementSubmitter { - - public DetachPipelineElementSubmitter(Pipeline pipeline) { - super(pipeline); - } - - @Override - protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) { - return performDetach(pipelineElement); - } - - @Override - protected void onSuccess() { - status.setTitle("Pipeline " + pipelineName + " successfully stopped"); - } - - @Override - protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) { - status.setTitle("Could not stop all pipeline elements of pipeline " + pipelineName + "."); - } +public record ActiveCoreInstances(List<AdapterDescription> adapters, + Map<String, InvocableStreamPipesEntity> elementsPerPipeline) { } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java similarity index 50% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java copy to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java index 16b8772ca4..862ef6d64a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java @@ -16,33 +16,15 @@ * */ -package org.apache.streampipes.manager.execution.http; +package org.apache.streampipes.health.monitoring.model; -import org.apache.streampipes.model.api.EndpointSelectable; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import 
org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.pipeline.PipelineElementStatus; import java.util.List; -public class DetachPipelineElementSubmitter extends PipelineElementSubmitter { - - public DetachPipelineElementSubmitter(Pipeline pipeline) { - super(pipeline); - } - - @Override - protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) { - return performDetach(pipelineElement); - } - - @Override - protected void onSuccess() { - status.setTitle("Pipeline " + pipelineName + " successfully stopped"); - } - - @Override - protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) { - status.setTitle("Could not stop all pipeline elements of pipeline " + pipelineName + "."); - } +public record ActiveResources(List<Pipeline> allPipelines, + List<Pipeline> runningPipelines, + List<AdapterDescription> allAdapters, + List<AdapterDescription> runningAdapters) { } diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java new file mode 100644 index 0000000000..6a0744d10f --- /dev/null +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.health.monitoring.model; + +import org.apache.streampipes.health.monitoring.ResourceProvider; +import org.apache.streampipes.model.health.ExtensionInstanceHealth; + +import java.util.Map; + +/** + * Data collected for a single health check run and shared by the pipeline and adapter checks. + * + * @param resourceProvider Access to the pipeline/adapter storage and the adapter management. + * @param activeResources All pipelines and adapters, plus the subsets marked as running. + * @param activeCoreInstances Running adapters and pipeline elements, keyed by the endpoint URL + * of the extension service they were started on. + * @param activeExtensionInstances Instances reported as running by each extension service, keyed by + * the same endpoint URLs as {@code activeCoreInstances}. + */ +public record HealthCheckData(ResourceProvider resourceProvider, + ActiveResources activeResources, + Map<String, ActiveCoreInstances> activeCoreInstances, + Map<String, ExtensionInstanceHealth> activeExtensionInstances) { +} diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java new file mode 100644 index 0000000000..c94e80a8e7 --- /dev/null +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.health.monitoring.utils; + +import org.apache.streampipes.commons.constants.InstanceIdExtractor; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; + +/** + * Static helpers for the pipeline health check: timestamped pipeline notifications and instance ID + * extraction. + */ +public class HealthCheckUtils { + + // DateTimeFormatter is immutable and thread-safe, so one shared instance is sufficient. + private static final DateTimeFormatter NOTIFICATION_TIMESTAMP_FORMAT = + DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss"); + + private HealthCheckUtils() { + // utility class, not meant to be instantiated + } + + /** + * Appends a timestamped notification that the given pipeline element was restored. + * + * @param pipelineNotifications The mutable list the notification is added to. + * @param pipelineElement The pipeline element that was restored. + */ + public static void addSuccessfulRestoreNotification(List<String> pipelineNotifications, + InvocableStreamPipesEntity pipelineElement) { + pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + pipelineElement.getName() + + "' was not available and was successfully restored."); + } + + /** + * Appends a timestamped notification that the given pipeline element could not be restored. + * + * @param pipelineNotifications The mutable list the notification is added to. + * @param pipelineElement The pipeline element whose restore attempt failed. + */ + public static void addFailedAttemptNotification(List<String> pipelineNotifications, + InvocableStreamPipesEntity pipelineElement) { + pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + pipelineElement.getName() + + "' was not available and could not be restored."); + } + + // Returns the current local date/time as a "[uuuu/MM/dd HH:mm:ss] " prefix. + private static String getCurrentDatetime() { + return "[" + NOTIFICATION_TIMESTAMP_FORMAT.format(LocalDateTime.now()) + "] "; + } + + /** + * Extracts the instance ID from the element ID of the given pipeline element. + */ + public static String extractInstanceId(InvocableStreamPipesEntity pipelineElement) { + return InstanceIdExtractor.extractId(pipelineElement.getElementId()); + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java new file mode 100644 index
0000000000..2adf8a0839 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.health; + +import java.util.Set; + +public record ExtensionInstanceHealth(Set<String> runningAdapterInstanceIds, + Set<String> runningPipelineElementInstanceIds) { +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java index 5cf7cba8dd..028982f953 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java @@ -30,10 +30,6 @@ import java.util.Map; public class PropertyUtils { - public static Map<String, Object> getRuntimeFormat(EventProperty eventProperty) { - return getUntypedRuntimeFormat(eventProperty); - } - public static Map<String, Object> getUntypedRuntimeFormat(EventProperty ep) { if (ep instanceof EventPropertyPrimitive) { Map<String, Object> result = new HashMap<>(); diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java index 70bd2e8cef..e4aca6b6c0 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java @@ -35,7 +35,6 @@ public class ExtensionServiceExecutions { .socketTimeout(10000); } - private static String getServiceAdminSid() { return new SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId(); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java index 6451131d65..49d3ff72d4 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java @@ -67,7 +67,7 @@ public class PipelineExecutionInfo { return processorsAndSinks; } - public void applyPipelineOperationStatus(PipelineOperationStatus status) { + public void setPipelineOperationStatus(PipelineOperationStatus status) { this.pipelineOperationStatus = status; } @@ -80,6 +80,6 @@ public class PipelineExecutionInfo { } public boolean isOperationSuccessful() { - return failedServices.size() == 0 && pipelineOperationStatus.isSuccess(); + return failedServices.isEmpty() && pipelineOperationStatus.isSuccess(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java index 5987e769c3..1b2d8ea647 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java @@ -44,20 +44,23 @@ public class ExtensionsServiceEndpointGenerator implements IExtensionsServiceEnd public ExtensionsServiceEndpointGenerator() {} - public String getEndpointResourceUrl(String appId, SpServiceUrlProvider spServiceUrlProvider, + public String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, Set<SpServiceTag> customServiceTags) throws NoServiceEndpointsAvailableException { return spServiceUrlProvider .getInvocationUrl(selectService(appId, spServiceUrlProvider, customServiceTags), appId); } - public String getEndpointBaseUrl(String appId, SpServiceUrlProvider spServiceUrlProvider, + public String getEndpointBaseUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, Set<SpServiceTag> customServiceTags) throws NoServiceEndpointsAvailableException { return selectService(appId, spServiceUrlProvider, customServiceTags); } - private String selectService(String appId, SpServiceUrlProvider spServiceUrlProvider, + private String selectService(String appId, + SpServiceUrlProvider spServiceUrlProvider, Set<SpServiceTag> customServiceTags) throws NoServiceEndpointsAvailableException { Environment env = Environments.getEnvironment(); @@ -84,7 +87,8 @@ public class ExtensionsServiceEndpointGenerator implements IExtensionsServiceEnd "Could not find any matching service endpoints - are all software components running?"); } - private List<String> getServiceEndpoints(String appId, SpServiceUrlProvider spServiceUrlProvider, + private List<String> getServiceEndpoints(String appId, 
+ SpServiceUrlProvider spServiceUrlProvider, Set<SpServiceTag> customServiceTags) { return SpServiceDiscovery.getServiceDiscovery() .getServiceEndpoints(DefaultSpServiceTypes.EXT, true, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java similarity index 71% rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java index 14937b8e56..7462e6f4d9 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java @@ -18,7 +18,7 @@ package org.apache.streampipes.manager.execution.http; -import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineElementStatus; @@ -26,14 +26,14 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus; import java.util.List; -public abstract class PipelineElementSubmitter { +public abstract class BasePipelineElementSubmitter { protected final String pipelineId; protected final String pipelineName; protected final PipelineOperationStatus status; - public PipelineElementSubmitter(Pipeline pipeline) { + public BasePipelineElementSubmitter(Pipeline pipeline) { this.pipelineId = pipeline.getPipelineId(); this.pipelineName = pipeline.getName(); this.status 
= new PipelineOperationStatus(pipelineId, pipelineName); @@ -41,7 +41,10 @@ public abstract class PipelineElementSubmitter { public PipelineOperationStatus submit(List<InvocableStreamPipesEntity> processorsAndSinks) { // First, try handling all data processors and sinks - processorsAndSinks.forEach(g -> status.addPipelineElementStatus(submitElement(g))); + processorsAndSinks.forEach(g -> { + var response = submitElement(g); + status.addPipelineElementStatus(response); + }); applySuccess(processorsAndSinks); return status; @@ -60,12 +63,18 @@ public abstract class PipelineElementSubmitter { } } - protected PipelineElementStatus performDetach(EndpointSelectable pipelineElement) { - String endpointUrl = pipelineElement.getSelectedEndpointUrl() + pipelineElement.getDetachPath(); + protected String getInvocationUrl(InvocableStreamPipesEntity pipelineElement) { + return ExtensionsServiceEndpointUtils + .getPipelineElementType(pipelineElement) + .getInvocationUrl(pipelineElement.getSelectedEndpointUrl(), pipelineElement.getAppId()); + } + + protected PipelineElementStatus performDetach(InvocableStreamPipesEntity pipelineElement) { + String endpointUrl = getInvocationUrl(pipelineElement) + pipelineElement.getDetachPath(); return new DetachHttpRequest().execute(pipelineElement, endpointUrl, this.pipelineId); } - protected abstract PipelineElementStatus submitElement(EndpointSelectable pipelineElement); + protected abstract PipelineElementStatus submitElement(InvocableStreamPipesEntity pipelineElement); protected abstract void onSuccess(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java index 16b8772ca4..816a043c2f 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java 
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java @@ -18,21 +18,20 @@ package org.apache.streampipes.manager.execution.http; -import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineElementStatus; import java.util.List; -public class DetachPipelineElementSubmitter extends PipelineElementSubmitter { +public class DetachPipelineElementSubmitter extends BasePipelineElementSubmitter { public DetachPipelineElementSubmitter(Pipeline pipeline) { super(pipeline); } @Override - protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) { + protected PipelineElementStatus submitElement(InvocableStreamPipesEntity pipelineElement) { return performDetach(pipelineElement); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java index 986bfe6ead..238214a20f 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java @@ -18,7 +18,6 @@ package org.apache.streampipes.manager.execution.http; -import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineElementStatus; @@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Optional; -public class 
InvokePipelineElementSubmitter extends PipelineElementSubmitter { +public class InvokePipelineElementSubmitter extends BasePipelineElementSubmitter { private static final Logger LOG = LoggerFactory.getLogger(InvokePipelineElementSubmitter.class); @@ -39,9 +38,9 @@ public class InvokePipelineElementSubmitter extends PipelineElementSubmitter { } @Override - protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) { - String endpointUrl = pipelineElement.getSelectedEndpointUrl(); - return new InvokeHttpRequest().execute(pipelineElement, endpointUrl, this.pipelineId); + protected PipelineElementStatus submitElement(InvocableStreamPipesEntity pipelineElement) { + var invocationUrl = getInvocationUrl(pipelineElement); + return new InvokeHttpRequest().execute(pipelineElement, invocationUrl, this.pipelineId); } @Override diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java index 78ca9ee3ce..a3b6cb5141 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java @@ -18,20 +18,17 @@ package org.apache.streampipes.manager.execution.task; -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.loadbalance.LoadManager; import org.apache.streampipes.loadbalance.unit.PipelineElementPartitioner; import org.apache.streampipes.manager.execution.PipelineExecutionInfo; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; import org.apache.streampipes.model.api.EndpointSelectable; 
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineElementStatus; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; public class DiscoverEndpointsTask implements PipelineExecutionTask { @@ -40,14 +37,13 @@ public class DiscoverEndpointsTask implements PipelineExecutionTask { PipelineExecutionInfo executionInfo) { for (PipelineElementPartitioner.ResourceUnitWithServices unit : PipelineElementPartitioner.partitionPipeline(pipeline).getResourceUnits()){ SpServiceRegistration registration = LoadManager.allocation(unit.getCompatibleServices(), pipeline.getLabels()); - String url = registration.getServiceUrl(); - for (InvocableStreamPipesEntity el : unit.getResourceUnit().getElements()) { - try { - var endpointUrl = getSelectedEndpoint(el, url); - applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointUrl); - } catch (NoServiceEndpointsAvailableException en) { - executionInfo.addFailedPipelineElement(el); - } + if (Objects.nonNull(registration)) { + String endpointBaseUrl = registration.getServiceUrl(); + unit.getResourceUnit().getElements().forEach(el -> { + applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointBaseUrl); + }); + } else { + unit.getResourceUnit().getElements().forEach(executionInfo::addFailedPipelineElement); } } @@ -62,7 +58,7 @@ public class DiscoverEndpointsTask implements PipelineExecutionTask { pipeline.getName(), "Could not start pipeline " + pipeline.getName() + ".", pe); - executionInfo.applyPipelineOperationStatus(status); + executionInfo.setPipelineOperationStatus(status); } } @@ -72,19 +68,4 @@ public class DiscoverEndpointsTask implements PipelineExecutionTask { pipelineElement.setSelectedEndpointUrl(endpointUrl); 
pipelineElement.setCorrespondingPipeline(pipelineId); } - - private String findSelectedEndpoint(InvocableStreamPipesEntity pipelineElement) - throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator() - .getEndpointResourceUrl( - pipelineElement.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement) - ); - } - private String getSelectedEndpoint(InvocableStreamPipesEntity pipelineElement, String url) - throws NoServiceEndpointsAvailableException { - return ExtensionsServiceEndpointUtils - .getPipelineElementType(pipelineElement) - .getInvocationUrl(url,pipelineElement.getAppId()); - } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java index 4805fe9b47..26c2b71075 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java @@ -19,16 +19,16 @@ package org.apache.streampipes.manager.execution.task; import org.apache.streampipes.manager.execution.PipelineExecutionInfo; -import org.apache.streampipes.manager.execution.http.PipelineElementSubmitter; +import org.apache.streampipes.manager.execution.http.BasePipelineElementSubmitter; import org.apache.streampipes.manager.execution.provider.PipelineElementProvider; import org.apache.streampipes.model.pipeline.Pipeline; public class SubmitRequestTask implements PipelineExecutionTask { private final PipelineElementProvider elementProvider; - private final PipelineElementSubmitter submitter; + private final BasePipelineElementSubmitter submitter; - public SubmitRequestTask(PipelineElementSubmitter submitter, + public SubmitRequestTask(BasePipelineElementSubmitter submitter, 
PipelineElementProvider elementProvider) { this.elementProvider = elementProvider; this.submitter = submitter; @@ -36,7 +36,7 @@ public class SubmitRequestTask implements PipelineExecutionTask { @Override public boolean shouldExecute(PipelineExecutionInfo executionInfo) { - return executionInfo.getFailedServices().size() == 0; + return executionInfo.getFailedServices().isEmpty(); } @Override @@ -45,6 +45,6 @@ public class SubmitRequestTask implements PipelineExecutionTask { var status = submitter.submit(processorsAndSinks); - executionInfo.applyPipelineOperationStatus(status); + executionInfo.setPipelineOperationStatus(status); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java deleted file mode 100644 index 9bec880a89..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -package org.apache.streampipes.manager.health; - -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; -import org.apache.streampipes.serializers.json.JacksonSerializer; - -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -public class PipelineElementEndpointHealthCheck { - - private static final String InstancePath = "/instances"; - - private final String endpointUrl; - - public PipelineElementEndpointHealthCheck(String endpointUrl) { - this.endpointUrl = endpointUrl; - } - - public List<String> checkRunningInstances() throws IOException { - var request = ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl()); - return asList(request.execute().returnContent().toString()); - } - - private List<String> asList(String json) throws JsonProcessingException { - return Arrays.asList(JacksonSerializer.getObjectMapper().readValue(json, String[].class)); - } - - private String makeRequestUrl() { - return endpointUrl + InstancePath; - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java deleted file mode 100644 index 707b6cce09..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.manager.health; - -import org.apache.streampipes.commons.constants.InstanceIdExtractor; -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; -import org.apache.streampipes.manager.execution.http.InvokeHttpRequest; -import org.apache.streampipes.manager.storage.RunningPipelineElementStorage; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.pipeline.PipelineHealthStatus; -import org.apache.streampipes.storage.management.StorageDispatcher; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -import static org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline; - -public class PipelineHealthCheck implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class); - private static final int MAX_FAILED_ATTEMPTS = 10; - - 
private static final Map<String, Integer> failedRestartAttempts = new HashMap<>(); - - private static final PipelinesStats pipelinesStats = new PipelinesStats(); - - public PipelineHealthCheck() { - - } - - public void checkAndRestorePipelineElements() { - List<Pipeline> allPipelines = getAllPipelines(); - List<Pipeline> runningPipelines = getRunningPipelines(allPipelines); - - pipelinesStats.clear(); - pipelinesStats.setAllPipelines(allPipelines.size()); - pipelinesStats.setRunningPipelines(runningPipelines.size()); - pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines() - - pipelinesStats.getRunningPipelines()); - - for (Pipeline p : allPipelines) { - pipelinesStats.updatePipelineHealthState( - p.getElementId(), - p.getName(), - p.getHealthStatus().toString()); - - pipelinesStats.updatePipelineRunningState( - p.getElementId(), - p.getName(), - p.isRunning()); - } - - if (!runningPipelines.isEmpty()) { - Map<String, List<InvocableStreamPipesEntity>> endpointMap = generateEndpointMap(); - List<String> allRunningInstances = findRunningInstances(endpointMap.keySet()); - - runningPipelines.forEach(pipeline -> { - AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false); - List<String> failedInstances = new ArrayList<>(); - List<String> recoveredInstances = new ArrayList<>(); - List<String> pipelineNotifications = new ArrayList<>(); - List<InvocableStreamPipesEntity> graphs = RunningPipelineElementStorage.runningProcessorsAndSinks - .get(pipeline.getPipelineId()); - - if (Objects.nonNull(graphs)) { - graphs.forEach(graph -> { - String instanceId = extractInstanceId(graph); - if (allRunningInstances.stream() - .noneMatch(runningInstanceId -> runningInstanceId.equals(instanceId))) { - if (shouldRetry(instanceId)) { - String endpointUrl = graph.getSelectedEndpointUrl(); - shouldUpdatePipeline.set(true); - boolean success; - try { - endpointUrl = findEndpointUrl(graph); - success = new InvokeHttpRequest() - .execute(graph, endpointUrl, 
pipeline.getPipelineId()).isSuccess(); - } catch (NoServiceEndpointsAvailableException e) { - success = false; - } - if (!success) { - failedInstances.add(instanceId); - addFailedAttemptNotification(pipelineNotifications, graph); - increaseFailedAttempt(instanceId); - LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})", - graph.getName(), pipeline.getName(), failedRestartAttempts.get(instanceId), - MAX_FAILED_ATTEMPTS); - } else { - recoveredInstances.add(instanceId); - addSuccessfulRestoreNotification(pipelineNotifications, graph); - resetFailedAttempts(instanceId); - graph.setSelectedEndpointUrl(endpointUrl); - LOG.info("Successfully restored pipeline element {} of pipeline {}", - graph.getName(), pipeline.getName()); - } - } - } - }); - } - if (shouldUpdatePipeline.get()) { - var currentPipeline = getPipeline(pipeline.getPipelineId()); - if (!failedInstances.isEmpty()) { - currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE); - pipelinesStats.failedIncrease(); - } else if (!recoveredInstances.isEmpty()) { - currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION); - pipelinesStats.attentionRequiredIncrease(); - } - currentPipeline.setPipelineNotifications(pipelineNotifications); - StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI() - .updateElement(currentPipeline); - pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(), currentPipeline.getName(), - currentPipeline.getHealthStatus().toString()); - } - - }); - int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines() - - pipelinesStats.getAttentionRequiredPipelines(); - pipelinesStats.setHealthyPipelines(healthNum); - pipelinesStats.setElementCount(getElementsCount(allPipelines)); - } - pipelinesStats.metrics(); - } - - private String findEndpointUrl(InvocableStreamPipesEntity graph) - throws NoServiceEndpointsAvailableException { - SpServiceUrlProvider serviceUrlProvider = 
ExtensionsServiceEndpointUtils.getPipelineElementType(graph); - return serviceUrlProvider.getInvocationUrl(graph.getSelectedEndpointUrl(), graph.getAppId()); - } - - private boolean shouldRetry(String instanceId) { - if (!failedRestartAttempts.containsKey(instanceId)) { - return true; - } else { - return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS; - } - } - - private void resetFailedAttempts(String instanceId) { - failedRestartAttempts.put(instanceId, 0); - } - - private void increaseFailedAttempt(String instanceId) { - if (!failedRestartAttempts.containsKey(instanceId)) { - failedRestartAttempts.put(instanceId, 1); - } else { - Integer currentAttempt = failedRestartAttempts.get(instanceId) + 1; - failedRestartAttempts.put(instanceId, currentAttempt); - } - } - - private void addSuccessfulRestoreNotification(List<String> pipelineNotifications, - InvocableStreamPipesEntity graph) { - pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + graph.getName() - + "' was not available and was successfully restored."); - } - - private void addFailedAttemptNotification(List<String> pipelineNotifications, - InvocableStreamPipesEntity graph) { - pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + graph.getName() - + "' was not available and could not be restored."); - } - - private String getCurrentDatetime() { - DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss"); - LocalDateTime now = LocalDateTime.now(); - return "[" + dtf.format(now) + "] "; - } - - private String extractInstanceId(InvocableStreamPipesEntity graph) { - return InstanceIdExtractor.extractId(graph.getElementId()); - } - - private List<String> findRunningInstances(Set<String> endpoints) { - List<String> allRunningInstances = new ArrayList<>(); - endpoints.forEach(endpoint -> { - try { - allRunningInstances - .addAll(new PipelineElementEndpointHealthCheck(endpoint).checkRunningInstances()); - } catch (IOException e) { - 
LOG.error("Pipeline element endpoint {} is unavailable", endpoint); - } - }); - - return allRunningInstances; - } - - private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() { - Map<String, List<InvocableStreamPipesEntity>> endpointMap = new HashMap<>(); - RunningPipelineElementStorage.runningProcessorsAndSinks - .forEach((pipelineId, graphs) -> graphs.forEach(graph -> addEndpoint(endpointMap, graph))); - - return endpointMap; - } - - private void addEndpoint(Map<String, List<InvocableStreamPipesEntity>> endpointMap, - InvocableStreamPipesEntity graph) { - String selectedEndpoint = graph.getSelectedEndpointUrl(); - if (!endpointMap.containsKey(selectedEndpoint)) { - endpointMap.put(selectedEndpoint, new ArrayList<>()); - } - List<InvocableStreamPipesEntity> existingGraphs = endpointMap.get(selectedEndpoint); - existingGraphs.add(graph); - endpointMap.put(selectedEndpoint, existingGraphs); - } - - @Override - public void run() { - try { - this.checkAndRestorePipelineElements(); - } catch (Exception e) { - LOG.error("Error while checking and restoring pipeline elements", e); - } - - } - - private List<Pipeline> getRunningPipelines(List<Pipeline> allPipelines) { - return allPipelines.stream().filter(Pipeline::isRunning).collect(Collectors.toList()); - - } - - private List<Pipeline> getAllPipelines() { - return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll(); - } - - private int getElementsCount(List<Pipeline> allPipelines) { - return allPipelines.stream().mapToInt(pipeline -> pipeline.getActions().size()).sum(); - - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java index ebd3d99601..09709fac7b 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java +++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java @@ -68,10 +68,6 @@ public class PipelineStorageService { SecretProvider.getEncryptionService().apply(graphs); } - private void encryptSecrets(Pipeline pipeline) { - SecretProvider.getEncryptionService().apply(pipeline); - } - private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) { return graphs .stream() diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java new file mode 100644 index 0000000000..959c869a08 --- /dev/null +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +package org.apache.streampipes.rest.extensions.monitoring; + +import org.apache.streampipes.commons.constants.InstanceIdExtractor; +import org.apache.streampipes.extensions.management.connect.AdapterWorkerManagement; +import org.apache.streampipes.extensions.management.init.DeclarersSingleton; +import org.apache.streampipes.extensions.management.init.RunningAdapterInstances; +import org.apache.streampipes.extensions.management.init.RunningInstances; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.model.health.ExtensionInstanceHealth; +import org.apache.streampipes.rest.extensions.AbstractExtensionsResource; + +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.stream.Collectors; + +@RestController +@RequestMapping("health") +public class HealthCheckResource extends AbstractExtensionsResource { + + private final AdapterWorkerManagement adapterManagement = new AdapterWorkerManagement( + RunningAdapterInstances.INSTANCE, + DeclarersSingleton.getInstance() + ); + + @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity<ExtensionInstanceHealth> getRunningInstances() { + + var runningAdapterInstances = adapterManagement.getAllRunningAdapterInstances() + .stream() + .map(NamedStreamPipesEntity::getElementId) + .collect(Collectors.toSet()); + + var runningPipelineElementInstances = RunningInstances.INSTANCE.getRunningInstanceIds() + .stream() + .map(InstanceIdExtractor::extractId) + .collect(Collectors.toSet()); + + var instanceHealth = new ExtensionInstanceHealth( + runningAdapterInstances, + runningPipelineElementInstances + ); + + return ok(instanceHealth); + } +} diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml index
ae360fffc6..6650568127 100644 --- a/streampipes-rest/pom.xml +++ b/streampipes-rest/pom.xml @@ -55,6 +55,11 @@ <artifactId>streampipes-data-export</artifactId> <version>0.99.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-health-monitoring</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.streampipes</groupId> <artifactId>streampipes-measurement-units</artifactId> diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java index 58baea3101..d35057c9da 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -21,7 +21,7 @@ package org.apache.streampipes.rest.impl.admin; import org.apache.streampipes.connect.management.management.AdapterMigrationManager; +import org.apache.streampipes.health.monitoring.ServiceRegistrationManager; import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; -import org.apache.streampipes.manager.health.ServiceRegistrationManager; import org.apache.streampipes.manager.migration.PipelineElementMigrationManager; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java index 69b6e3d5ec..fc243d5d56 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java +++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java @@ -17,7 +17,7 @@ */ package org.apache.streampipes.rest.impl.admin; -import org.apache.streampipes.manager.health.ServiceRegistrationManager; +import org.apache.streampipes.health.monitoring.ServiceRegistrationManager; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.model.message.Notifications; diff --git a/streampipes-service-core/pom.xml b/streampipes-service-core/pom.xml index 25830676dc..b898e78458 100644 --- a/streampipes-service-core/pom.xml +++ b/streampipes-service-core/pom.xml @@ -54,6 +54,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-health-monitoring</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.streampipes</groupId> <artifactId>streampipes-messaging-jms</artifactId> diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java index 06c7257bf0..ce886d620e 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java @@ -21,7 +21,7 @@ package org.apache.streampipes.service.core; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; +import org.apache.streampipes.health.monitoring.ServiceHealthCheck; import org.apache.streampipes.manager.execution.PipelineExecutor; -import org.apache.streampipes.manager.health.ServiceHealthCheck; import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; @@ -66,7 +66,7 @@ public class PostStartupTask implements Runnable { @Override public void run() { - new ServiceHealthCheck().run(); + new ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()).run(); performAdapterAssetUpdate(); startAllPreviouslyStoppedPipelines(); startAdapters(); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index d1888420a0..6a275b1ea1 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -19,14 +19,14 @@ package org.apache.streampipes.service.core; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; -import org.apache.streampipes.connect.management.health.AdapterHealthCheck; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.health.monitoring.ExtensionHealthCheck; +import org.apache.streampipes.health.monitoring.ResourceProvider; +import org.apache.streampipes.health.monitoring.ServiceHealthCheck; import org.apache.streampipes.loadbalance.LoadManager; import org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor; import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; -import org.apache.streampipes.manager.health.PipelineHealthCheck; -import 
org.apache.streampipes.manager.health.ServiceHealthCheck; import org.apache.streampipes.manager.pipeline.PipelineManager; import org.apache.streampipes.manager.setup.AutoInstallation; import org.apache.streampipes.manager.setup.StreamPipesEnvChecker; @@ -54,6 +54,7 @@ import org.apache.streampipes.storage.management.StorageDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -138,14 +139,19 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { TimeUnit.MILLISECONDS); scheduleHealthChecks(env.getHealthCheckIntervalInMillis().getValueOrDefault(), List - .of(new ServiceHealthCheck(), new PipelineHealthCheck(), - new AdapterHealthCheck( - StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(), - new AdapterMasterManagement( - StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(), - new SpResourceManager().manageAdapters(), - new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics())))); + .of(new ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()), + new ExtensionHealthCheck( + new ResourceProvider( + StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(), + StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(), + new AdapterMasterManagement( + StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(), + new SpResourceManager().manageAdapters(), + new SpResourceManager().manageDataStreams(), + AdapterMetricsManager.INSTANCE.getAdapterMetrics() + ) + ) + ))); var logFetchInterval = env.getLogFetchIntervalInMillis().getValueOrDefault(); LOG.info("Extensions logs will be fetched every {} milliseconds", logFetchInterval);
