This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 63dd142d4a refactor: Improve health checks (#4096)
63dd142d4a is described below
commit 63dd142d4a658fa3ad40fbea2ba12c57758d6f83
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 15 15:19:02 2026 +0100
refactor: Improve health checks (#4096)
---
pom.xml | 1 +
.../apache/streampipes/commons/constants/Envs.java | 30 +--
.../commons/environment/DefaultEnvironment.java | 24 +-
.../management/health/AdapterHealthCheck.java | 245 -------------------
.../management/WorkerAdministrationManagement.java | 70 ++----
.../management/health/AdapterHealthCheckTest.java | 96 --------
.../influx/DataExplorerInfluxQueryExecutor.java | 2 +-
.../management/init/RunningInstances.java | 10 +
streampipes-health-monitoring/pom.xml | 76 ++++++
.../health/monitoring/AdapterHealthCheck.java | 168 +++++++++++++
.../health/monitoring/ExtensionHealthCheck.java | 57 +++++
.../ExtensionInstanceAvailabilityCheck.java | 67 ++++++
.../health/monitoring/PipelineHealthCheck.java | 210 +++++++++++++++++
.../health/monitoring/PostStartupRecovery.java | 62 +++++
.../health/monitoring/ResourceProvider.java | 104 ++++++++
.../health/monitoring}/ServiceHealthCheck.java | 22 +-
.../monitoring}/ServiceRegistrationManager.java | 6 +-
.../monitoring/model/ActiveCoreInstances.java | 10 +-
.../health/monitoring/model/ActiveResources.java | 15 +-
.../health/monitoring/model/HealthCheckData.java | 15 +-
.../health/monitoring/utils/HealthCheckUtils.java | 53 +++++
.../model/health/ExtensionInstanceHealth.java | 13 +-
.../streampipes/model/util/PropertyUtils.java | 4 -
.../execution/ExtensionServiceExecutions.java | 1 -
.../manager/execution/PipelineExecutionInfo.java | 4 +-
.../execution/PipelineExecutionTaskFactory.java | 6 +-
.../ExtensionsServiceEndpointGenerator.java | 12 +-
...tter.java => BasePipelineElementSubmitter.java} | 23 +-
.../http/DetachPipelineElementSubmitter.java | 5 +-
.../http/InvokePipelineElementSubmitter.java | 9 +-
.../provider/StoredPipelineElementProvider.java | 42 ----
.../execution/task/AfterInvocationTask.java | 13 +-
.../execution/task/DiscoverEndpointsTask.java | 37 +--
.../manager/execution/task/SubmitRequestTask.java | 16 +-
.../health/PipelineElementEndpointHealthCheck.java | 51 ----
.../manager/health/PipelineHealthCheck.java | 261 ---------------------
.../manager/storage/PipelineStorageService.java | 4 -
.../PipelineElementUtils.java} | 23 +-
.../extensions/monitoring/HealthCheckResource.java | 67 ++++++
streampipes-rest/pom.xml | 5 +
.../rest/impl/admin/MigrationResource.java | 2 +-
.../impl/admin/ServiceRegistrationResource.java | 2 +-
.../rest/impl/connect/AbstractAdapterResource.java | 4 +
.../impl/connect/RuntimeResolvableResource.java | 14 +-
streampipes-service-core/pom.xml | 5 +
.../streampipes/service/core/PostStartupTask.java | 61 +++--
.../service/core/StreamPipesCoreApplication.java | 28 ++-
47 files changed, 1088 insertions(+), 967 deletions(-)
diff --git a/pom.xml b/pom.xml
index 3c5025b797..bb4369da70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1134,6 +1134,7 @@
<module>streampipes-connect-transformer-api</module>
<module>streampipes-connect-transformer-groovy</module>
<module>streampipes-connect-transformer-js</module>
+ <module>streampipes-health-monitoring</module>
</modules>
<profiles>
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 148fa66abd..990629b0f1 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -102,21 +102,21 @@ public enum Envs {
SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"),
- CPU_RESOURCE_WEIGHT("CPU_RESOURCE_WEIGHT", "1.0"),
- MEMORY_RESOURCE_WEIGHT("MEMORY_RESOURCE_WEIGHT", "1.0"),
-
- DIR_MEMORY_RESOURCE_WEIGHT("DIR_MEMORY_RESOURCE_WEIGHT", "1.0"),
- BANDWIDTH_IN_RESOURCE_WEIGHT("BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"),
- BANDWIDTH_OUT_RESOURCE_WEIGHT("BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"),
- THRESHOLD_MIGRATOR_PERCENTAGE("THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"),
- MIN_MIGRATOR_PERCENTAGE("MIN_MIGRATOR_PERCENTAGE", "20.0"),
- OVERLOADED_THRESHOLD_PERCENTAGE("OVERLOADED_THRESHOLD_PERCENTAGE", "85"),
- HISTORY_RESOURCE_PERCENTAGE("HISTORY_RESOURCE_PERCENTAGE", "0.9"),
- MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD", "85"),
- LOAD_TARGET_STD("LOAD_TARGET_STD", "25.0"),
- SELECTOR("SELECTOR", "WeightedRandomSelector"),
- MIGRATOR("MIGRATOR", "ThresholdMigrator"),
- LOAD_MANAGER_ENABLE("LOAD_MANAGER_ENABLE", "true"),
+ CPU_RESOURCE_WEIGHT("SP_CPU_RESOURCE_WEIGHT", "1.0"),
+ MEMORY_RESOURCE_WEIGHT("SP_MEMORY_RESOURCE_WEIGHT", "1.0"),
+
+ SP_DIR_MEMORY_RESOURCE_WEIGHT("SP_DIR_MEMORY_RESOURCE_WEIGHT", "1.0"),
+ SP_BANDWIDTH_IN_RESOURCE_WEIGHT("SP_BANDWIDTH_IN_RESOURCE_WEIGHT", "1.0"),
+ SP_BANDWIDTH_OUT_RESOURCE_WEIGHT("SP_BANDWIDTH_OUT_RESOURCE_WEIGHT", "1.0"),
+ SP_THRESHOLD_MIGRATOR_PERCENTAGE("SP_THRESHOLD_MIGRATOR_PERCENTAGE", "20.0"),
+ SP_MIN_MIGRATOR_PERCENTAGE("SP_MIN_MIGRATOR_PERCENTAGE", "20.0"),
+ SP_OVERLOADED_THRESHOLD_PERCENTAGE("SP_OVERLOADED_THRESHOLD_PERCENTAGE", "85"),
+ SP_HISTORY_RESOURCE_PERCENTAGE("SP_HISTORY_RESOURCE_PERCENTAGE", "0.9"),
+ SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD("SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD", "85"),
+ SP_LOAD_TARGET_STD("SP_LOAD_TARGET_STD", "25.0"),
+ SP_SELECTOR("SP_SELECTOR", "WeightedRandomSelector"),
+ SP_MIGRATOR("SP_MIGRATOR", "ThresholdMigrator"),
+ SP_LOAD_MANAGER_ENABLE("SP_LOAD_MANAGER_ENABLE", "false"),
// expects a comma separated string of service names
SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 2e93ea7f80..9f5a2d997a 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -444,62 +444,62 @@ public class DefaultEnvironment implements Environment {
@Override
public DoubleEnvironmentVariable getDirMemoryResourceWeight() {
- return new DoubleEnvironmentVariable(Envs.DIR_MEMORY_RESOURCE_WEIGHT);
+ return new DoubleEnvironmentVariable(Envs.SP_DIR_MEMORY_RESOURCE_WEIGHT);
}
@Override
public DoubleEnvironmentVariable getBandwidthInResourceWeight() {
- return new DoubleEnvironmentVariable(Envs.BANDWIDTH_IN_RESOURCE_WEIGHT);
+ return new DoubleEnvironmentVariable(Envs.SP_BANDWIDTH_IN_RESOURCE_WEIGHT);
}
@Override
public DoubleEnvironmentVariable getBandwidthOutResourceWeight() {
- return new DoubleEnvironmentVariable(Envs.BANDWIDTH_OUT_RESOURCE_WEIGHT);
+ return new DoubleEnvironmentVariable(Envs.SP_BANDWIDTH_OUT_RESOURCE_WEIGHT);
}
@Override
public FloatEnvironmentVariable getThresholdMigratorPercentage() {
- return new FloatEnvironmentVariable(Envs.THRESHOLD_MIGRATOR_PERCENTAGE);
+ return new FloatEnvironmentVariable(Envs.SP_THRESHOLD_MIGRATOR_PERCENTAGE);
}
@Override
public FloatEnvironmentVariable getMinMigratorPercentage() {
- return new FloatEnvironmentVariable(Envs.MIN_MIGRATOR_PERCENTAGE);
+ return new FloatEnvironmentVariable(Envs.SP_MIN_MIGRATOR_PERCENTAGE);
}
@Override
public FloatEnvironmentVariable getOverloadedThresholdPercentage() {
- return new FloatEnvironmentVariable(Envs.OVERLOADED_THRESHOLD_PERCENTAGE);
+ return new FloatEnvironmentVariable(Envs.SP_OVERLOADED_THRESHOLD_PERCENTAGE);
}
@Override
public FloatEnvironmentVariable getHistoryResourcePercentage() {
- return new FloatEnvironmentVariable(Envs.HISTORY_RESOURCE_PERCENTAGE);
+ return new FloatEnvironmentVariable(Envs.SP_HISTORY_RESOURCE_PERCENTAGE);
}
@Override
public IntEnvironmentVariable getMsgRateDifferenceMigratorThreshold() {
- return new IntEnvironmentVariable(Envs.MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD);
+ return new IntEnvironmentVariable(Envs.SP_MSG_RATE_DIFFERENCE_MIGRATOR_THRESHOLD);
}
@Override
public FloatEnvironmentVariable getLoadTargetStd() {
- return new FloatEnvironmentVariable(Envs.LOAD_TARGET_STD);
+ return new FloatEnvironmentVariable(Envs.SP_LOAD_TARGET_STD);
}
@Override
public StringEnvironmentVariable getSelector() {
- return new StringEnvironmentVariable(Envs.SELECTOR);
+ return new StringEnvironmentVariable(Envs.SP_SELECTOR);
}
@Override
public StringEnvironmentVariable getMigrator() {
- return new StringEnvironmentVariable(Envs.MIGRATOR);
+ return new StringEnvironmentVariable(Envs.SP_MIGRATOR);
}
@Override
public BooleanEnvironmentVariable getLoadManagerEnable() {
- return new BooleanEnvironmentVariable(Envs.LOAD_MANAGER_ENABLE);
+ return new BooleanEnvironmentVariable(Envs.SP_LOAD_MANAGER_ENABLE);
}
@Override
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
deleted file mode 100644
index 5142d5f4fe..0000000000
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.connect.management.health;
-
-import org.apache.streampipes.commons.exceptions.connect.AdapterException;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.connect.management.management.WorkerRestClient;
-import org.apache.streampipes.connect.management.util.WorkerPaths;
-import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.storage.api.IAdapterStorage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-
-public class AdapterHealthCheck implements Runnable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(AdapterHealthCheck.class);
-
- private final IAdapterStorage adapterStorage;
- private final AdapterMasterManagement adapterMasterManagement;
-
- public AdapterHealthCheck(IAdapterStorage adapterStorage,
- AdapterMasterManagement adapterMasterManagement) {
- this.adapterStorage = adapterStorage;
- this.adapterMasterManagement = adapterMasterManagement;
- }
-
- @Override
- public void run() {
- this.checkAndRestoreAdapters();
- }
-
- /**
- * In this method it is checked which adapters are currently running. Then
it calls all workers to
- * validate if the adapter instance is still running as expected. If the
adapter is not running
- * anymore a new worker instance is invoked. In addition, it publishes
monitoring metrics for all
- * running adapters (in line with
- * {@link org.apache.streampipes.manager.health.PipelineHealthCheck}).
- */
- public void checkAndRestoreAdapters() {
- LOG.info("Adapter health check started");
- // Get all adapters that are supposed to run according to the backend
storage
- Map<String, AdapterDescription> adapterInstancesSupposedToRun =
- this.getAllAdaptersSupposedToRun();
-
- // group all adapter instances supposed to run by their worker service URL
- Map<String, List<AdapterDescription>> groupByWorker =
- this.getAllWorkersWithAdapters(adapterInstancesSupposedToRun);
-
- // Get adapters that are not running anymore
- Map<String, AdapterDescription> allAdaptersToRecover =
- this.getAdaptersToRecover(groupByWorker,
adapterInstancesSupposedToRun);
-
- allAdaptersToRecover
- .keySet()
- .forEach(adapterId ->
- LOG.info("Adapter instance with id {} needs to be
recovered", adapterId));
-
- try {
- if (!adapterInstancesSupposedToRun.isEmpty()) {
- // Filter adapters so that only healthy and running adapters are
updated in the metrics
- // endpoint
- var adaptersToMonitor =
adapterInstancesSupposedToRun.entrySet().stream()
- .filter((entry ->
!allAdaptersToRecover.containsKey(entry.getKey())))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
- if (!adaptersToMonitor.isEmpty()) {
- updateMonitoringMetrics(adaptersToMonitor);
- } else {
- LOG.info("No running adapter instances to monitor.");
- }
- }
- } catch (NoSuchElementException e) {
- LOG.error("Could not update adapter metrics due to an invalid state.
({})", e.getMessage());
- }
-
- LOG.info("Monitoring metrics updated for running adapters.");
-
- // Recover Adapters
- this.recoverAdapters(allAdaptersToRecover);
- }
-
- /**
- * Updates the monitoring metrics based on the descriptions of running
adapters.
- *
- * @param runningAdapterDescriptions A map containing the descriptions of
running adapters, where
- * the key is the adapter's element ID and the value is the
corresponding adapter
- * description.
- */
- protected void updateMonitoringMetrics(Map<String, AdapterDescription>
runningAdapterDescriptions) {
-
- var adapterMetrics =
AdapterMetricsManager.getInstance().getAdapterMetrics();
- runningAdapterDescriptions.values()
- .forEach(adapterDescription ->
updateTotalEventsPublished(adapterMetrics,
-
adapterDescription.getElementId(),
-
adapterDescription.getName()));
- LOG.debug("Monitoring {} adapter instances", adapterMetrics.size());
- }
-
- private void updateTotalEventsPublished(AdapterMetrics adapterMetrics,
String adapterId,
- String adapterName) {
-
- // Check if the adapter is already registered; if not, register it first.
- // This step is crucial, especially when the StreamPipes Core service is
restarted,
- // and there are existing running adapters that need proper registration.
- // Note: Proper registration is usually handled during the initial start
of the adapter.
- if (!adapterMetrics.contains(adapterId)) {
- adapterMetrics.register(adapterId, adapterName);
- }
-
- adapterMetrics.updateTotalEventsPublished(adapterId, adapterName,
ExtensionsLogProvider.INSTANCE
- .getMetricInfosForResource(adapterId).getMessagesOut().getCounter());
- }
-
-
- /**
- * Retrieves a map of all adapter instances that are supposed to be running
according to the
- * backend storage.
- * <p>
- * This method queries the adapter storage to obtain information about all
adapters and filters
- * the running instances. The resulting map is keyed by the element ID of
each running adapter,
- * and the corresponding values are the respective {@link
AdapterDescription} objects.
- *
- * @return A map containing all adapter instances supposed to be running
according to the backend
- * storage. The keys are element IDs, and the values are the
corresponding adapter
- * descriptions.
- */
- public Map<String, AdapterDescription> getAllAdaptersSupposedToRun() {
- Map<String, AdapterDescription> result = new HashMap<>();
- List<AdapterDescription> allRunningInstancesAdapterDescription =
this.adapterStorage.findAll();
-
allRunningInstancesAdapterDescription.stream().filter(AdapterDescription::isRunning)
- .forEach(adapterDescription ->
result.put(adapterDescription.getElementId(),
- adapterDescription));
-
- return result;
- }
-
- public Map<String, List<AdapterDescription>>
getAllWorkersWithAdapters(Map<String, AdapterDescription>
adapterInstancesSupposedToRun) {
-
- Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
- adapterInstancesSupposedToRun.values().forEach(ad -> {
- String selectedEndpointUrl = ad.getSelectedEndpointUrl();
- if (selectedEndpointUrl != null) {
- if (groupByWorker.containsKey(selectedEndpointUrl)) {
- groupByWorker.get(selectedEndpointUrl).add(ad);
- } else {
- List<AdapterDescription> adapterDescriptionsPerWorker = new
ArrayList<>();
- adapterDescriptionsPerWorker.add(ad);
- groupByWorker.put(selectedEndpointUrl, adapterDescriptionsPerWorker);
- }
- }
- });
-
- return groupByWorker;
- }
-
- /**
- * Retrieves a map of adapters to recover by comparing the provided
groupings of adapter instances
- * with the instances supposed to run according to the storage. For every
adapter instance it is
- * verified that it actually runs on a worker node. If this is not the case,
it is added to the
- * output of adapters to recover.
- *
- * @param adapterInstancesGroupedByWorker A map grouping adapter instances
by worker.
- * @param adapterInstancesSupposedToRun The map containing all adapter
instances supposed to be
- * running.
- * @return A new map containing adapter instances to recover, filtered based
on running instances.
- */
- public Map<String, AdapterDescription> getAdaptersToRecover(Map<String,
List<AdapterDescription>> adapterInstancesGroupedByWorker,
- Map<String,
AdapterDescription> adapterInstancesSupposedToRun) {
-
- // NOTE: This line is added to prevent modifying the existing map of
instances supposed to run
- // It looks like the parameter `adapterInstancesSupposedToRun` is not
required at all,
- // but this should be checked more carefully.
- Map<String, AdapterDescription> adaptersToRecover =
- new HashMap<>(adapterInstancesSupposedToRun);
-
- adapterInstancesGroupedByWorker.keySet().forEach(adapterEndpointUrl -> {
- try {
- List<AdapterDescription> allRunningInstancesOfOneWorker =
-
WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl
- + WorkerPaths.getRunningAdaptersPath());
-
- // only keep adapters where there is no running adapter instance
- // therefore, all others are removed
- allRunningInstancesOfOneWorker.forEach(adapterDescription ->
adaptersToRecover
- .remove(adapterDescription.getElementId()));
- } catch (AdapterException e) {
- LOG.info("Could not recover adapter at endpoint {} - "
- + "marking it as requested to recover (reason: {})",
adapterEndpointUrl,
- e.getMessage());
- }
- });
-
- return adaptersToRecover;
- }
-
-
- public void recoverAdapters(Map<String, AdapterDescription>
adaptersToRecover) {
- for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
- // Invoke all adapters that were running when the adapter container was
stopped
- try {
- if (adapterDescription.isRunning()) {
- LOG.info("Start recovering adapter {} ",
adapterDescription.getElementId());
-
this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
- LOG.info("Adapter {} is recovered",
adapterDescription.getElementId());
-
- }
- } catch (AdapterException e) {
- LOG.warn("Could not start adapter {} ({})",
adapterDescription.getName(), e.getMessage());
- } catch (Exception e) {
- LOG.error(
- "Unexpected error while recovering adapter {} ({})",
- adapterDescription.getName(),
- e.getMessage()
- );
- }
- }
-
- }
-}
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
index 6cdad54c6b..cc5e6154e7 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java
@@ -19,18 +19,13 @@
package org.apache.streampipes.connect.management.management;
import
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
-import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
-import org.apache.streampipes.connect.management.health.AdapterOperationLock;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
-import org.apache.streampipes.resource.management.AdapterResourceManager;
-import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.resource.management.PermissionResourceManager;
-import org.apache.streampipes.resource.management.SpResourceManager;
+import org.apache.streampipes.resource.management.UserResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
-import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+import org.apache.streampipes.storage.api.IPermissionStorage;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.slf4j.Logger;
@@ -38,61 +33,30 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.TimeUnit;
public class WorkerAdministrationManagement {
private static final Logger LOG =
LoggerFactory.getLogger(WorkerAdministrationManagement.class);
- private static final int MAX_RETRIES = 7;
private final IAdapterStorage adapterDescriptionStorage;
-
- private final AdapterHealthCheck adapterHealthCheck;
+ private final IPermissionStorage permissionStorage;
+ private final PermissionResourceManager permissionResourceManager;
+ private final UserResourceManager userResourceManager;
public WorkerAdministrationManagement(
- IAdapterStorage adapterStorage,
- AdapterMetrics adapterMetrics,
- AdapterResourceManager adapterResourceManager,
- DataStreamResourceManager dataStreamResourceManager
- ) {
- this.adapterHealthCheck = new AdapterHealthCheck(
- adapterStorage,
- new AdapterMasterManagement(
- adapterStorage,
- adapterResourceManager,
- dataStreamResourceManager,
- adapterMetrics
- )
- );
- this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
- }
-
- public void checkAndRestore(int retryCount) {
- if (AdapterOperationLock.INSTANCE.isLocked()) {
- LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 1), MAX_RETRIES);
- if (retryCount <= MAX_RETRIES) {
- try {
- TimeUnit.MILLISECONDS.sleep(3000);
- retryCount++;
- checkAndRestore(retryCount);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts...");
- AdapterOperationLock.INSTANCE.unlock();
- this.adapterHealthCheck.checkAndRestoreAdapters();
- }
- } else {
- AdapterOperationLock.INSTANCE.lock();
- this.adapterHealthCheck.checkAndRestoreAdapters();
- AdapterOperationLock.INSTANCE.unlock();
- }
+ IAdapterStorage adapterDescriptionStorage,
+ IPermissionStorage permissionStorage,
+ UserResourceManager userResourceManager,
+ PermissionResourceManager permissionResourceManager) {
+ this.adapterDescriptionStorage = adapterDescriptionStorage;
+ this.userResourceManager = userResourceManager;
+ this.permissionStorage = permissionStorage;
+ this.permissionResourceManager = permissionResourceManager;
}
public void performAdapterMigrations(List<SpServiceTag> tags) {
- var installedAdapters = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage().findAll();
- var adminSid = new SpResourceManager().manageUsers().getAdminUser().getPrincipalId();
+ var installedAdapters = adapterDescriptionStorage.findAll();
+ var adminSid = userResourceManager.getAdminUser().getPrincipalId();
installedAdapters.stream()
.filter(adapter -> tags.stream().anyMatch(tag ->
tag.getValue().equals(adapter.getAppId())))
.forEach(adapter -> {
@@ -107,13 +71,11 @@ public class WorkerAdministrationManagement {
e);
}
}
- var permissionStorage = CouchDbStorageManager.INSTANCE.getPermissionStorage();
var elementId = adapter.getElementId();
var permissions =
permissionStorage.getUserPermissionsForObject(elementId);
if (permissions.isEmpty()) {
LOG.info("Adding default permission for adapter {}",
adapter.getAppId());
- new PermissionResourceManager()
- .createDefault(elementId, AdapterDescription.class, adminSid, true);
+ permissionResourceManager.createDefault(elementId, AdapterDescription.class, adminSid, true);
}
});
}
diff --git a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
deleted file mode 100644
index 47612f1361..0000000000
--- a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.management.health;
-
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.resource.management.SpResourceManager;
-import org.apache.streampipes.storage.api.IAdapterStorage;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AdapterHealthCheckTest {
-
- private IAdapterStorage adapterInstanceStorageMock;
-
- @BeforeEach
- public void setUp() {
- adapterInstanceStorageMock = mock(IAdapterStorage.class);
- }
-
- @Test
- public void getAllRunningInstancesAdapterDescriptionsEmpty() {
- when(adapterInstanceStorageMock.findAll()).thenReturn(List.of());
-
- var healthCheck = new AdapterHealthCheck(
- adapterInstanceStorageMock,
- new AdapterMasterManagement(
- adapterInstanceStorageMock,
- new SpResourceManager().manageAdapters(),
- new SpResourceManager().manageDataStreams(),
- AdapterMetricsManager.INSTANCE.getAdapterMetrics()
- )
- );
- var result = healthCheck.getAllAdaptersSupposedToRun();
-
- Assertions.assertTrue(result.isEmpty());
- }
-
- @Test
- public void getAllRunningInstancesAdapterDescriptionsMixed() {
-
- var nameRunningAdapter = "running-adapter";
- var nameStoppedAdapter = "stopped-adapter";
-
- var stoppedAdapter = new AdapterDescription();
- stoppedAdapter.setElementId(nameStoppedAdapter);
- stoppedAdapter.setRunning(false);
-
- var runningAdapter = new AdapterDescription();
- runningAdapter.setElementId(nameRunningAdapter);
- runningAdapter.setRunning(true);
-
-
when(adapterInstanceStorageMock.findAll()).thenReturn(List.of(stoppedAdapter,
runningAdapter));
-
- var healthCheck = new AdapterHealthCheck(
- adapterInstanceStorageMock,
- new AdapterMasterManagement(
- adapterInstanceStorageMock,
- new SpResourceManager().manageAdapters(),
- new SpResourceManager().manageDataStreams(),
- AdapterMetricsManager.INSTANCE.getAdapterMetrics()
- )
- );
- var result = healthCheck.getAllAdaptersSupposedToRun();
-
- Assertions.assertEquals(1, result.size());
- Assertions.assertTrue(result.containsKey(nameRunningAdapter));
- Assertions.assertEquals(runningAdapter, result.get(nameRunningAdapter));
-
- }
-
-}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
index f4e1df6095..fbb715be88 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
+++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -73,7 +73,7 @@ public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Q
* The influx client always returns a double for timestamp values. This
method converts them to long values.
*/
private void convertTimestampValueToLong(List<Object> row) {
- if (!row.isEmpty()) {
+ if (!row.isEmpty() && row.get(0) instanceof Number) {
row.set(0, ((Number) row.get(0)).longValue());
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
index 538043d7f9..f14a94de00 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningInstances.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public enum RunningInstances {
INSTANCE;
@@ -65,6 +67,14 @@ public enum RunningInstances {
return runningInstances.size();
}
+ public Set<String> getRunningInstanceIds() {
+ return runningInstances
+ .entrySet()
+ .stream()
+ .map((entry -> entry.getValue().getDescription().getElementId()))
+ .collect(Collectors.toSet());
+ }
+
public List<String> getRunningInstanceIdsForElement(String appId) {
// TODO change this to appId for STREAMPIPES-319
List<String> instanceIds = new ArrayList<>();
diff --git a/streampipes-health-monitoring/pom.xml b/streampipes-health-monitoring/pom.xml
new file mode 100644
index 0000000000..40e3992cde
--- /dev/null
+++ b/streampipes-health-monitoring/pom.xml
@@ -0,0 +1,76 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-parent</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampipes-health-monitoring</artifactId>
+
+ <dependencies>
+ <!-- StreamPipes modules used by the health checks; versions match the parent version (0.99.0-SNAPSHOT) -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-pipeline-management</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-commons</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-connect-management</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-load-balancer</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-model</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-storage-api</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
+
+
+ <!-- Test-scoped dependencies; no version is declared here (presumably managed by the parent POM, confirm) -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
new file mode 100644
index 0000000000..f742b23f57
--- /dev/null
+++
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/AdapterHealthCheck.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
+import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Health check for adapters. Compares the adapters marked as running in the supplied
+ * {@link HealthCheckData} with the adapter instance ids reported by the extension services,
+ * publishes metrics for the adapters that are found and tries to restart the missing ones.
+ */
+public class AdapterHealthCheck {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AdapterHealthCheck.class);
+
+  private final HealthCheckData healthCheckData;
+
+  public AdapterHealthCheck(HealthCheckData healthCheckData) {
+    this.healthCheckData = healthCheckData;
+  }
+
+  /**
+   * Determines which running adapters are not reported by any extension service, updates the
+   * monitoring metrics for the remaining running adapters and then tries to recover the missing
+   * ones (in line with {@link PipelineHealthCheck}). Does nothing when no adapter is marked as
+   * running.
+   */
+  public void runCheck() {
+    LOG.debug("Adapter health check started");
+
+    try {
+      var runningAdapters = healthCheckData.activeResources().runningAdapters();
+      if (runningAdapters.isEmpty()) {
+        return;
+      }
+
+      var allAdaptersToRecover = this.getAdaptersToRecover();
+      allAdaptersToRecover.forEach(adapter ->
+          LOG.info("Adapter instance with id {} needs to be recovered", adapter.getElementId()));
+
+      // Collect the ids once so the filter below is a set lookup instead of a nested stream.
+      var idsToRecover = allAdaptersToRecover
+          .stream()
+          .map(AdapterDescription::getElementId)
+          .collect(Collectors.toSet());
+
+      // Only adapters that do not need recovery are updated in the metrics endpoint.
+      // Null entries are skipped, matching the null handling in getAdaptersToRecover().
+      var adaptersToMonitor = runningAdapters
+          .stream()
+          .filter(Objects::nonNull)
+          .filter(adapter -> !idsToRecover.contains(adapter.getElementId()))
+          .toList();
+
+      if (!adaptersToMonitor.isEmpty()) {
+        updateMonitoringMetrics(adaptersToMonitor);
+        LOG.debug("Monitoring metrics updated for running adapters.");
+      } else {
+        LOG.debug("No running adapter instances to monitor.");
+      }
+
+      // Recover Adapters
+      this.recoverAdapters(allAdaptersToRecover);
+    } catch (NoSuchElementException e) {
+      LOG.error("Could not update adapter metrics due to an invalid state. ({})", e.getMessage());
+    }
+  }
+
+  /**
+   * Updates the monitoring metrics for the given adapters.
+   *
+   * @param runningAdapterDescriptions the descriptions of the adapters whose metrics are updated
+   */
+  protected void updateMonitoringMetrics(List<AdapterDescription> runningAdapterDescriptions) {
+
+    var adapterMetrics = AdapterMetricsManager.getInstance().getAdapterMetrics();
+    runningAdapterDescriptions.forEach(adapterDescription ->
+        updateTotalEventsPublished(
+            adapterMetrics,
+            adapterDescription.getElementId(),
+            adapterDescription.getName()));
+    LOG.debug("Monitoring {} adapter instances", adapterMetrics.size());
+  }
+
+  private void updateTotalEventsPublished(AdapterMetrics adapterMetrics,
+                                          String adapterId,
+                                          String adapterName) {
+
+    // Check if the adapter is already registered; if not, register it first.
+    // This step is crucial, especially when the StreamPipes Core service is restarted,
+    // and there are existing running adapters that need proper registration.
+    // Note: Proper registration is usually handled during the initial start of the adapter.
+    if (!adapterMetrics.contains(adapterId)) {
+      adapterMetrics.register(adapterId, adapterName);
+    }
+
+    adapterMetrics.updateTotalEventsPublished(
+        adapterId,
+        adapterName,
+        ExtensionsLogProvider.INSTANCE
+            .getMetricInfosForResource(adapterId)
+            .getMessagesOut()
+            .getCounter());
+  }
+
+  /**
+   * Returns the adapters that are marked as running but whose element id is not contained in the
+   * running adapter instance ids reported by any extension service. Null entries and adapters
+   * without an element id are ignored.
+   *
+   * @return the list of adapter descriptions that need to be recovered
+   */
+  public List<AdapterDescription> getAdaptersToRecover() {
+
+    var runningAdapterIds =
+        healthCheckData.activeExtensionInstances()
+            .values()
+            .stream()
+            .flatMap(h -> h.runningAdapterInstanceIds().stream())
+            .collect(Collectors.toSet());
+
+    return healthCheckData.activeResources()
+        .runningAdapters()
+        .stream()
+        .filter(Objects::nonNull)
+        .filter(a -> a.getElementId() != null)
+        .filter(a -> !runningAdapterIds.contains(a.getElementId()))
+        .toList();
+  }
+
+  /**
+   * Starts every given adapter that is flagged as running. A failure for one adapter is logged
+   * and does not prevent the remaining adapters from being started.
+   *
+   * @param adaptersToRecover the adapters to start again
+   */
+  public void recoverAdapters(List<AdapterDescription> adaptersToRecover) {
+    for (AdapterDescription adapterDescription : adaptersToRecover) {
+      // Invoke all adapters that were running when the adapter container was stopped
+      try {
+        if (adapterDescription.isRunning()) {
+          LOG.debug("Start recovering adapter {} ", adapterDescription.getElementId());
+          this.healthCheckData
+              .resourceProvider()
+              .adapterMasterManagement()
+              .startStreamAdapter(adapterDescription.getElementId());
+          LOG.info("Adapter {} is recovered", adapterDescription.getElementId());
+        }
+      } catch (AdapterException e) {
+        LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage());
+      } catch (Exception e) {
+        // Unexpected failure: pass the exception itself so the stack trace is not lost.
+        LOG.error("Unexpected error while recovering adapter {}", adapterDescription.getName(), e);
+      }
+    }
+  }
+}
diff --git
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
new file mode 100644
index 0000000000..256c1cc57b
--- /dev/null
+++
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+/**
+ * Runnable that loads the active resources, queries every extension service endpoint referenced
+ * by them and then runs the pipeline and adapter health checks on the collected data.
+ */
+public class ExtensionHealthCheck implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExtensionHealthCheck.class);
+
+  private final ResourceProvider resourceProvider;
+
+  public ExtensionHealthCheck(ResourceProvider resourceProvider) {
+    this.resourceProvider = resourceProvider;
+  }
+
+  /**
+   * Executes one health check cycle. Any exception is logged and not propagated.
+   */
+  @Override
+  public void run() {
+    try {
+      var resources = resourceProvider.loadActiveResources();
+      var coreInstancesByUrl = resourceProvider.loadActiveInstances(resources);
+
+      // Ask each referenced extension service which instances it is running.
+      var extensionHealthByUrl = new HashMap<String, ExtensionInstanceHealth>();
+      for (var serviceBaseUrl : coreInstancesByUrl.keySet()) {
+        var availabilityCheck = new ExtensionInstanceAvailabilityCheck(serviceBaseUrl);
+        extensionHealthByUrl.put(serviceBaseUrl, availabilityCheck.checkRunningInstances());
+      }
+
+      var checkData = new HealthCheckData(
+          resourceProvider,
+          resources,
+          coreInstancesByUrl,
+          extensionHealthByUrl);
+      new PipelineHealthCheck(checkData).runCheck();
+      new AdapterHealthCheck(checkData).runCheck();
+    } catch (Exception e) {
+      LOG.warn("An unhandled error occurred while running health check.", e);
+    }
+  }
+}
diff --git
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
new file mode 100644
index 0000000000..46236d90da
--- /dev/null
+++
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+
+/**
+ * Queries the {@code /health} endpoint of a single extension service and converts the response
+ * into an {@link ExtensionInstanceHealth}.
+ */
+public class ExtensionInstanceAvailabilityCheck {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExtensionInstanceAvailabilityCheck.class);
+  private static final String HEALTH_PATH = "/health";
+
+  private final String serviceBaseUrl;
+
+  public ExtensionInstanceAvailabilityCheck(String serviceBaseUrl) {
+    this.serviceBaseUrl = serviceBaseUrl;
+  }
+
+  /**
+   * Requests the health endpoint of the extension service.
+   *
+   * @return the deserialized health information, or an instance with two empty sets when the
+   *     service does not answer with status 200, sends no usable body or cannot be reached
+   */
+  public ExtensionInstanceHealth checkRunningInstances() {
+    try {
+      var request = ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl());
+      var response = request.execute().returnResponse();
+      // A missing entity would make EntityUtils.toString throw an unchecked exception,
+      // so it is treated like a non-200 answer.
+      if (response.getStatusLine().getStatusCode() != 200 || response.getEntity() == null) {
+        return emptyHealth();
+      }
+      String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+      var health = deserialize(body);
+      // A JSON "null" body deserializes to null; callers expect a non-null result.
+      return health != null ? health : emptyHealth();
+    } catch (JsonProcessingException e) {
+      // The service answered, but the body could not be parsed.
+      LOG.error("Extension service {} returned an invalid health response ({})",
+          serviceBaseUrl, e.getMessage());
+      return emptyHealth();
+    } catch (IOException e) {
+      LOG.error("Extension service {} is unavailable ({})", serviceBaseUrl, e.getMessage());
+      return emptyHealth();
+    }
+  }
+
+  /** Health information reporting no running adapter or pipeline element instances. */
+  private ExtensionInstanceHealth emptyHealth() {
+    return new ExtensionInstanceHealth(Set.of(), Set.of());
+  }
+
+  private ExtensionInstanceHealth deserialize(String json) throws JsonProcessingException {
+    return JacksonSerializer.getObjectMapper().readValue(json, ExtensionInstanceHealth.class);
+  }
+
+  private String makeRequestUrl() {
+    return serviceBaseUrl + HEALTH_PATH;
+  }
+}
diff --git
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
new file mode 100644
index 0000000000..5447f7044b
--- /dev/null
+++
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PipelineHealthCheck.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.health.monitoring;
+
+import
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
+import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import org.apache.streampipes.health.monitoring.utils.HealthCheckUtils;
+import
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
+import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
+import org.apache.streampipes.manager.util.PipelineElementUtils;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
+import org.apache.streampipes.resource.management.secret.SecretDecrypter;
+import org.apache.streampipes.resource.management.secret.SecretEncrypter;
+import org.apache.streampipes.resource.management.secret.SecretService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
+import static
org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;
+
+/**
+ * Health check for pipelines. Publishes pipeline metrics and re-invokes pipeline elements of
+ * running pipelines that no extension service reports as running.
+ */
+public class PipelineHealthCheck {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
+  private static final int MAX_FAILED_ATTEMPTS = 10;
+
+  // Static, so the counters are shared by all PipelineHealthCheck instances.
+  // NOTE(review): plain HashMap without synchronization; confirm checks never run concurrently.
+  private static final Map<String, Integer> failedRestartAttempts = new HashMap<>();
+  private static final PipelinesStats pipelinesStats = new PipelinesStats();
+
+  private final HealthCheckData healthCheckData;
+
+  public PipelineHealthCheck(HealthCheckData healthCheckData) {
+    this.healthCheckData = healthCheckData;
+  }
+
+  /**
+   * Resets and publishes the pipeline metrics and, when at least one pipeline is running, tries
+   * to restore pipeline elements that are not running anywhere. Any exception is logged and not
+   * propagated.
+   */
+  public void runCheck() {
+    try {
+      initPipelineMetrics();
+
+      if (!healthCheckData.activeResources().runningPipelines().isEmpty()) {
+        checkAndRestorePipelineElements();
+      }
+      pipelinesStats.metrics();
+    } catch (Exception e) {
+      LOG.error("Error while checking and restoring pipeline elements", e);
+    }
+  }
+
+  /** Clears the statistics and records counts, health state and running state of all pipelines. */
+  private void initPipelineMetrics() {
+    pipelinesStats.clear();
+    pipelinesStats.setAllPipelines(healthCheckData.activeResources().allPipelines().size());
+    pipelinesStats.setRunningPipelines(healthCheckData.activeResources().runningPipelines().size());
+    pipelinesStats.setStoppedPipelines(
+        pipelinesStats.getAllPipelines() - pipelinesStats.getRunningPipelines());
+
+    for (Pipeline p : healthCheckData.activeResources().allPipelines()) {
+      pipelinesStats.updatePipelineHealthState(
+          p.getElementId(),
+          p.getName(),
+          p.getHealthStatus().toString());
+
+      pipelinesStats.updatePipelineRunningState(
+          p.getElementId(),
+          p.getName(),
+          p.isRunning());
+    }
+  }
+
+  /**
+   * For every running pipeline, re-invokes each processor and sink whose instance id is not
+   * reported by any extension service, as long as the retry limit for that instance is not
+   * reached. Pipelines with at least one attempted restore are updated in the pipeline storage
+   * with the resulting health status, elements and notifications.
+   */
+  private void checkAndRestorePipelineElements() {
+    healthCheckData.activeResources().runningPipelines().forEach(pipeline -> {
+      AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
+      List<String> failedInstances = new ArrayList<>();
+      List<String> recoveredInstances = new ArrayList<>();
+      List<String> pipelineNotifications = new ArrayList<>();
+      List<InvocableStreamPipesEntity> runningPipelineElements = Stream.concat(
+          pipeline.getSepas().stream(),
+          pipeline.getActions().stream()
+      ).toList();
+
+      runningPipelineElements.forEach(pipelineElement -> {
+        String instanceId = HealthCheckUtils.extractInstanceId(pipelineElement);
+        if (isNowhereRunning(instanceId) && shouldRetry(instanceId)) {
+          var serviceBaseUrl = pipelineElement.getSelectedEndpointUrl();
+          shouldUpdatePipeline.set(true);
+          boolean success;
+          try {
+            // Pick an endpoint for the element; throws when none is available.
+            serviceBaseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
+                pipelineElement.getAppId(),
+                ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement.getAppId()),
+                Collections.emptySet()
+            );
+            // Secrets are decrypted for the invocation request and encrypted again afterwards.
+            new SecretService(new SecretDecrypter()).apply(pipelineElement);
+            var invocationUrl = getInvocationUrl(pipelineElement, serviceBaseUrl);
+            success = new InvokeHttpRequest()
+                .execute(pipelineElement, invocationUrl, pipeline.getPipelineId())
+                .isSuccess();
+            new SecretService(new SecretEncrypter()).apply(pipelineElement);
+          } catch (NoServiceEndpointsAvailableException e) {
+            success = false;
+          }
+          if (!success) {
+            failedInstances.add(instanceId);
+            HealthCheckUtils.addFailedAttemptNotification(pipelineNotifications, pipelineElement);
+            increaseFailedAttempt(instanceId);
+            LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})",
+                pipelineElement.getName(), pipeline.getName(),
+                failedRestartAttempts.get(instanceId),
+                MAX_FAILED_ATTEMPTS);
+          } else {
+            recoveredInstances.add(instanceId);
+            HealthCheckUtils.addSuccessfulRestoreNotification(pipelineNotifications, pipelineElement);
+            resetFailedAttempts(instanceId);
+            pipelineElement.setSelectedEndpointUrl(serviceBaseUrl);
+            LOG.info("Successfully restored pipeline element {} of pipeline {}",
+                pipelineElement.getName(), pipeline.getName());
+          }
+        }
+      });
+      if (shouldUpdatePipeline.get()) {
+        // Reload the pipeline before modifying and storing it.
+        var currentPipeline = getPipeline(pipeline.getPipelineId());
+        if (!failedInstances.isEmpty()) {
+          currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
+          pipelinesStats.failedIncrease();
+        } else if (!recoveredInstances.isEmpty()) {
+          currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+          pipelinesStats.attentionRequiredIncrease();
+        }
+        currentPipeline.setSepas(
+            PipelineElementUtils.filterInvocation(runningPipelineElements, DataProcessorInvocation.class)
+        );
+        currentPipeline.setActions(
+            PipelineElementUtils.filterInvocation(runningPipelineElements, DataSinkInvocation.class)
+        );
+        currentPipeline.setPipelineNotifications(pipelineNotifications);
+        healthCheckData.resourceProvider().pipelineStorage().updateElement(currentPipeline);
+        pipelinesStats.updatePipelineHealthState(
+            currentPipeline.getElementId(),
+            currentPipeline.getName(),
+            currentPipeline.getHealthStatus().toString());
+      }
+    });
+    int healthNum = pipelinesStats.getRunningPipelines()
+        - pipelinesStats.getFailedPipelines()
+        - pipelinesStats.getAttentionRequiredPipelines();
+    pipelinesStats.setHealthyPipelines(healthNum);
+    pipelinesStats.setElementCount(getElementsCount(healthCheckData.activeResources().allPipelines()));
+  }
+
+  /** Returns true when no extension service reports the given pipeline element instance id. */
+  private boolean isNowhereRunning(String instanceId) {
+    return healthCheckData.activeExtensionInstances()
+        .values()
+        .stream()
+        .noneMatch(health -> health.runningPipelineElementInstanceIds().contains(instanceId));
+  }
+
+  /** Returns true while fewer than {@code MAX_FAILED_ATTEMPTS} failed restarts are recorded. */
+  private boolean shouldRetry(String instanceId) {
+    return failedRestartAttempts.getOrDefault(instanceId, 0) < MAX_FAILED_ATTEMPTS;
+  }
+
+  private void resetFailedAttempts(String instanceId) {
+    failedRestartAttempts.put(instanceId, 0);
+  }
+
+  /** Records one more failed restart for the instance, starting at 1 for the first failure. */
+  private void increaseFailedAttempt(String instanceId) {
+    failedRestartAttempts.merge(instanceId, 1, Integer::sum);
+  }
+
+  // NOTE(review): only the sinks (getActions) are summed, processors are not; confirm that this
+  // is the intended meaning of "element count".
+  private int getElementsCount(List<Pipeline> allPipelines) {
+    return allPipelines.stream().mapToInt(pipeline -> pipeline.getActions().size()).sum();
+  }
+
+  private String getInvocationUrl(InvocableStreamPipesEntity pipelineElement,
+                                  String baseUrl) {
+    return ExtensionsServiceEndpointUtils
+        .getPipelineElementType(pipelineElement)
+        .getInvocationUrl(baseUrl, pipelineElement.getAppId());
+  }
+}
diff --git
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PostStartupRecovery.java
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PostStartupRecovery.java
new file mode 100644
index 0000000000..73c918e83a
--- /dev/null
+++
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/PostStartupRecovery.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import org.apache.streampipes.connect.management.health.AdapterOperationLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs the extension health check once, waiting for a running adapter operation (signalled by
+ * {@link AdapterOperationLock}) to finish before doing so.
+ */
+public class PostStartupRecovery {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PostStartupRecovery.class);
+
+  private static final int MAX_RETRIES = 7;
+  private static final long RETRY_DELAY_MS = 3000;
+
+  private final ExtensionHealthCheck extensionHealthCheck;
+
+  public PostStartupRecovery(ExtensionHealthCheck extensionHealthCheck) {
+    this.extensionHealthCheck = extensionHealthCheck;
+  }
+
+  /**
+   * Runs the health check while holding the adapter operation lock. If the lock is already
+   * taken, waits {@code RETRY_DELAY_MS} between checks for at most {@code MAX_RETRIES} attempts;
+   * once the attempts are used up the lock is released forcibly and the health check runs
+   * without holding it. If the waiting thread is interrupted, the health check is skipped.
+   *
+   * @param retryCount number of wait attempts already performed; pass 0 for a fresh call
+   */
+  public void checkAndRestore(int retryCount) {
+    int attempt = retryCount;
+    while (AdapterOperationLock.INSTANCE.isLocked()) {
+      if (attempt >= MAX_RETRIES) {
+        LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts...");
+        AdapterOperationLock.INSTANCE.unlock();
+        this.extensionHealthCheck.run();
+        return;
+      }
+      LOG.info("Adapter operation already in progress, {}/{}", (attempt + 1), MAX_RETRIES);
+      try {
+        TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MS);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers can react to it, and give up waiting.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for running adapter operations, skipping health check");
+        return;
+      }
+      attempt++;
+    }
+
+    AdapterOperationLock.INSTANCE.lock();
+    try {
+      this.extensionHealthCheck.run();
+    } finally {
+      // Always release the lock, even if the health check fails unexpectedly.
+      AdapterOperationLock.INSTANCE.unlock();
+    }
+  }
+}
diff --git
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
new file mode 100644
index 0000000000..9136778e09
--- /dev/null
+++
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ResourceProvider.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring;
+
+import
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
+import org.apache.streampipes.health.monitoring.model.ActiveCoreInstances;
+import org.apache.streampipes.health.monitoring.model.ActiveResources;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Bundles the storages and the adapter management used by the health checks and loads the
+ * pipelines and adapters the checks operate on.
+ *
+ * @param pipelineStorage storage the pipelines are read from
+ * @param adapterInstanceStorage storage the adapter descriptions are read from
+ * @param adapterMasterManagement adapter management made available to the health checks
+ */
+public record ResourceProvider(IPipelineStorage pipelineStorage,
+                               IAdapterStorage adapterInstanceStorage,
+                               AdapterMasterManagement adapterMasterManagement) {
+
+  /**
+   * Loads all pipelines and adapters and derives the subsets that are flagged as running.
+   *
+   * @return all pipelines, running pipelines, all adapters and running adapters
+   */
+  public ActiveResources loadActiveResources() {
+    var allPipelines = pipelineStorage.findAll();
+    var runningPipelines = allPipelines.stream().filter(Pipeline::isRunning).toList();
+    var allAdapters = adapterInstanceStorage.findAll();
+    var runningAdapters = allAdapters.stream().filter(AdapterDescription::isRunning).toList();
+
+    return new ActiveResources(
+        allPipelines,
+        runningPipelines,
+        allAdapters,
+        runningAdapters
+    );
+  }
+
+  /**
+   * Groups the running adapters and the elements of the running pipelines by their selected
+   * endpoint URL. Resources without a selected endpoint URL are ignored.
+   *
+   * @param activeResources the resources returned by {@link #loadActiveResources()}
+   * @return one entry per endpoint URL with the adapters and pipeline elements assigned to it
+   */
+  public Map<String, ActiveCoreInstances> loadActiveInstances(ActiveResources activeResources) {
+    Map<String, List<AdapterDescription>> adaptersByUrl =
+        activeResources.runningAdapters()
+            .stream()
+            .filter(a -> a.getSelectedEndpointUrl() != null)
+            .collect(Collectors.groupingBy(AdapterDescription::getSelectedEndpointUrl));
+
+    // NOTE(review): the inner map is keyed by pipeline id and keeps the first element on a key
+    // clash, so only one element per pipeline and endpoint survives; confirm this is intended.
+    Map<String, Map<String, InvocableStreamPipesEntity>> elementsByUrl =
+        activeResources.runningPipelines()
+            .stream()
+            .flatMap(p -> {
+              String pipelineId = p.getPipelineId();
+              return pipelineElements(p)
+                  .filter(Objects::nonNull)
+                  .map(e -> new AbstractMap.SimpleEntry<>(pipelineId, e));
+            })
+            .filter(entry -> entry.getValue().getSelectedEndpointUrl() != null)
+            .collect(Collectors.groupingBy(
+                entry -> entry.getValue().getSelectedEndpointUrl(),
+                Collectors.toMap(
+                    Map.Entry::getKey,
+                    AbstractMap.SimpleEntry::getValue,
+                    (left, right) -> left)
+            ));
+
+    Set<String> allUrls = new HashSet<>();
+    allUrls.addAll(adaptersByUrl.keySet());
+    allUrls.addAll(elementsByUrl.keySet());
+
+    Map<String, ActiveCoreInstances> result = new HashMap<>();
+    for (String url : allUrls) {
+      List<AdapterDescription> adapters = adaptersByUrl.getOrDefault(url, List.of());
+      Map<String, InvocableStreamPipesEntity> elementsPerPipeline =
+          elementsByUrl.getOrDefault(url, Map.of());
+      result.put(url, new ActiveCoreInstances(adapters, elementsPerPipeline));
+    }
+
+    return result;
+  }
+
+  /**
+   * Streams the processors followed by the sinks of the pipeline. A missing (null) list is
+   * treated as empty so that a single incomplete pipeline does not abort the whole grouping.
+   */
+  private static Stream<InvocableStreamPipesEntity> pipelineElements(Pipeline pipeline) {
+    return Stream.concat(
+        Optional.ofNullable(pipeline.getSepas()).orElseGet(List::of).stream(),
+        Optional.ofNullable(pipeline.getActions()).orElseGet(List::of).stream()
+    );
+  }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
similarity index 83%
rename from
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
rename to
streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
index 949b65fa0a..e252efd755 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java
+++
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.health;
+package org.apache.streampipes.health.monitoring;
import org.apache.streampipes.commons.environment.Environment;
@@ -24,7 +24,7 @@ import org.apache.streampipes.loadbalance.LoadManager;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
-import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.storage.api.CRUDStorage;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
@@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.stream.Collectors;
public class ServiceHealthCheck implements Runnable {
@@ -45,8 +43,7 @@ public class ServiceHealthCheck implements Runnable {
private final List<SpServiceRegistration> needDeletedServices = new
ArrayList<>();
- public ServiceHealthCheck() {
- var storage =
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage();
+ public ServiceHealthCheck(CRUDStorage<SpServiceRegistration> storage) {
this.serviceRegistrationManager = new ServiceRegistrationManager(storage);
this.maxUnhealthyDurationBeforeRemovalMs = Environments.getEnvironment()
.getUnhealthyTimeBeforeServiceDeletionInMillis().getValueOrDefault();
@@ -68,10 +65,8 @@ public class ServiceHealthCheck implements Runnable {
} finally {
needDeletedServices.clear();
}
- new PipelineHealthCheck().run();
}
-
private void checkServiceHealth(SpServiceRegistration service) {
String healthCheckUrl = makeHealthCheckUrl(service);
@@ -117,15 +112,4 @@ public class ServiceHealthCheck implements Runnable {
private List<SpServiceRegistration> getRegisteredServices() {
return serviceRegistrationManager.getAllServices();
}
-
- public static List<SpServiceRegistration> getService(String tag,
-
List<SpServiceRegistration> activeServices) {
- return activeServices.stream().filter(service -> filtersSupported(service,
tag))
- .filter(service -> service.getStatus() != SpServiceStatus.HEALTHY)
- .collect(Collectors.toList());
- }
-
- private static boolean filtersSupported(SpServiceRegistration service,
String tag) {
- return new HashSet<>(service.getTags()).stream().anyMatch(t ->
t.asString().equals(tag));
- }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
similarity index 95%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
rename to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
index 3bbee91f74..e3fb6271e1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java
+++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceRegistrationManager.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.health;
+package org.apache.streampipes.health.monitoring;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
@@ -81,10 +81,6 @@ public class ServiceRegistrationManager {
serviceRegistration.getSvcId());
}
- public SpServiceStatus getServiceStatus(String serviceId) {
- return storage.getElementById(serviceId).getStatus();
- }
-
private void logService(SpServiceRegistration serviceRegistration) {
LOG.info("Service {} (id={}) is now in {} state",
serviceRegistration.getSvcGroup(),
serviceRegistration.getSvcId(), serviceRegistration.getStatus());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
similarity index 75%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
copy to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
index ba1734b844..272ed4a0c4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
+++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveCoreInstances.java
@@ -16,16 +16,14 @@
*
*/
-package org.apache.streampipes.manager.storage;
+package org.apache.streampipes.health.monitoring.model;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class RunningPipelineElementStorage {
-
- public static Map<String, List<InvocableStreamPipesEntity>>
runningProcessorsAndSinks = new HashMap<>();
-
+public record ActiveCoreInstances(List<AdapterDescription> adapters,
+ Map<String, InvocableStreamPipesEntity>
elementsPerPipeline) {
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
similarity index 65%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
copy to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
index ba1734b844..862ef6d64a 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
+++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/ActiveResources.java
@@ -16,16 +16,15 @@
*
*/
-package org.apache.streampipes.manager.storage;
+package org.apache.streampipes.health.monitoring.model;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.pipeline.Pipeline;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
-public class RunningPipelineElementStorage {
-
- public static Map<String, List<InvocableStreamPipesEntity>>
runningProcessorsAndSinks = new HashMap<>();
+public record ActiveResources(List<Pipeline> allPipelines,
+ List<Pipeline> runningPipelines,
+ List<AdapterDescription> allAdapters,
+ List<AdapterDescription> runningAdapters) {
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
similarity index 62%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
rename to streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
index ba1734b844..6a0744d10f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java
+++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/model/HealthCheckData.java
@@ -16,16 +16,15 @@
*
*/
-package org.apache.streampipes.manager.storage;
+package org.apache.streampipes.health.monitoring.model;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-public class RunningPipelineElementStorage {
-
- public static Map<String, List<InvocableStreamPipesEntity>>
runningProcessorsAndSinks = new HashMap<>();
-
+public record HealthCheckData(ResourceProvider resourceProvider,
+ ActiveResources activeResources,
+ Map<String, ActiveCoreInstances>
activeCoreInstances,
+ Map<String, ExtensionInstanceHealth>
activeExtensionInstances) {
}
diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
new file mode 100644
index 0000000000..fdec9191bc
--- /dev/null
+++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/utils/HealthCheckUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.health.monitoring.utils;
+
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+public class HealthCheckUtils {
+
+  /**
+   * Formatter for the timestamp that prefixes every notification. {@link DateTimeFormatter} is immutable
+   * and thread-safe, so one shared instance is used instead of compiling the pattern on every call.
+   */
+  private static final DateTimeFormatter NOTIFICATION_TIMESTAMP_FORMAT =
+      DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");
+
+  /**
+   * Appends a timestamped message saying that the given pipeline element was unavailable and has been
+   * restored.
+   *
+   * @param pipelineNotifications list the message is appended to
+   * @param pipelineElement       pipeline element whose name is embedded in the message
+   */
+  public static void addSuccessfulRestoreNotification(List<String> pipelineNotifications,
+                                                      InvocableStreamPipesEntity pipelineElement) {
+    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + pipelineElement.getName()
+        + "' was not available and was successfully restored.");
+  }
+
+  /**
+   * Appends a timestamped message saying that the given pipeline element was unavailable and could not
+   * be restored.
+   *
+   * @param pipelineNotifications list the message is appended to
+   * @param pipelineElement       pipeline element whose name is embedded in the message
+   */
+  public static void addFailedAttemptNotification(List<String> pipelineNotifications,
+                                                  InvocableStreamPipesEntity pipelineElement) {
+    pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + pipelineElement.getName()
+        + "' was not available and could not be restored.");
+  }
+
+  /**
+   * Builds the timestamp prefix from the current local date-time.
+   *
+   * @return the current time as {@code "[uuuu/MM/dd HH:mm:ss] "}, including the trailing space
+   */
+  private static String getCurrentDatetime() {
+    return "[" + NOTIFICATION_TIMESTAMP_FORMAT.format(LocalDateTime.now()) + "] ";
+  }
+
+  /**
+   * Extracts the instance id from the element id of the given pipeline element.
+   *
+   * @param pipelineElement pipeline element whose element id is passed to {@link InstanceIdExtractor}
+   * @return the value returned by {@link InstanceIdExtractor#extractId} for that element id
+   */
+  public static String extractInstanceId(InvocableStreamPipesEntity pipelineElement) {
+    return InstanceIdExtractor.extractId(pipelineElement.getElementId());
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
similarity index 69%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
index 08c37c6742..2adf8a0839 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/health/ExtensionInstanceHealth.java
@@ -16,15 +16,10 @@
*
*/
-package org.apache.streampipes.manager.execution.provider;
+package org.apache.streampipes.model.health;
-import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-
-import java.util.List;
-
-public interface PipelineElementProvider {
-
- List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo
executionInfo);
+import java.util.Set;
+public record ExtensionInstanceHealth(Set<String> runningAdapterInstanceIds,
+ Set<String>
runningPipelineElementInstanceIds) {
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
index 5cf7cba8dd..028982f953 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/PropertyUtils.java
@@ -30,10 +30,6 @@ import java.util.Map;
public class PropertyUtils {
- public static Map<String, Object> getRuntimeFormat(EventProperty
eventProperty) {
- return getUntypedRuntimeFormat(eventProperty);
- }
-
public static Map<String, Object> getUntypedRuntimeFormat(EventProperty ep) {
if (ep instanceof EventPropertyPrimitive) {
Map<String, Object> result = new HashMap<>();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
index 70bd2e8cef..e4aca6b6c0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
@@ -35,7 +35,6 @@ public class ExtensionServiceExecutions {
.socketTimeout(10000);
}
-
private static String getServiceAdminSid() {
return new
SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
index 6451131d65..49d3ff72d4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java
@@ -67,7 +67,7 @@ public class PipelineExecutionInfo {
return processorsAndSinks;
}
- public void applyPipelineOperationStatus(PipelineOperationStatus status) {
+ public void setPipelineOperationStatus(PipelineOperationStatus status) {
this.pipelineOperationStatus = status;
}
@@ -80,6 +80,6 @@ public class PipelineExecutionInfo {
}
public boolean isOperationSuccessful() {
- return failedServices.size() == 0 && pipelineOperationStatus.isSuccess();
+ return failedServices.isEmpty() && pipelineOperationStatus.isSuccess();
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
index 3fab1d07e6..48a7ba2969 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java
@@ -20,8 +20,6 @@ package org.apache.streampipes.manager.execution;
import
org.apache.streampipes.manager.execution.http.DetachPipelineElementSubmitter;
import
org.apache.streampipes.manager.execution.http.InvokePipelineElementSubmitter;
-import
org.apache.streampipes.manager.execution.provider.CurrentPipelineElementProvider;
-import
org.apache.streampipes.manager.execution.provider.StoredPipelineElementProvider;
import org.apache.streampipes.manager.execution.task.AfterInvocationTask;
import org.apache.streampipes.manager.execution.task.DiscoverEndpointsTask;
import org.apache.streampipes.manager.execution.task.PipelineExecutionTask;
@@ -42,7 +40,7 @@ public class PipelineExecutionTaskFactory {
new UpdateGroupIdTask(),
new SecretEncryptionTask(SecretProvider.getDecryptionService()),
new DiscoverEndpointsTask(),
- new SubmitRequestTask(new InvokePipelineElementSubmitter(pipeline),
new CurrentPipelineElementProvider()),
+ new SubmitRequestTask(new InvokePipelineElementSubmitter(pipeline)),
new SecretEncryptionTask(SecretProvider.getEncryptionService()),
new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STARTED),
new StorePipelineStatusTask(true, false)
@@ -52,7 +50,7 @@ public class PipelineExecutionTaskFactory {
public static List<PipelineExecutionTask> makeStopPipelineTasks(Pipeline
pipeline,
boolean
forceStop) {
return List.of(
- new SubmitRequestTask(new DetachPipelineElementSubmitter(pipeline),
new StoredPipelineElementProvider()),
+ new SubmitRequestTask(new DetachPipelineElementSubmitter(pipeline)),
new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STOPPED),
new StorePipelineStatusTask(false, forceStop)
);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
index 1a59bc8888..337e19c35d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java
@@ -43,20 +43,23 @@ public class ExtensionsServiceEndpointGenerator implements
IExtensionsServiceEnd
public ExtensionsServiceEndpointGenerator() {}
- public String getEndpointResourceUrl(String appId, SpServiceUrlProvider
spServiceUrlProvider,
+ public String getEndpointResourceUrl(String appId,
+ SpServiceUrlProvider
spServiceUrlProvider,
Set<SpServiceTag> customServiceTags)
throws NoServiceEndpointsAvailableException {
return spServiceUrlProvider
.getInvocationUrl(selectService(appId, spServiceUrlProvider,
customServiceTags), appId);
}
- public String getEndpointBaseUrl(String appId, SpServiceUrlProvider
spServiceUrlProvider,
+ public String getEndpointBaseUrl(String appId,
+ SpServiceUrlProvider spServiceUrlProvider,
Set<SpServiceTag> customServiceTags)
throws NoServiceEndpointsAvailableException {
return selectService(appId, spServiceUrlProvider, customServiceTags);
}
- private String selectService(String appId, SpServiceUrlProvider
spServiceUrlProvider,
+ private String selectService(String appId,
+ SpServiceUrlProvider spServiceUrlProvider,
Set<SpServiceTag> customServiceTags)
throws NoServiceEndpointsAvailableException {
Environment env = Environments.getEnvironment();
@@ -83,7 +86,8 @@ public class ExtensionsServiceEndpointGenerator implements
IExtensionsServiceEnd
"Could not find any matching service endpoints - are all software
components running?");
}
- private List<String> getServiceEndpoints(String appId, SpServiceUrlProvider
spServiceUrlProvider,
+ private List<String> getServiceEndpoints(String appId,
+ SpServiceUrlProvider
spServiceUrlProvider,
Set<SpServiceTag>
customServiceTags) {
return SpServiceDiscovery.getServiceDiscovery()
.getServiceEndpoints(DefaultSpServiceTypes.EXT, true,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
similarity index 71%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
index 14937b8e56..7462e6f4d9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/BasePipelineElementSubmitter.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.api.EndpointSelectable;
+import
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
@@ -26,14 +26,14 @@ import
org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
-public abstract class PipelineElementSubmitter {
+public abstract class BasePipelineElementSubmitter {
protected final String pipelineId;
protected final String pipelineName;
protected final PipelineOperationStatus status;
- public PipelineElementSubmitter(Pipeline pipeline) {
+ public BasePipelineElementSubmitter(Pipeline pipeline) {
this.pipelineId = pipeline.getPipelineId();
this.pipelineName = pipeline.getName();
this.status = new PipelineOperationStatus(pipelineId, pipelineName);
@@ -41,7 +41,10 @@ public abstract class PipelineElementSubmitter {
public PipelineOperationStatus submit(List<InvocableStreamPipesEntity>
processorsAndSinks) {
// First, try handling all data processors and sinks
- processorsAndSinks.forEach(g ->
status.addPipelineElementStatus(submitElement(g)));
+ processorsAndSinks.forEach(g -> {
+ var response = submitElement(g);
+ status.addPipelineElementStatus(response);
+ });
applySuccess(processorsAndSinks);
return status;
@@ -60,12 +63,18 @@ public abstract class PipelineElementSubmitter {
}
}
- protected PipelineElementStatus performDetach(EndpointSelectable
pipelineElement) {
- String endpointUrl = pipelineElement.getSelectedEndpointUrl() +
pipelineElement.getDetachPath();
+ protected String getInvocationUrl(InvocableStreamPipesEntity
pipelineElement) {
+ return ExtensionsServiceEndpointUtils
+ .getPipelineElementType(pipelineElement)
+ .getInvocationUrl(pipelineElement.getSelectedEndpointUrl(),
pipelineElement.getAppId());
+ }
+
+ protected PipelineElementStatus performDetach(InvocableStreamPipesEntity
pipelineElement) {
+ String endpointUrl = getInvocationUrl(pipelineElement) +
pipelineElement.getDetachPath();
return new DetachHttpRequest().execute(pipelineElement, endpointUrl,
this.pipelineId);
}
- protected abstract PipelineElementStatus submitElement(EndpointSelectable
pipelineElement);
+ protected abstract PipelineElementStatus
submitElement(InvocableStreamPipesEntity pipelineElement);
protected abstract void onSuccess();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
index 16b8772ca4..816a043c2f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java
@@ -18,21 +18,20 @@
package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.api.EndpointSelectable;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import java.util.List;
-public class DetachPipelineElementSubmitter extends PipelineElementSubmitter {
+public class DetachPipelineElementSubmitter extends
BasePipelineElementSubmitter {
public DetachPipelineElementSubmitter(Pipeline pipeline) {
super(pipeline);
}
@Override
- protected PipelineElementStatus submitElement(EndpointSelectable
pipelineElement) {
+ protected PipelineElementStatus submitElement(InvocableStreamPipesEntity
pipelineElement) {
return performDetach(pipelineElement);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
index 986bfe6ead..238214a20f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.api.EndpointSelectable;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
-public class InvokePipelineElementSubmitter extends PipelineElementSubmitter {
+public class InvokePipelineElementSubmitter extends
BasePipelineElementSubmitter {
private static final Logger LOG =
LoggerFactory.getLogger(InvokePipelineElementSubmitter.class);
@@ -39,9 +38,9 @@ public class InvokePipelineElementSubmitter extends
PipelineElementSubmitter {
}
@Override
- protected PipelineElementStatus submitElement(EndpointSelectable
pipelineElement) {
- String endpointUrl = pipelineElement.getSelectedEndpointUrl();
- return new InvokeHttpRequest().execute(pipelineElement, endpointUrl,
this.pipelineId);
+ protected PipelineElementStatus submitElement(InvocableStreamPipesEntity
pipelineElement) {
+ var invocationUrl = getInvocationUrl(pipelineElement);
+ return new InvokeHttpRequest().execute(pipelineElement, invocationUrl,
this.pipelineId);
}
@Override
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
deleted file mode 100644
index d3cdda7f98..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.provider;
-
-import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides pipeline elements from the cache (for stop actions)
- */
-
-public class StoredPipelineElementProvider implements PipelineElementProvider {
- @Override
- public List<InvocableStreamPipesEntity>
getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
- if
(RunningPipelineElementStorage.runningProcessorsAndSinks.containsKey(executionInfo.getPipelineId()))
{
- return
RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId());
- } else {
- return new ArrayList<>();
- }
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
index 4bd6c2279b..690e09ebde 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java
@@ -20,8 +20,10 @@ package org.apache.streampipes.manager.execution.task;
import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
+import org.apache.streampipes.manager.util.PipelineElementUtils;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.message.PipelineStatusMessage;
import org.apache.streampipes.model.message.PipelineStatusMessageType;
import org.apache.streampipes.model.pipeline.Pipeline;
@@ -45,13 +47,14 @@ public class AfterInvocationTask implements
PipelineExecutionTask {
public void executeTask(Pipeline pipeline,
PipelineExecutionInfo executionInfo) {
var graphs = executionInfo.getProcessorsAndSinks();
- storeInvocationGraphs(pipeline.getPipelineId(), graphs);
+ storeInvocationGraphs(pipeline, graphs);
addPipelineStatus(pipeline);
}
- private void storeInvocationGraphs(String pipelineId,
- List<InvocableStreamPipesEntity> graphs) {
- RunningPipelineElementStorage.runningProcessorsAndSinks.put(pipelineId,
graphs);
+ private void storeInvocationGraphs(Pipeline pipeline,
+ List<InvocableStreamPipesEntity>
pipelineElements) {
+
pipeline.setActions(PipelineElementUtils.filterInvocation(pipelineElements,
DataSinkInvocation.class));
+ pipeline.setSepas(PipelineElementUtils.filterInvocation(pipelineElements,
DataProcessorInvocation.class));
}
private void addPipelineStatus(Pipeline pipeline) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
index 78ca9ee3ce..a3b6cb5141 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java
@@ -18,20 +18,17 @@
package org.apache.streampipes.manager.execution.task;
-import
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.loadbalance.LoadManager;
import org.apache.streampipes.loadbalance.unit.PipelineElementPartitioner;
import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
-import
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
import org.apache.streampipes.model.api.EndpointSelectable;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
public class DiscoverEndpointsTask implements PipelineExecutionTask {
@@ -40,14 +37,13 @@ public class DiscoverEndpointsTask implements PipelineExecutionTask {
PipelineExecutionInfo executionInfo) {
for (PipelineElementPartitioner.ResourceUnitWithServices unit : PipelineElementPartitioner.partitionPipeline(pipeline).getResourceUnits()){
SpServiceRegistration registration = LoadManager.allocation(unit.getCompatibleServices(), pipeline.getLabels());
- String url = registration.getServiceUrl();
- for (InvocableStreamPipesEntity el : unit.getResourceUnit().getElements()) {
- try {
- var endpointUrl = getSelectedEndpoint(el, url);
- applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointUrl);
- } catch (NoServiceEndpointsAvailableException en) {
- executionInfo.addFailedPipelineElement(el);
- }
+ if (Objects.nonNull(registration)) {
+ String endpointBaseUrl = registration.getServiceUrl();
+ unit.getResourceUnit().getElements().forEach(el -> {
+ applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointBaseUrl);
+ });
+ } else {
+ unit.getResourceUnit().getElements().forEach(executionInfo::addFailedPipelineElement);
}
}
@@ -62,7 +58,7 @@ public class DiscoverEndpointsTask implements PipelineExecutionTask {
pipeline.getName(),
"Could not start pipeline " + pipeline.getName() + ".",
pe);
- executionInfo.applyPipelineOperationStatus(status);
+ executionInfo.setPipelineOperationStatus(status);
}
}
@@ -72,19 +68,4 @@ public class DiscoverEndpointsTask implements PipelineExecutionTask {
pipelineElement.setSelectedEndpointUrl(endpointUrl);
pipelineElement.setCorrespondingPipeline(pipelineId);
}
-
- private String findSelectedEndpoint(InvocableStreamPipesEntity pipelineElement)
- throws NoServiceEndpointsAvailableException {
- return new ExtensionsServiceEndpointGenerator()
- .getEndpointResourceUrl(
- pipelineElement.getAppId(),
- ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement)
- );
- }
- private String getSelectedEndpoint(InvocableStreamPipesEntity pipelineElement, String url)
- throws NoServiceEndpointsAvailableException {
- return ExtensionsServiceEndpointUtils
- .getPipelineElementType(pipelineElement)
- .getInvocationUrl(url,pipelineElement.getAppId());
- }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
index 4805fe9b47..c78747f1f3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java
@@ -19,32 +19,28 @@
package org.apache.streampipes.manager.execution.task;
import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
-import org.apache.streampipes.manager.execution.http.PipelineElementSubmitter;
-import org.apache.streampipes.manager.execution.provider.PipelineElementProvider;
+import org.apache.streampipes.manager.execution.http.BasePipelineElementSubmitter;
import org.apache.streampipes.model.pipeline.Pipeline;
public class SubmitRequestTask implements PipelineExecutionTask {
- private final PipelineElementProvider elementProvider;
- private final PipelineElementSubmitter submitter;
+ private final BasePipelineElementSubmitter submitter;
- public SubmitRequestTask(PipelineElementSubmitter submitter,
- PipelineElementProvider elementProvider) {
- this.elementProvider = elementProvider;
+ public SubmitRequestTask(BasePipelineElementSubmitter submitter) {
this.submitter = submitter;
}
@Override
public boolean shouldExecute(PipelineExecutionInfo executionInfo) {
- return executionInfo.getFailedServices().size() == 0;
+ return executionInfo.getFailedServices().isEmpty();
}
@Override
public void executeTask(Pipeline pipeline, PipelineExecutionInfo executionInfo) {
- var processorsAndSinks = elementProvider.getProcessorsAndSinks(executionInfo);
+ var processorsAndSinks = executionInfo.getProcessorsAndSinks();
var status = submitter.submit(processorsAndSinks);
- executionInfo.applyPipelineOperationStatus(status);
+ executionInfo.setPipelineOperationStatus(status);
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
deleted file mode 100644
index 9bec880a89..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.health;
-
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-public class PipelineElementEndpointHealthCheck {
-
- private static final String InstancePath = "/instances";
-
- private final String endpointUrl;
-
- public PipelineElementEndpointHealthCheck(String endpointUrl) {
- this.endpointUrl = endpointUrl;
- }
-
- public List<String> checkRunningInstances() throws IOException {
- var request = ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl());
- return asList(request.execute().returnContent().toString());
- }
-
- private List<String> asList(String json) throws JsonProcessingException {
- return Arrays.asList(JacksonSerializer.getObjectMapper().readValue(json, String[].class));
- }
-
- private String makeRequestUrl() {
- return endpointUrl + InstancePath;
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
deleted file mode 100644
index 707b6cce09..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.health;
-
-import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
-import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
-import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;
-
-public class PipelineHealthCheck implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
- private static final int MAX_FAILED_ATTEMPTS = 10;
-
- private static final Map<String, Integer> failedRestartAttempts = new HashMap<>();
-
- private static final PipelinesStats pipelinesStats = new PipelinesStats();
-
- public PipelineHealthCheck() {
-
- }
-
- public void checkAndRestorePipelineElements() {
- List<Pipeline> allPipelines = getAllPipelines();
- List<Pipeline> runningPipelines = getRunningPipelines(allPipelines);
-
- pipelinesStats.clear();
- pipelinesStats.setAllPipelines(allPipelines.size());
- pipelinesStats.setRunningPipelines(runningPipelines.size());
- pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines()
- - pipelinesStats.getRunningPipelines());
-
- for (Pipeline p : allPipelines) {
- pipelinesStats.updatePipelineHealthState(
- p.getElementId(),
- p.getName(),
- p.getHealthStatus().toString());
-
- pipelinesStats.updatePipelineRunningState(
- p.getElementId(),
- p.getName(),
- p.isRunning());
- }
-
- if (!runningPipelines.isEmpty()) {
- Map<String, List<InvocableStreamPipesEntity>> endpointMap = generateEndpointMap();
- List<String> allRunningInstances = findRunningInstances(endpointMap.keySet());
-
- runningPipelines.forEach(pipeline -> {
- AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
- List<String> failedInstances = new ArrayList<>();
- List<String> recoveredInstances = new ArrayList<>();
- List<String> pipelineNotifications = new ArrayList<>();
- List<InvocableStreamPipesEntity> graphs = RunningPipelineElementStorage.runningProcessorsAndSinks
- .get(pipeline.getPipelineId());
-
- if (Objects.nonNull(graphs)) {
- graphs.forEach(graph -> {
- String instanceId = extractInstanceId(graph);
- if (allRunningInstances.stream()
- .noneMatch(runningInstanceId -> runningInstanceId.equals(instanceId))) {
- if (shouldRetry(instanceId)) {
- String endpointUrl = graph.getSelectedEndpointUrl();
- shouldUpdatePipeline.set(true);
- boolean success;
- try {
- endpointUrl = findEndpointUrl(graph);
- success = new InvokeHttpRequest()
- .execute(graph, endpointUrl, pipeline.getPipelineId()).isSuccess();
- } catch (NoServiceEndpointsAvailableException e) {
- success = false;
- }
- if (!success) {
- failedInstances.add(instanceId);
- addFailedAttemptNotification(pipelineNotifications, graph);
- increaseFailedAttempt(instanceId);
- LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})",
- graph.getName(), pipeline.getName(), failedRestartAttempts.get(instanceId),
- MAX_FAILED_ATTEMPTS);
- } else {
- recoveredInstances.add(instanceId);
- addSuccessfulRestoreNotification(pipelineNotifications, graph);
- resetFailedAttempts(instanceId);
- graph.setSelectedEndpointUrl(endpointUrl);
- LOG.info("Successfully restored pipeline element {} of pipeline {}",
- graph.getName(), pipeline.getName());
- }
- }
- }
- });
- }
- if (shouldUpdatePipeline.get()) {
- var currentPipeline = getPipeline(pipeline.getPipelineId());
- if (!failedInstances.isEmpty()) {
- currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
- pipelinesStats.failedIncrease();
- } else if (!recoveredInstances.isEmpty()) {
- currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
- pipelinesStats.attentionRequiredIncrease();
- }
- currentPipeline.setPipelineNotifications(pipelineNotifications);
- StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI()
- .updateElement(currentPipeline);
- pipelinesStats.updatePipelineHealthState(currentPipeline.getElementId(), currentPipeline.getName(),
- currentPipeline.getHealthStatus().toString());
- }
-
- });
- int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines()
- - pipelinesStats.getAttentionRequiredPipelines();
- pipelinesStats.setHealthyPipelines(healthNum);
- pipelinesStats.setElementCount(getElementsCount(allPipelines));
- }
- pipelinesStats.metrics();
- }
-
- private String findEndpointUrl(InvocableStreamPipesEntity graph)
- throws NoServiceEndpointsAvailableException {
- SpServiceUrlProvider serviceUrlProvider = ExtensionsServiceEndpointUtils.getPipelineElementType(graph);
- return serviceUrlProvider.getInvocationUrl(graph.getSelectedEndpointUrl(), graph.getAppId());
- }
-
- private boolean shouldRetry(String instanceId) {
- if (!failedRestartAttempts.containsKey(instanceId)) {
- return true;
- } else {
- return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS;
- }
- }
-
- private void resetFailedAttempts(String instanceId) {
- failedRestartAttempts.put(instanceId, 0);
- }
-
- private void increaseFailedAttempt(String instanceId) {
- if (!failedRestartAttempts.containsKey(instanceId)) {
- failedRestartAttempts.put(instanceId, 1);
- } else {
- Integer currentAttempt = failedRestartAttempts.get(instanceId) + 1;
- failedRestartAttempts.put(instanceId, currentAttempt);
- }
- }
-
- private void addSuccessfulRestoreNotification(List<String> pipelineNotifications,
- InvocableStreamPipesEntity graph) {
- pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + graph.getName()
- + "' was not available and was successfully restored.");
- }
-
- private void addFailedAttemptNotification(List<String> pipelineNotifications,
- InvocableStreamPipesEntity graph) {
- pipelineNotifications.add(getCurrentDatetime() + "Pipeline element '" + graph.getName()
- + "' was not available and could not be restored.");
- }
-
- private String getCurrentDatetime() {
- DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");
- LocalDateTime now = LocalDateTime.now();
- return "[" + dtf.format(now) + "] ";
- }
-
- private String extractInstanceId(InvocableStreamPipesEntity graph) {
- return InstanceIdExtractor.extractId(graph.getElementId());
- }
-
- private List<String> findRunningInstances(Set<String> endpoints) {
- List<String> allRunningInstances = new ArrayList<>();
- endpoints.forEach(endpoint -> {
- try {
- allRunningInstances
- .addAll(new PipelineElementEndpointHealthCheck(endpoint).checkRunningInstances());
- } catch (IOException e) {
- LOG.error("Pipeline element endpoint {} is unavailable", endpoint);
- }
- });
-
- return allRunningInstances;
- }
-
- private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() {
- Map<String, List<InvocableStreamPipesEntity>> endpointMap = new HashMap<>();
- RunningPipelineElementStorage.runningProcessorsAndSinks
- .forEach((pipelineId, graphs) -> graphs.forEach(graph -> addEndpoint(endpointMap, graph)));
-
- return endpointMap;
- }
-
- private void addEndpoint(Map<String, List<InvocableStreamPipesEntity>> endpointMap,
- InvocableStreamPipesEntity graph) {
- String selectedEndpoint = graph.getSelectedEndpointUrl();
- if (!endpointMap.containsKey(selectedEndpoint)) {
- endpointMap.put(selectedEndpoint, new ArrayList<>());
- }
- List<InvocableStreamPipesEntity> existingGraphs = endpointMap.get(selectedEndpoint);
- existingGraphs.add(graph);
- endpointMap.put(selectedEndpoint, existingGraphs);
- }
-
- @Override
- public void run() {
- try {
- this.checkAndRestorePipelineElements();
- } catch (Exception e) {
- LOG.error("Error while checking and restoring pipeline elements", e);
- }
-
- }
-
- private List<Pipeline> getRunningPipelines(List<Pipeline> allPipelines) {
- return allPipelines.stream().filter(Pipeline::isRunning).collect(Collectors.toList());
-
- }
-
- private List<Pipeline> getAllPipelines() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
- }
-
- private int getElementsCount(List<Pipeline> allPipelines) {
- return allPipelines.stream().mapToInt(pipeline -> pipeline.getActions().size()).sum();
-
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
index ebd3d99601..09709fac7b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java
@@ -68,10 +68,6 @@ public class PipelineStorageService {
SecretProvider.getEncryptionService().apply(graphs);
}
- private void encryptSecrets(Pipeline pipeline) {
- SecretProvider.getEncryptionService().apply(pipeline);
- }
-
private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
return graphs
.stream()
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineElementUtils.java
similarity index 66%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineElementUtils.java
index 9af2aa5f62..de255d8d7f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineElementUtils.java
@@ -16,21 +16,24 @@
*
*/
-package org.apache.streampipes.manager.execution.provider;
+package org.apache.streampipes.manager.util;
-import org.apache.streampipes.manager.execution.PipelineExecutionInfo;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import java.util.List;
-/**
- * Provides pipeline elements from the pipeline of interest (for start actions)
- */
+public class PipelineElementUtils {
-public class CurrentPipelineElementProvider implements PipelineElementProvider {
- @Override
- public List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo) {
- return executionInfo.getProcessorsAndSinks();
- }
+ public static <T extends InvocableStreamPipesEntity> List<T> filterInvocation(
+ List<InvocableStreamPipesEntity> pipelineElements,
+ Class<T> clazz) {
+ if (pipelineElements == null || clazz == null) {
+ return List.of();
+ }
+ return pipelineElements.stream()
+ .filter(clazz::isInstance)
+ .map(clazz::cast)
+ .toList();
+ }
}
diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
new file mode 100644
index 0000000000..959c869a08
--- /dev/null
+++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/HealthCheckResource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.extensions.monitoring;
+
+import org.apache.streampipes.commons.constants.InstanceIdExtractor;
+import org.apache.streampipes.extensions.management.connect.AdapterWorkerManagement;
+import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
+import org.apache.streampipes.extensions.management.init.RunningAdapterInstances;
+import org.apache.streampipes.extensions.management.init.RunningInstances;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.health.ExtensionInstanceHealth;
+import org.apache.streampipes.rest.extensions.AbstractExtensionsResource;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.stream.Collectors;
+
+@RestController
+@RequestMapping("health")
+public class HealthCheckResource extends AbstractExtensionsResource {
+
+ private final AdapterWorkerManagement adapterManagement = new AdapterWorkerManagement(
+ RunningAdapterInstances.INSTANCE,
+ DeclarersSingleton.getInstance()
+ );
+
+ @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<ExtensionInstanceHealth> getRunningInstances() {
+
+ var runningAdapterInstances = adapterManagement.getAllRunningAdapterInstances()
+ .stream()
+ .map(NamedStreamPipesEntity::getElementId)
+ .collect(Collectors.toSet());
+
+ var runningPipelineElementInstances = RunningInstances.INSTANCE.getRunningInstanceIds()
+ .stream()
+ .map(InstanceIdExtractor::extractId)
+ .collect(Collectors.toSet());
+
+ var instanceHealth = new ExtensionInstanceHealth(
+ runningAdapterInstances,
+ runningPipelineElementInstances
+ );
+
+ return ok(instanceHealth);
+ }
+}
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index ae360fffc6..6650568127 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -55,6 +55,11 @@
<artifactId>streampipes-data-export</artifactId>
<version>0.99.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-health-monitoring</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-measurement-units</artifactId>
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index 58baea3101..437eeef73f 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -19,9 +19,9 @@
package org.apache.streampipes.rest.impl.admin;
import org.apache.streampipes.connect.management.management.AdapterMigrationManager;
+import org.apache.streampipes.health.monitoring.ServiceRegistrationManager;
import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
-import org.apache.streampipes.manager.health.ServiceRegistrationManager;
import org.apache.streampipes.manager.migration.PipelineElementMigrationManager;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
index 69b6e3d5ec..fc243d5d56 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java
@@ -17,7 +17,7 @@
*/
package org.apache.streampipes.rest.impl.admin;
-import org.apache.streampipes.manager.health.ServiceRegistrationManager;
+import org.apache.streampipes.health.monitoring.ServiceRegistrationManager;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
import org.apache.streampipes.model.message.Notifications;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java
index dcdd800b6a..36362febe6 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java
@@ -30,6 +30,10 @@ public class AbstractAdapterResource<T> extends AbstractAuthGuardedRestResource
this.managementService = managementServiceSupplier.get();
}
+ // no management service provided
+ public AbstractAdapterResource() {
+ }
+
/**
* required by Spring expression
*/
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index cf30a41758..d5d1cd1942 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -21,17 +21,13 @@ package org.apache.streampipes.rest.impl.connect;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
-import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
import org.apache.streampipes.connect.management.management.WorkerRestClient;
import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.model.monitoring.SpLogMessage;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
-import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.resource.management.secret.SecretProvider;
-import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.slf4j.Logger;
@@ -47,20 +43,14 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v2/connect/master/resolvable")
-public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdministrationManagement> {
+public class RuntimeResolvableResource extends AbstractAdapterResource<Void> {
private static final Logger LOG = LoggerFactory.getLogger(RuntimeResolvableResource.class);
private final IExtensionsServiceEndpointGenerator endpointGenerator;
public RuntimeResolvableResource() {
- super(() -> new WorkerAdministrationManagement(
- StorageDispatcher.INSTANCE.getNoSqlStore()
- .getAdapterInstanceStorage(),
- AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
- new SpResourceManager().manageAdapters(),
- new SpResourceManager().manageDataStreams()
- ));
+ super();
this.endpointGenerator = new ExtensionsServiceEndpointGenerator();
}
diff --git a/streampipes-service-core/pom.xml b/streampipes-service-core/pom.xml
index 6f4b3b19bc..bdbc44bbaa 100644
--- a/streampipes-service-core/pom.xml
+++ b/streampipes-service-core/pom.xml
@@ -64,6 +64,11 @@
<artifactId>streampipes-connect-transformer-js</artifactId>
<version>0.99.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-health-monitoring</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-messaging-jms</artifactId>
diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
index 06c7257bf0..80000abf7f 100644
--- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
+++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
@@ -19,15 +19,19 @@
package org.apache.streampipes.service.core;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
+import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
+import org.apache.streampipes.health.monitoring.ExtensionHealthCheck;
+import org.apache.streampipes.health.monitoring.PostStartupRecovery;
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
import org.apache.streampipes.manager.execution.PipelineExecutor;
-import org.apache.streampipes.manager.health.ServiceHealthCheck;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.resource.management.SpResourceManager;
+import org.apache.streampipes.storage.api.INoSqlStorage;
import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
@@ -51,40 +55,56 @@ public class PostStartupTask implements Runnable {
private final Map<String, Integer> failedPipelines = new HashMap<>();
private final ScheduledExecutorService executorService;
private final WorkerAdministrationManagement workerAdministrationManagement;
+ private final PostStartupRecovery postStartupRecovery;
+
+ private final INoSqlStorage storage = StorageDispatcher.INSTANCE.getNoSqlStore();
public PostStartupTask(IPipelineStorage pipelineStorage) {
this.pipelineStorage = pipelineStorage;
this.executorService = Executors.newSingleThreadScheduledExecutor();
+ var resourceManager = new SpResourceManager();
this.workerAdministrationManagement = new WorkerAdministrationManagement(
- StorageDispatcher.INSTANCE.getNoSqlStore()
- .getAdapterInstanceStorage(),
- AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
- new SpResourceManager().manageAdapters(),
- new SpResourceManager().manageDataStreams()
+ storage.getAdapterDescriptionStorage(),
+ storage.getPermissionStorage(),
+ resourceManager.manageUsers(),
+ resourceManager.managePermissions());
+ this.postStartupRecovery = new PostStartupRecovery(
+ new ExtensionHealthCheck(
+ new ResourceProvider(
+
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(),
+
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+ new AdapterMasterManagement(
+
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+ new SpResourceManager().manageAdapters(),
+ new SpResourceManager().manageDataStreams(),
+ AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+ )
+ )
+ )
);
}
@Override
public void run() {
- new ServiceHealthCheck().run();
+ new ServiceHealthCheck(storage.getExtensionsServiceStorage()).run();
performAdapterAssetUpdate();
startAllPreviouslyStoppedPipelines();
- startAdapters();
+ runHealthCheckOnce();
}
private void performAdapterAssetUpdate() {
- var installedAppIds =
CouchDbStorageManager.INSTANCE.getExtensionsServiceStorage()
- .findAll()
- .stream()
- .flatMap(config ->
config.getTags()
-
.stream())
- .filter(tag ->
tag.getPrefix() == SpServiceTagPrefix.ADAPTER)
- .toList();
+ var installedAppIds = storage.getExtensionsServiceStorage()
+ .findAll()
+ .stream()
+ .flatMap(config -> config.getTags()
+ .stream())
+ .filter(tag -> tag.getPrefix() == SpServiceTagPrefix.ADAPTER)
+ .toList();
workerAdministrationManagement.performAdapterMigrations(installedAppIds);
}
- private void startAdapters() {
- workerAdministrationManagement.checkAndRestore(0);
+ private void runHealthCheckOnce() {
+ postStartupRecovery.checkAndRestore(0);
}
private void startAllPreviouslyStoppedPipelines() {
@@ -165,9 +185,6 @@ public class PostStartupTask implements Runnable {
}
private IPipelineStorage getPipelineStorage() {
- return StorageDispatcher
- .INSTANCE
- .getNoSqlStore()
- .getPipelineStorageAPI();
+ return storage.getPipelineStorageAPI();
}
}
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index 03e02193b4..643ba3f7ab 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -19,18 +19,18 @@ package org.apache.streampipes.service.core;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
-import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
import
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.connect.transformer.api.TransformationEngine;
import org.apache.streampipes.connect.transformer.api.TransformationEngines;
import org.apache.streampipes.connect.transformer.groovy.GroovyScriptEngine;
import org.apache.streampipes.connect.transformer.js.GraalJsScriptEngine;
+import org.apache.streampipes.health.monitoring.ExtensionHealthCheck;
+import org.apache.streampipes.health.monitoring.ResourceProvider;
+import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
import org.apache.streampipes.loadbalance.LoadManager;
import
org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor;
import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
-import org.apache.streampipes.manager.health.PipelineHealthCheck;
-import org.apache.streampipes.manager.health.ServiceHealthCheck;
import org.apache.streampipes.manager.pipeline.PipelineManager;
import org.apache.streampipes.manager.setup.AutoInstallation;
import org.apache.streampipes.manager.setup.StreamPipesEnvChecker;
@@ -56,7 +56,6 @@ import
org.apache.streampipes.storage.couchdb.impl.UserStorage;
import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator;
import org.apache.streampipes.storage.management.StorageDispatcher;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -155,14 +154,19 @@ public class StreamPipesCoreApplication extends
StreamPipesServiceBase {
TimeUnit.MILLISECONDS);
scheduleHealthChecks(env.getHealthCheckIntervalInMillis().getValueOrDefault(),
List
- .of(new ServiceHealthCheck(), new PipelineHealthCheck(),
- new AdapterHealthCheck(
-
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
- new AdapterMasterManagement(
-
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
- new SpResourceManager().manageAdapters(),
- new SpResourceManager().manageDataStreams(),
- AdapterMetricsManager.INSTANCE.getAdapterMetrics()))));
+ .of(new
ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()),
+ new ExtensionHealthCheck(
+ new ResourceProvider(
+
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(),
+
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+ new AdapterMasterManagement(
+
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
+ new SpResourceManager().manageAdapters(),
+ new SpResourceManager().manageDataStreams(),
+ AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+ )
+ )
+ )));
var logFetchInterval =
env.getLogFetchIntervalInMillis().getValueOrDefault();
LOG.info("Extensions logs will be fetched every {} milliseconds",
logFetchInterval);