This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch isolate-extensions-http-request in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 5249dc5e8d1467094189415117db0520063b44df Author: Dominik Riemer <[email protected]> AuthorDate: Wed Mar 11 22:37:02 2026 +0100 refactor: Isolate core-ext HTTP requests into single interface --- .../management/compact/AdapterGenerationSteps.java | 8 +- .../management/AdapterMasterManagement.java | 9 +- .../management/AdapterMigrationManager.java | 10 +- .../management/DescriptionManagement.java | 12 +- .../management/management/GuessManagement.java | 22 +-- .../management/management/WorkerRestClient.java | 126 +++++++------ .../management/AdapterMasterManagementTest.java | 12 +- .../export/resolver/AdapterResolver.java | 5 +- .../health/monitoring/ExtensionHealthCheck.java | 11 +- .../ExtensionInstanceAvailabilityCheck.java | 17 +- .../health/monitoring/ServiceHealthCheck.java | 12 +- .../ExtensionServiceOperationResult.java | 52 ++++++ .../extensions/ExtensionServiceRequestManager.java | 70 ++++++++ .../streampipes/manager/assets/AssetFetcher.java | 30 +++- .../execution/ExtensionServiceExecutions.java | 75 -------- .../HttpExtensionServiceRequestManager.java | 197 +++++++++++++++++++++ .../manager/execution/http/DetachHttpRequest.java | 10 +- .../manager/execution/http/InvokeHttpRequest.java | 14 +- .../execution/http/PipelineElementHttpRequest.java | 41 +++-- .../manager/extensions/ExtensionItemInstaller.java | 9 +- .../manager/function/FunctionManager.java | 25 +-- .../CustomTransformOutputSchemaGenerator.java | 23 +-- .../migration/AbstractMigrationManager.java | 18 +- .../migration/PipelineElementMigrationManager.java | 5 +- .../remote/ContainerProvidedOptionsHandler.java | 19 +- .../manager/setup/AutoInstallation.java | 11 +- .../manager/setup/ExtensionsInstallationTask.java | 9 +- .../manager/setup/InstallationConfiguration.java | 11 +- .../setup/PipelineElementInstallationStep.java | 9 +- .../apache/streampipes/rest/ResetManagement.java | 10 +- .../rest/impl/ContainerProvidedOptions.java | 9 +- .../streampipes/rest/impl/ResetResource.java | 13 +- .../impl/admin/ExtensionsInstallationResource.java | 11 +- .../rest/impl/admin/MigrationResource.java | 19 +- .../rest/impl/connect/AdapterResource.java | 6 +- .../rest/impl/connect/CompactAdapterResource.java | 18 +- .../rest/impl/connect/DescriptionResource.java | 5 +- .../rest/impl/connect/GuessResource.java | 6 +- .../impl/connect/RuntimeResolvableResource.java | 6 +- .../core/ExtensionServiceRequestConfiguration.java | 31 ++-- .../streampipes/service/core/PostStartupTask.java | 16 +- .../service/core/StreamPipesCoreApplication.java | 30 +++- 42 files changed, 736 insertions(+), 316 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java index 0156f390a8..e8aeca17d9 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java @@ -28,13 +28,19 @@ import java.util.List; public class AdapterGenerationSteps { + private final GuessManagement guessManagement; + + public AdapterGenerationSteps(GuessManagement guessManagement) { + this.guessManagement = guessManagement; + } + public List<AdapterModelGenerator> getGenerators() { return List.of( new AdapterBasicsGenerator(), new AdapterConfigGenerator(), new AdapterSchemaGenerator( new SchemaMetadataEnricher(), - new GuessManagement() + guessManagement ) ); } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index dfda65d629..bfc294c0c3 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -53,15 +53,18 @@ public class AdapterMasterManagement { private final AdapterResourceManager adapterResourceManager; private final DataStreamResourceManager dataStreamResourceManager; + private final WorkerRestClient workerRestClient; public AdapterMasterManagement(IAdapterStorage adapterInstanceStorage, AdapterResourceManager adapterResourceManager, DataStreamResourceManager dataStreamResourceManager, - AdapterMetrics adapterMetrics) { + AdapterMetrics adapterMetrics, + WorkerRestClient workerRestClient) { this.adapterInstanceStorage = adapterInstanceStorage; this.adapterMetrics = adapterMetrics; this.adapterResourceManager = adapterResourceManager; this.dataStreamResourceManager = dataStreamResourceManager; + this.workerRestClient = workerRestClient; } public void addAdapter(AdapterDescription adapterDescription, String adapterId, @@ -142,7 +145,7 @@ public class AdapterMasterManagement { AdapterDescription ad = adapterInstanceStorage.getElementById(elementId); try { try { - WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad); + workerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad); } catch (AdapterException e) { if (!forceStop) { throw new AdapterException("Could not stop adapter", e); @@ -182,7 +185,7 @@ public class AdapterMasterManagement { adapterInstanceStorage.updateElement(ad); // Invoke adapter instance - WorkerRestClient.invokeStreamAdapter(baseUrl, elementId); + workerRestClient.invokeStreamAdapter(baseUrl, elementId); // register the adapter at the metrics manager so that the AdapterHealthCheck // can send metrics diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java index 753c78f0d3..318d4501a6 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java @@ -19,6 +19,7 @@ package org.apache.streampipes.connect.management.management; import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.migration.AbstractMigrationManager; import org.apache.streampipes.manager.migration.IMigrationHandler; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; @@ -38,11 +39,16 @@ public class AdapterMigrationManager extends AbstractMigrationManager implements private final IAdapterStorage adapterStorage; private final IAdapterStorage adapterDescriptionStorage; + private final WorkerRestClient workerRestClient; public AdapterMigrationManager(IAdapterStorage adapterStorage, - IAdapterStorage adapterDescriptionStorage) { + IAdapterStorage adapterDescriptionStorage, + WorkerRestClient workerRestClient, + ExtensionServiceRequestManager extensionServiceRequestManager) { + super(extensionServiceRequestManager); this.adapterStorage = adapterStorage; this.adapterDescriptionStorage = adapterDescriptionStorage; + this.workerRestClient = workerRestClient; } @Override @@ -96,7 +102,7 @@ public class AdapterMigrationManager extends AbstractMigrationManager implements migrationResult.element().getElementId() ); try { - WorkerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), adapterDescription); + workerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), adapterDescription); } catch (AdapterException e) { LOG.error("Stopping adapter failed: {}", StringUtils.join(e.getStackTrace(), "\n")); } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java index c52f0e1736..d06730e6a8 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java @@ -29,6 +29,12 @@ import java.util.Optional; public class DescriptionManagement { + private final WorkerRestClient workerRestClient; + + public DescriptionManagement(WorkerRestClient workerRestClient) { + this.workerRestClient = workerRestClient; + } + public List<AdapterDescription> getAdapters() { IAdapterStorage adapterStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterDescriptionStorage(); return adapterStorage.findAll(); @@ -51,15 +57,15 @@ public class DescriptionManagement { } public String getAssets(String baseUrl) throws AdapterException { - return WorkerRestClient.getAssets(baseUrl); + return workerRestClient.getAssets(baseUrl); } public byte[] getIconAsset(String baseUrl) throws AdapterException { - return WorkerRestClient.getIconAsset(baseUrl); + return workerRestClient.getIconAsset(baseUrl); } public String getDocumentationAsset(String baseUrl) throws AdapterException { - return WorkerRestClient.getDocumentationAsset(baseUrl); + return workerRestClient.getDocumentationAsset(baseUrl); } private boolean isAdapterUsed(AdapterDescription adapter) { diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java index 62adce519e..693e6e7281 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java @@ -27,9 +27,8 @@ import org.apache.streampipes.connect.transformer.api.TransformationEngines; import org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException; import org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException; import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.SampleData; import org.apache.streampipes.model.monitoring.SpLogMessage; @@ -40,7 +39,6 @@ import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpStatus; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,11 +50,14 @@ import java.util.Map; public class GuessManagement { private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class); + private final ExtensionServiceRequestManager extensionRequestManager; private final IExtensionsServiceEndpointGenerator endpointGenerator; private final ObjectMapper objectMapper; - public GuessManagement() { - this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); + public GuessManagement(IExtensionsServiceEndpointGenerator endpointGenerator, + ExtensionServiceRequestManager extensionRequestManager) { + this.endpointGenerator = endpointGenerator; + this.extensionRequestManager = extensionRequestManager; this.objectMapper = JacksonSerializer.getObjectMapper(); } @@ -83,15 +84,10 @@ public class GuessManagement { LOG.debug("Calling get get sample data at: {}", workerUrl); - var httpResponse = ExtensionServiceExecutions - .extServicePostRequest(workerUrl, adapterDescriptionString) - .execute() - .returnResponse(); + var response = extensionRequestManager.requestSampleData(workerUrl, adapterDescriptionString); + var responseString = response.responseBody(); - var responseString = EntityUtils.toString(httpResponse.getEntity()); - - if (httpResponse.getStatusLine() - .getStatusCode() == HttpStatus.SC_OK) { + if (response.statusCode() == HttpStatus.SC_OK) { return objectMapper.readValue(responseString, SampleData.class); } else { var exception = objectMapper.readValue(responseString, SpLogMessage.class); diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index 79f204aa6b..5294d5a640 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -22,7 +22,8 @@ package org.apache.streampipes.connect.management.management; import org.apache.streampipes.commons.exceptions.SpConfigurationException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.util.WorkerPaths; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; @@ -34,15 +35,11 @@ import org.apache.streampipes.storage.couchdb.impl.connect.AdapterInstanceStorag import org.apache.streampipes.storage.management.StorageDispatcher; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; -import org.apache.http.client.fluent.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.List; /** @@ -51,9 +48,14 @@ import java.util.List; public class WorkerRestClient { private static final Logger LOG = LoggerFactory.getLogger(WorkerRestClient.class); + private final ExtensionServiceRequestManager requestManager; - public static void invokeStreamAdapter(String baseUrl, - String elementId) throws AdapterException { + public WorkerRestClient(ExtensionServiceRequestManager requestManager) { + this.requestManager = requestManager; + } + + public void invokeStreamAdapter(String baseUrl, + String elementId) throws AdapterException { var adapterStreamDescription = getAndDecryptAdapter(elementId); var url = baseUrl + WorkerPaths.getStreamInvokePath(); @@ -61,8 +63,8 @@ public class WorkerRestClient { updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true); } - public static void stopStreamAdapter(String baseUrl, - AdapterDescription adapterStreamDescription) throws AdapterException { + public void stopStreamAdapter(String baseUrl, + AdapterDescription adapterStreamDescription) throws AdapterException { String url = baseUrl + WorkerPaths.getStreamStopPath(); var ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId()); @@ -71,11 +73,9 @@ public class WorkerRestClient { updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false); } - public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException { + public List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException { try { - var responseString = ExtensionServiceExecutions - .extServiceGetRequest(url) - .execute().returnContent().asString(); + var responseString = requestManager.requestRunningAdapters(url).responseBody(); return JacksonSerializer.getObjectMapper().readValue(responseString, List.class); } catch (IOException e) { @@ -83,30 +83,31 @@ public class WorkerRestClient { } } - private static void startAdapter(String url, - AdapterDescription ad) throws AdapterException { + private void startAdapter(String url, + AdapterDescription ad) throws AdapterException { LOG.debug("Trying to start adapter on endpoint {} ", url); triggerAdapterStateChange(ad, url, "started"); } - private static void stopAdapter(AdapterDescription ad, - String url) throws AdapterException { + private void stopAdapter(AdapterDescription ad, + String url) throws AdapterException { LOG.debug("Trying to stop adapter on endpoint {} ", url); triggerAdapterStateChange(ad, url, "stopped"); } - private static void triggerAdapterStateChange(AdapterDescription ad, - String url, - String action) throws AdapterException { + private void triggerAdapterStateChange(AdapterDescription ad, + String url, + String action) throws AdapterException { try { String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad); - var response = triggerPost(url, ad.getCorrespondingDataStreamElementId(), adapterDescription); - var responseString = getResponseBody(response); + var response = + triggerPost(url, ad.getCorrespondingDataStreamElementId(), adapterDescription); + var responseString = response.responseBody(); - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + if (response.statusCode() != HttpStatus.SC_OK) { var exception = getSerializer().readValue(responseString, AdapterException.class); throw new AdapterException(exception.getMessage(), exception.getCause()); } @@ -116,32 +117,24 @@ public class WorkerRestClient { } } - private static String getResponseBody(HttpResponse response) throws IOException { - return IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); + private ExtensionServiceOperationResult triggerPost(String url, + String elementId, + String payload) throws IOException { + return requestManager.requestAdapterStateChange(url, elementId, payload); } - private static HttpResponse triggerPost(String url, - String elementId, - String payload) throws IOException { - var request = ExtensionServiceExecutions.extServicePostRequest(url, elementId, payload); - return request.execute().returnResponse(); - } - - public static RuntimeOptionsResponse getConfiguration(String baseUrl, - String appId, - RuntimeOptionsRequest runtimeOptionsRequest) - throws AdapterException, SpConfigurationException { + public RuntimeOptionsResponse getConfiguration(String baseUrl, + String appId, + RuntimeOptionsRequest runtimeOptionsRequest) + throws AdapterException, SpConfigurationException { String url = baseUrl + WorkerPaths.getRuntimeResolvablePath(appId); try { String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest); - var response = ExtensionServiceExecutions.extServicePostRequest(url, payload) - .execute() - .returnResponse(); - - String responseString = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); + var response = requestManager.requestRuntimeOptions(url, payload); + String responseString = response.responseBody(); - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + if (response.statusCode() == HttpStatus.SC_OK) { return getSerializer().readValue(responseString, RuntimeOptionsResponse.class); } else { var exception = getSerializer().readValue(responseString, SpConfigurationException.class); @@ -152,15 +145,17 @@ public class WorkerRestClient { } } - public static String getAssets(String workerPath) throws AdapterException { + public String getAssets(String workerPath) throws AdapterException { String url = workerPath + "/assets"; LOG.info("Trying to Assets from endpoint: " + url); try { - return Request.Get(url) - .connectTimeout(1000) - .socketTimeout(100000) - .execute().returnContent().asString(); + var response = requestManager.requestAdapterAssets(url); + + if (!response.isSuccess()) { + throw new AdapterException("Could not get assets endpoint: " + url); + } + return response.responseBody(); } catch (IOException e) { LOG.error(e.getMessage()); throw new AdapterException("Could not get assets endpoint: " + url); @@ -168,28 +163,30 @@ public class WorkerRestClient { } - public static byte[] getIconAsset(String baseUrl) throws AdapterException { + public byte[] getIconAsset(String baseUrl) throws AdapterException { String url = baseUrl + "/assets/icon"; try { - return Request.Get(url) - .connectTimeout(1000) - .socketTimeout(100000) - .execute().returnContent().asBytes(); + var response = requestManager.requestAdapterIconAsset(url); + if (!response.isSuccess()) { + throw new AdapterException("Could not get icon endpoint: " + url); + } + return response.responseBytes(); } catch (IOException e) { LOG.error(e.getMessage()); throw new AdapterException("Could not get icon endpoint: " + url); } } - public static String getDocumentationAsset(String baseUrl) throws AdapterException { + public String getDocumentationAsset(String baseUrl) throws AdapterException { String url = baseUrl + "/assets/documentation"; try { - return Request.Get(url) - .connectTimeout(1000) - .socketTimeout(100000) - .execute().returnContent().asString(); + var response = requestManager.requestAdapterDocumentationAsset(url); + if (!response.isSuccess()) { + throw new AdapterException("Could not get documentation endpoint: " + url); + } + return response.responseBody(); } catch (IOException e) { LOG.error(e.getMessage()); throw new AdapterException("Could not get documentation endpoint: " + url); @@ -197,7 +194,7 @@ public class WorkerRestClient { } - private static AdapterDescription getAdapterDescriptionById(AdapterInstanceStorageImpl adapterStorage, String id) { + private AdapterDescription getAdapterDescriptionById(AdapterInstanceStorageImpl adapterStorage, String id) { AdapterDescription adapterDescription = null; List<AdapterDescription> allAdapters = adapterStorage.findAll(); for (AdapterDescription a : allAdapters) { @@ -209,31 +206,30 @@ public class WorkerRestClient { return adapterDescription; } - private static void updateStreamAdapterStatus(String adapterId, - boolean running) { + private void updateStreamAdapterStatus(String adapterId, + boolean running) { var adapter = getAndDecryptAdapter(adapterId); adapter.setRunning(running); encryptAndUpdateAdapter(adapter); } - private static void encryptAndUpdateAdapter(AdapterDescription adapter) { + private void encryptAndUpdateAdapter(AdapterDescription adapter) { AdapterDescription encryptedDescription = new Cloner().adapterDescription(adapter); SecretProvider.getEncryptionService().apply(encryptedDescription); getAdapterStorage().updateElement(encryptedDescription); } - private static AdapterDescription getAndDecryptAdapter(String adapterId) { + private AdapterDescription getAndDecryptAdapter(String adapterId) { AdapterDescription adapter = getAdapterStorage().getElementById(adapterId); SecretProvider.getDecryptionService().apply(adapter); return adapter; } - private static IAdapterStorage getAdapterStorage() { + private IAdapterStorage getAdapterStorage() { return StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(); } - private static ObjectMapper getSerializer() { + private ObjectMapper getSerializer() { return JacksonSerializer.getObjectMapper(); } } - diff --git a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java index 2d8b70e426..fb28632b6d 100644 --- a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java +++ b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java @@ -39,6 +39,7 @@ public class AdapterMasterManagementTest { public void getAdapter_FailNull() { var adapterStorage = mock(AdapterInstanceStorageImpl.class); var resourceManager = mock(AdapterResourceManager.class); + var workerRestClient = mock(WorkerRestClient.class); when(adapterStorage.findAll()).thenReturn(null); var adapterMasterManagement = @@ -46,7 +47,8 @@ public class AdapterMasterManagementTest { adapterStorage, resourceManager, null, - AdapterMetricsManager.INSTANCE.getAdapterMetrics() + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient ); assertThrows(AdapterException.class, () -> adapterMasterManagement.getAdapter("id2")); @@ -57,6 +59,7 @@ public class AdapterMasterManagementTest { var adapterDescriptions = List.of(new AdapterDescription()); var adapterStorage = mock(AdapterInstanceStorageImpl.class); var resourceManager = mock(AdapterResourceManager.class); + var workerRestClient = mock(WorkerRestClient.class); when(adapterStorage.findAll()).thenReturn(adapterDescriptions); var adapterMasterManagement = @@ -64,7 +67,8 @@ public class AdapterMasterManagementTest { adapterStorage, resourceManager, null, - AdapterMetricsManager.INSTANCE.getAdapterMetrics() + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient ); assertThrows(AdapterException.class, () -> adapterMasterManagement.getAdapter("id2")); @@ -75,6 +79,7 @@ public class AdapterMasterManagementTest { var adapterDescriptions = List.of(new AdapterDescription()); var adapterStorage = mock(AdapterInstanceStorageImpl.class); var resourceManager = mock(AdapterResourceManager.class); + var workerRestClient = mock(WorkerRestClient.class); when(adapterStorage.findAll()).thenReturn(adapterDescriptions); AdapterMasterManagement adapterMasterManagement = @@ -82,7 +87,8 @@ public class AdapterMasterManagementTest { adapterStorage, resourceManager, null, - AdapterMetricsManager.INSTANCE.getAdapterMetrics() + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient ); List<AdapterDescription> result = adapterMasterManagement.getAllAdapterInstances(); diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java index a8f902635b..6e3d153e84 100644 --- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java +++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java @@ -22,6 +22,8 @@ package org.apache.streampipes.export.resolver; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.connect.management.management.WorkerRestClient; +import org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.export.AssetExportConfiguration; import org.apache.streampipes.model.export.ExportItem; @@ -89,7 +91,8 @@ public class AdapterResolver extends AbstractResolver<AdapterDescription> { getNoSqlStore().getAdapterInstanceStorage(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics() + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + new WorkerRestClient(new HttpExtensionServiceRequestManager()) ).stopStreamAdapter(resourceId, true); } catch (AdapterException e) { LOG.warn("Error when stopping adapter with id {} and name {}", resourceId, existingAdapter.getName()); diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java index 256c1cc57b..744e080809 100644 --- a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java @@ -19,6 +19,7 @@ package org.apache.streampipes.health.monitoring; import org.apache.streampipes.health.monitoring.model.HealthCheckData; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.model.health.ExtensionInstanceHealth; import org.slf4j.Logger; @@ -31,9 +32,12 @@ public class ExtensionHealthCheck implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ExtensionHealthCheck.class); private final ResourceProvider resourceProvider; + private final ExtensionServiceRequestManager extensionRequestManager; - public ExtensionHealthCheck(ResourceProvider resourceProvider) { + public ExtensionHealthCheck(ResourceProvider resourceProvider, + ExtensionServiceRequestManager extensionRequestManager) { this.resourceProvider = resourceProvider; + this.extensionRequestManager = extensionRequestManager; } @Override @@ -44,7 +48,10 @@ public class ExtensionHealthCheck implements Runnable { var activeExtensionInstances = new HashMap<String, ExtensionInstanceHealth>(); activeCoreInstances.keySet().forEach(k -> { - activeExtensionInstances.put(k, new ExtensionInstanceAvailabilityCheck(k).checkRunningInstances()); + activeExtensionInstances.put( + k, + new ExtensionInstanceAvailabilityCheck(k, extensionRequestManager).checkRunningInstances() + ); }); var healthCheckData = new HealthCheckData(resourceProvider, activeResources, activeCoreInstances, activeExtensionInstances); diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java index 46236d90da..9a5cf8dc1b 100644 --- a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java @@ -17,17 +17,15 @@ */ package org.apache.streampipes.health.monitoring; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.model.health.ExtensionInstanceHealth; import org.apache.streampipes.serializers.json.JacksonSerializer; import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Set; public class ExtensionInstanceAvailabilityCheck { @@ -36,20 +34,21 @@ public class ExtensionInstanceAvailabilityCheck { private static final String InstancePath = "/health"; private final String serviceBaseUrl; + private final ExtensionServiceRequestManager extensionRequestManager; - public ExtensionInstanceAvailabilityCheck(String serviceBaseUrl) { + public ExtensionInstanceAvailabilityCheck(String serviceBaseUrl, + ExtensionServiceRequestManager extensionRequestManager) { this.serviceBaseUrl = serviceBaseUrl; + this.extensionRequestManager = extensionRequestManager; } public ExtensionInstanceHealth checkRunningInstances() { try { - var request = ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl()); - var response = request.execute().returnResponse(); - if (response.getStatusLine().getStatusCode() != 200) { + var response = extensionRequestManager.requestExtensionInstanceHealth(makeRequestUrl()); + if (response.statusCode() != 200) { return new ExtensionInstanceHealth(Set.of(), Set.of()); } - String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - return deserialize(body); + return deserialize(response.responseBody()); } catch (IOException e) { LOG.error("Extension service {} is unavailable", serviceBaseUrl); diff --git a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java index 549066f3c8..ecd07082e2 100644 --- a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java +++ b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java @@ -21,7 +21,7 @@ package org.apache.streampipes.health.monitoring; import org.apache.streampipes.commons.environment.Environment; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.loadbalance.LoadManager; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.storage.api.system.IExtensionsServiceStorage; @@ -37,13 +37,16 @@ import java.util.List; public class ServiceHealthCheck implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ServiceHealthCheck.class); + private final ExtensionServiceRequestManager extensionRequestManager; private final ServiceRegistrationManager serviceRegistrationManager; private final int maxUnhealthyDurationBeforeRemovalMs; private final List<SpServiceRegistration> needDeletedServices = new ArrayList<>(); - public ServiceHealthCheck(IExtensionsServiceStorage storage) { + public ServiceHealthCheck(IExtensionsServiceStorage storage, + ExtensionServiceRequestManager extensionRequestManager) { + this.extensionRequestManager = extensionRequestManager; this.serviceRegistrationManager = new ServiceRegistrationManager(storage); this.maxUnhealthyDurationBeforeRemovalMs = Environments.getEnvironment() .getUnhealthyTimeBeforeServiceDeletionInMillis().getValueOrDefault(); @@ -71,9 +74,8 @@ public class ServiceHealthCheck implements Runnable { String healthCheckUrl = makeHealthCheckUrl(service); try { - var request = ExtensionServiceExecutions.extServiceGetRequest(healthCheckUrl); - var response = request.execute(); - if (response.returnResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + var response = extensionRequestManager.requestServiceHealth(healthCheckUrl); + if (response.statusCode() != HttpStatus.SC_OK) { processUnhealthyService(service); } else { if (service.getStatus() == SpServiceStatus.UNHEALTHY) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationResult.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationResult.java new file mode 100644 index 0000000000..1c0308ab9f --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationResult.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.api.extensions; + +import java.nio.charset.StandardCharsets; + +public class ExtensionServiceOperationResult { + + private final int statusCode; + private final byte[] responseBody; + + public ExtensionServiceOperationResult(int statusCode, String responseBody) { + this(statusCode, responseBody == null ? null : responseBody.getBytes(StandardCharsets.UTF_8)); + } + + public ExtensionServiceOperationResult(int statusCode, byte[] responseBody) { + this.statusCode = statusCode; + this.responseBody = responseBody; + } + + public int statusCode() { + return statusCode; + } + + public String responseBody() { + return responseBody == null ? null : new String(responseBody, StandardCharsets.UTF_8); + } + + public byte[] responseBytes() { + return responseBody; + } + + public boolean isSuccess() { + return statusCode >= 200 && statusCode < 300; + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestManager.java new file mode 100644 index 0000000000..160fe0a4d9 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestManager.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.api.extensions; + +import java.io.IOException; + +public interface ExtensionServiceRequestManager { + + ExtensionServiceOperationResult requestContainerProvidedOptions(String url, + String payload) throws IOException; + + ExtensionServiceOperationResult requestMigration(String url, + String payload) throws IOException; + + ExtensionServiceOperationResult requestDescriptionUpdate(String requestUrl) throws IOException; + + ExtensionServiceOperationResult requestExtensionDescription(String descriptionUrl) throws IOException; + + ExtensionServiceOperationResult requestFunctionStop(String endpoint) throws IOException; + + ExtensionServiceOperationResult requestRunningAdapters(String url) throws IOException; + + ExtensionServiceOperationResult requestAdapterStateChange(String url, + String elementId, + String payload) throws IOException; + + ExtensionServiceOperationResult requestRuntimeOptions(String url, + String payload) throws IOException; + + ExtensionServiceOperationResult requestSampleData(String workerUrl, + String payload) throws IOException; + + ExtensionServiceOperationResult requestExtensionInstanceHealth(String url) throws IOException; + + ExtensionServiceOperationResult requestServiceHealth(String url) throws IOException; + + ExtensionServiceOperationResult requestPipelineElementInvocation(String url, + String pipelineId, + String payload) throws IOException; + + ExtensionServiceOperationResult requestPipelineElementDetach(String url, + String pipelineId) throws IOException; + + ExtensionServiceOperationResult requestPipelineElementAssets(String url) throws IOException; + + ExtensionServiceOperationResult requestAdapterAssets(String url) throws IOException; + + ExtensionServiceOperationResult requestAdapterIconAsset(String url) throws IOException; + + ExtensionServiceOperationResult requestAdapterDocumentationAsset(String url) throws IOException; + + ExtensionServiceOperationResult requestOutputSchema(String url, + String payload) throws IOException; +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java index 25c3c9accf..f42047b461 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java @@ -18,11 +18,12 @@ package org.apache.streampipes.manager.assets; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; -import org.apache.http.client.fluent.Request; - +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -30,22 +31,33 @@ public class AssetFetcher { private static final String ASSET_ENDPOINT_APPENDIX = "/assets"; - private SpServiceUrlProvider spServiceUrlProvider; - private String appId; + private final SpServiceUrlProvider spServiceUrlProvider; + private final String appId; + private final ExtensionServiceRequestManager requestManager; public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider, String appId) { + this(spServiceUrlProvider, appId, new HttpExtensionServiceRequestManager()); + } + + public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider, + String appId, + ExtensionServiceRequestManager requestManager) { this.spServiceUrlProvider = spServiceUrlProvider; this.appId = appId; + this.requestManager = requestManager; } public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException { String endpointUrl = new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, spServiceUrlProvider); - return Request - .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX) - .execute() - .returnContent() - .asStream(); + var response = requestManager.requestPipelineElementAssets(endpointUrl + ASSET_ENDPOINT_APPENDIX); + + if (!response.isSuccess()) { + throw new IOException("Could not fetch pipeline element assets from " + endpointUrl); + } + + var responseBytes = response.responseBytes(); + return new ByteArrayInputStream(responseBytes == null ? new byte[0] : responseBytes); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java deleted file mode 100644 index 2164b2776c..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.execution; - -import org.apache.streampipes.manager.util.AuthTokenUtils; -import org.apache.streampipes.resource.management.SpResourceManager; - -import org.apache.http.client.fluent.Request; -import org.apache.http.entity.ContentType; - -public class ExtensionServiceExecutions { - - public static Request extServiceGetRequest(String url) { - return Request - .Get(url) - .addHeader("Authorization", AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())) - .addHeader("Accept", "application/json") - .connectTimeout(10000) - .socketTimeout(10000); - } - - public static Request extServicePostRequestAsServiceAdmin(String url) { - return Request - .Post(url) - .addHeader("Authorization", AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())) - .addHeader("Accept", "application/json") - .connectTimeout(10000) - .socketTimeout(10000); - } - - private static String getServiceAdminSid() { - return new SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId(); - } - - public static Request extServicePostRequest(String url, - String payload) { - return authenticatedPostRequest(url, AuthTokenUtils.getAuthTokenForCurrentUser(), payload); - } - - public static Request extServicePostRequest(String url, - String elementId, - String payload) { - return authenticatedPostRequest( - url, - AuthTokenUtils.getAuthToken(elementId), - payload - ); - } - - private static Request authenticatedPostRequest(String url, - String token, - String payload) { - return Request.Post(url) - .addHeader("Authorization", token) - .bodyString(payload, ContentType.APPLICATION_JSON) - .connectTimeout(1000) - .socketTimeout(100000); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/HttpExtensionServiceRequestManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/HttpExtensionServiceRequestManager.java new file mode 100644 index 0000000000..13deb5d102 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/HttpExtensionServiceRequestManager.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution; + +import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.util.AuthTokenUtils; +import org.apache.streampipes.resource.management.SpResourceManager; + +import org.apache.http.client.fluent.Request; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; + +public class HttpExtensionServiceRequestManager implements ExtensionServiceRequestManager { + + @Override + public ExtensionServiceOperationResult requestContainerProvidedOptions(String url, + String payload) throws IOException { + return post(url, AuthTokenUtils.getAuthTokenForCurrentUser(), payload); + } + + @Override + public ExtensionServiceOperationResult requestMigration(String url, + String payload) throws IOException { + return post(url, AuthTokenUtils.getAuthTokenForCurrentUser(), payload); + } + + @Override + public ExtensionServiceOperationResult requestDescriptionUpdate(String requestUrl) throws IOException { + return get(requestUrl, AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())); + } + + @Override + public ExtensionServiceOperationResult requestExtensionDescription(String descriptionUrl) throws IOException { + return get(descriptionUrl, AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())); + } + + @Override + public ExtensionServiceOperationResult requestFunctionStop(String endpoint) throws IOException { + return post(endpoint, AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid()), null); + } + + @Override + public ExtensionServiceOperationResult requestRunningAdapters(String url) throws IOException { + return get(url, AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())); + } + + @Override + public ExtensionServiceOperationResult requestAdapterStateChange(String url, + String elementId, + String payload) throws IOException { + return post(url, AuthTokenUtils.getAuthToken(elementId), payload); + } + + @Override + public ExtensionServiceOperationResult requestRuntimeOptions(String url, + String payload) throws IOException { + return post(url, AuthTokenUtils.getAuthTokenForCurrentUser(), payload); + } + + @Override + public ExtensionServiceOperationResult requestSampleData(String workerUrl, + String payload) throws IOException { + return post(workerUrl, AuthTokenUtils.getAuthTokenForCurrentUser(), payload); + } + + @Override + public ExtensionServiceOperationResult requestExtensionInstanceHealth(String url) throws IOException { + return get(url, AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())); + } + + @Override + public ExtensionServiceOperationResult requestServiceHealth(String url) throws IOException { + return get(url, AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())); + } + + @Override + public ExtensionServiceOperationResult requestPipelineElementInvocation(String url, + String pipelineId, + String payload) throws IOException { + return post(url, AuthTokenUtils.getAuthToken(pipelineId), payload); + } + + @Override + public ExtensionServiceOperationResult requestPipelineElementDetach(String url, + String pipelineId) throws IOException { + return delete(url, AuthTokenUtils.getAuthToken(pipelineId)); + } + + @Override + public ExtensionServiceOperationResult requestPipelineElementAssets(String url) throws IOException { + return get(url, null); + } + + @Override + public ExtensionServiceOperationResult requestAdapterAssets(String url) throws IOException { + return get(url, null); + } + + @Override + public ExtensionServiceOperationResult requestAdapterIconAsset(String url) throws IOException { + return get(url, null); + } + + @Override + public ExtensionServiceOperationResult requestAdapterDocumentationAsset(String url) throws IOException { + return get(url, null); + } + + @Override + public ExtensionServiceOperationResult requestOutputSchema(String url, + String payload) throws IOException { + return post(url, null, payload); + } + + private ExtensionServiceOperationResult get(String url, + String token) throws IOException { + var request = Request + .Get(url) + .addHeader("Accept", "application/json") + .connectTimeout(10000) + .socketTimeout(10000); + + var response = addAuthorizationHeader(request, token) + .execute() + .returnResponse(); + + return new ExtensionServiceOperationResult( + response.getStatusLine().getStatusCode(), + response.getEntity() == null ? null : EntityUtils.toByteArray(response.getEntity()) + ); + } + + private ExtensionServiceOperationResult post(String url, + String token, + String payload) throws IOException { + var request = Request + .Post(url) + .addHeader("Accept", "application/json"); + + if (payload != null) { + request = request.bodyString(payload, ContentType.APPLICATION_JSON); + } + + var response = addAuthorizationHeader(request, token) + .connectTimeout(payload == null ? 10000 : 1000) + .socketTimeout(payload == null ? 10000 : 100000) + .execute() + .returnResponse(); + + return new ExtensionServiceOperationResult( + response.getStatusLine().getStatusCode(), + response.getEntity() == null ? null : EntityUtils.toByteArray(response.getEntity()) + ); + } + + private ExtensionServiceOperationResult delete(String url, + String token) throws IOException { + var response = addAuthorizationHeader(Request.Delete(url), token) + .addHeader("Accept", "application/json") + .connectTimeout(10000) + .socketTimeout(10000) + .execute() + .returnResponse(); + + return new ExtensionServiceOperationResult( + response.getStatusLine().getStatusCode(), + response.getEntity() == null ? null : EntityUtils.toByteArray(response.getEntity()) + ); + } + + private Request addAuthorizationHeader(Request request, String token) { + return token == null ? request : request.addHeader("Authorization", token); + } + + private String getServiceAdminSid() { + return new SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId(); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java index ac9b73705a..4a87e265e4 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java @@ -18,21 +18,25 @@ package org.apache.streampipes.manager.execution.http; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult; import org.apache.streampipes.model.api.EndpointSelectable; -import org.apache.http.client.fluent.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + public class DetachHttpRequest extends PipelineElementHttpRequest { private static final Logger LOG = LoggerFactory.getLogger(DetachHttpRequest.class); @Override - protected Request initRequest(EndpointSelectable pipelineElement, String endpointUrl) { + protected ExtensionServiceOperationResult performRequest(EndpointSelectable pipelineElement, + String endpointUrl, + String pipelineId) throws IOException { LOG.info("Detaching element: " + endpointUrl); - return Request.Delete(endpointUrl); + return requestManager().requestPipelineElementDetach(endpointUrl, pipelineId); } @Override diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java index 1656e677b0..88e2db8066 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java @@ -18,26 +18,26 @@ package org.apache.streampipes.manager.execution.http; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult; import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.serializers.json.JacksonSerializer; import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.http.client.fluent.Request; -import org.apache.http.entity.ContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + public class InvokeHttpRequest extends PipelineElementHttpRequest { private static final Logger LOG = LoggerFactory.getLogger(InvokeHttpRequest.class); @Override - protected Request initRequest(EndpointSelectable pipelineElement, - String endpointUrl) throws JsonProcessingException { + protected ExtensionServiceOperationResult performRequest(EndpointSelectable pipelineElement, + String endpointUrl, + String pipelineId) throws IOException { LOG.info("Invoking element: " + endpointUrl); - return Request - .Post(endpointUrl) - .bodyString(toJson(pipelineElement), ContentType.APPLICATION_JSON); + return requestManager().requestPipelineElementInvocation(endpointUrl, pipelineId, toJson(pipelineElement)); } @Override diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java index 400626e54d..0e2229a62f 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java @@ -18,46 +18,57 @@ package org.apache.streampipes.manager.execution.http; -import org.apache.streampipes.manager.util.AuthTokenUtils; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager; import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.pipeline.PipelineElementStatus; import org.apache.streampipes.serializers.json.JacksonSerializer; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.gson.JsonSyntaxException; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; import java.io.IOException; public abstract class PipelineElementHttpRequest { + private final ExtensionServiceRequestManager requestManager; + + public PipelineElementHttpRequest() { + this(new HttpExtensionServiceRequestManager()); + } + + public PipelineElementHttpRequest(ExtensionServiceRequestManager requestManager) { + this.requestManager = requestManager; + } + public PipelineElementStatus execute(EndpointSelectable pipelineElement, String endpointUrl, String pipelineId) { try { - Response httpResp = initRequest(pipelineElement, endpointUrl) - .addHeader("Authorization", AuthTokenUtils.getAuthToken(pipelineId)) - .connectTimeout(10000) - .execute(); - return handleResponse(httpResp, pipelineElement, endpointUrl); + ExtensionServiceOperationResult response = performRequest(pipelineElement, endpointUrl, pipelineId); + return handleResponse(response, pipelineElement, endpointUrl); } catch (Exception e) { logError(endpointUrl, pipelineElement.getName(), e.getMessage()); return new PipelineElementStatus(endpointUrl, pipelineElement.getName(), false, e.getMessage()); } } - protected abstract Request initRequest(EndpointSelectable pipelineElement, - String endpointUrl) throws JsonProcessingException; + protected abstract ExtensionServiceOperationResult performRequest(EndpointSelectable pipelineElement, + String endpointUrl, + String pipelineId) throws IOException; protected abstract void logError(String endpointUrl, String pipelineElementName, String exceptionMessage); - protected PipelineElementStatus handleResponse(Response httpResp, + protected PipelineElementStatus handleResponse(ExtensionServiceOperationResult response, EndpointSelectable pipelineElement, String endpointUrl) throws JsonSyntaxException, IOException { - String resp = httpResp.returnContent().asString(); + if (!response.isSuccess()) { + throw new IOException("Request failed with status code " + response.statusCode()); + } + + String resp = response.responseBody(); org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer .getObjectMapper() .readValue(resp, org.apache.streampipes.model.Response.class); @@ -70,4 +81,8 @@ public abstract class PipelineElementHttpRequest { return new PipelineElementStatus(endpointUrl, pipelineElementName, response.isSuccess(), response.getOptionalMessage()); } + + protected ExtensionServiceRequestManager requestManager() { + return requestManager; + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java index 16fc95a029..109529cd31 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java @@ -18,8 +18,8 @@ package org.apache.streampipes.manager.extensions; import org.apache.streampipes.commons.exceptions.SepaParseException; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.api.extensions.IExtensionsResourceUrlProvider; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.manager.verification.extractor.TypeExtractor; import org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest; import org.apache.streampipes.model.message.Message; @@ -28,9 +28,12 @@ import java.io.IOException; public class ExtensionItemInstaller { + private final ExtensionServiceRequestManager extensionRequestManager; private final IExtensionsResourceUrlProvider urlProvider; - public ExtensionItemInstaller(IExtensionsResourceUrlProvider urlProvider) { + public ExtensionItemInstaller(IExtensionsResourceUrlProvider urlProvider, + ExtensionServiceRequestManager extensionRequestManager) { + this.extensionRequestManager = extensionRequestManager; this.urlProvider = urlProvider; } @@ -52,6 +55,6 @@ public class ExtensionItemInstaller { } private String fetchDescription(String descriptionUrl) throws IOException { - return ExtensionServiceExecutions.extServiceGetRequest(descriptionUrl).execute().returnContent().asString(); + return extensionRequestManager.requestExtensionDescription(descriptionUrl).responseBody(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java index 691714faba..5ca4f3227b 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java @@ -18,7 +18,7 @@ package org.apache.streampipes.manager.function; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.function.FunctionState; import org.apache.streampipes.model.function.FunctionsShutdownResponse; @@ -26,19 +26,23 @@ import org.apache.streampipes.serializers.json.JacksonSerializer; import org.apache.streampipes.storage.api.function.IFunctionStateStorage; import org.apache.streampipes.storage.management.StorageDispatcher; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; public class FunctionManager { private static final Logger LOG = LoggerFactory.getLogger(FunctionManager.class); private static final String FUNCTION_STOP_PATH = "/api/v1/functions/stop"; - public static void stopAllFunctionsAndPersistState(IFunctionStateStorage functionStateStorage) { + private final ExtensionServiceRequestManager requestManager; + + public FunctionManager(ExtensionServiceRequestManager requestManager) { + this.requestManager = requestManager; + } + + public void stopAllFunctionsAndPersistState(IFunctionStateStorage functionStateStorage) { var extensions = StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage().findAll(); LOG.info("Triggering function stop at {} extension services...", extensions.size()); @@ -50,23 +54,22 @@ public class FunctionManager { }); } - private static FunctionsShutdownResponse triggerFunctionStop(SpServiceRegistration service) { + private FunctionsShutdownResponse triggerFunctionStop(SpServiceRegistration service) { var endpoint = service.getServiceUrl() + FUNCTION_STOP_PATH; try { LOG.info("Triggering function stop at {}", endpoint); - var response = ExtensionServiceExecutions.extServicePostRequestAsServiceAdmin(endpoint).execute(); - var httpResponse = response.returnResponse(); - int statusCode = httpResponse.getStatusLine().getStatusCode(); + var response = requestManager.requestFunctionStop(endpoint); + int statusCode = response.statusCode(); if (statusCode >= 200 && statusCode < 300) { LOG.debug("Function stop triggered at {} (HTTP {})", service.getSvcId(), statusCode); - if (httpResponse.getEntity() == null) { + if (response.responseBody() == null) { return null; } return JacksonSerializer.getObjectMapper().readValue( - EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8), + response.responseBody(), FunctionsShutdownResponse.class ); } else { @@ -80,7 +83,7 @@ public class FunctionManager { } } - private static void persistReturnedFunctionStates(IFunctionStateStorage functionStateStorage, + private void persistReturnedFunctionStates(IFunctionStateStorage functionStateStorage, FunctionsShutdownResponse shutdownResponse) { if (shutdownResponse == null || shutdownResponse.getFunctions() == null) { return; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java index bf2e38aae6..a0fcf39e16 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java @@ -17,6 +17,9 @@ */ package org.apache.streampipes.manager.matching.output; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.graph.DataProcessorInvocation; @@ -28,17 +31,14 @@ import org.apache.streampipes.serializers.json.JacksonSerializer; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import com.google.gson.JsonSyntaxException; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.entity.ContentType; import java.io.IOException; -import java.nio.charset.StandardCharsets; public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator<CustomTransformOutputStrategy> { private DataProcessorInvocation dataProcessorInvocation; private CustomTransformOutputStrategy outputStrategy; + private final ExtensionServiceRequestManager requestManager; public static CustomTransformOutputSchemaGenerator from(OutputStrategy strategy, DataProcessorInvocation invocation) { return new CustomTransformOutputSchemaGenerator((CustomTransformOutputStrategy) strategy, invocation); @@ -48,6 +48,7 @@ public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator< DataProcessorInvocation invocation) { super(strategy); this.dataProcessorInvocation = invocation; + this.requestManager = new HttpExtensionServiceRequestManager(); } @@ -69,18 +70,20 @@ public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator< dataProcessorInvocation.getAppId(), SpServiceUrlProvider.DATA_PROCESSOR ); - Response httpResp = Request.Post(endpointUrl + "/output").bodyString(httpRequestBody, - ContentType - .APPLICATION_JSON).execute(); - return handleResponse(httpResp); + var response = requestManager.requestOutputSchema(endpointUrl + "/output", httpRequestBody); + return handleResponse(response); } catch (Exception e) { e.printStackTrace(); return new EventSchema(); } } - private EventSchema handleResponse(Response httpResp) throws JsonSyntaxException, IOException { - String resp = httpResp.returnContent().asString(StandardCharsets.UTF_8); + private EventSchema handleResponse(ExtensionServiceOperationResult response) throws JsonSyntaxException, IOException { + if (!response.isSuccess()) { + throw new IOException("Could not compute output schema, status code: " + response.statusCode()); + } + + String resp = response.responseBody(); return JacksonSerializer .getObjectMapper() diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java index 1bc61b0ae2..ff45a709f8 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java @@ -19,7 +19,7 @@ package org.apache.streampipes.manager.migration; import org.apache.streampipes.commons.exceptions.SepaParseException; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.verification.extractor.TypeExtractor; import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity; import org.apache.streampipes.model.extensions.migration.MigrationRequest; @@ -45,9 +45,14 @@ import static org.apache.streampipes.manager.migration.MigrationUtils.getRequest public abstract class AbstractMigrationManager { private static final Logger LOG = LoggerFactory.getLogger(AbstractMigrationManager.class); + private final ExtensionServiceRequestManager extensionRequestManager; protected static final String MIGRATION_ENDPOINT = "api/v1/migrations"; + protected AbstractMigrationManager(ExtensionServiceRequestManager extensionRequestManager) { + this.extensionRequestManager = extensionRequestManager; + } + /** * Performs the actual migration of a pipeline element. * This includes the communication with the extensions service which runs the migration. @@ -71,15 +76,15 @@ public abstract class AbstractMigrationManager { String serializedRequest = JacksonSerializer.getObjectMapper().writeValueAsString(migrationRequest); - var migrationResponse = ExtensionServiceExecutions.extServicePostRequest( + var migrationResponse = extensionRequestManager.requestMigration( url, serializedRequest - ).execute(); + ); TypeReference<MigrationResult<T>> typeReference = new TypeReference<>() { }; - String migrationResponseString = migrationResponse.returnContent().asString(); + String migrationResponseString = migrationResponse.responseBody(); return JacksonSerializer .getObjectMapper() .readValue(migrationResponseString, typeReference); @@ -138,10 +143,7 @@ public abstract class AbstractMigrationManager { protected void performUpdate(String requestUrl) { try { - var entityPayload = ExtensionServiceExecutions.extServiceGetRequest(requestUrl) - .execute() - .returnContent() - .asString(); + var entityPayload = extensionRequestManager.requestDescriptionUpdate(requestUrl).responseBody(); var updateResult = new TypeExtractor(entityPayload).getTypeVerifier().verifyAndUpdate(); if (!updateResult.isSuccess()) { LOG.error( diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java index ab461575f3..4741de8f84 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java @@ -19,6 +19,7 @@ package org.apache.streampipes.manager.migration; import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.execution.PipelineExecutor; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; @@ -55,7 +56,9 @@ public class PipelineElementMigrationManager extends AbstractMigrationManager im public PipelineElementMigrationManager(IPipelineStorage pipelineStorage, IDataProcessorStorage dataProcessorStorage, - IDataSinkStorage dataSinkStorage) { + IDataSinkStorage dataSinkStorage, + ExtensionServiceRequestManager extensionServiceRequestManager) { + super(extensionServiceRequestManager); this.pipelineStorage = pipelineStorage; this.dataProcessorStorage = dataProcessorStorage; this.dataSinkStorage = dataSinkStorage; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java index eb212bb33e..c327931124 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java @@ -18,7 +18,7 @@ package org.apache.streampipes.manager.remote; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; @@ -27,31 +27,32 @@ import org.apache.streampipes.serializers.json.JacksonSerializer; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import com.google.gson.JsonSyntaxException; -import org.apache.http.client.fluent.Response; import java.io.IOException; -import java.nio.charset.StandardCharsets; public class ContainerProvidedOptionsHandler { + private final ExtensionServiceRequestManager extensionRequestManager; + + public ContainerProvidedOptionsHandler(ExtensionServiceRequestManager extensionRequestManager) { + this.extensionRequestManager = extensionRequestManager; + } public RuntimeOptionsResponse fetchRemoteOptions(RuntimeOptionsRequest request) { try { var payload = JacksonSerializer.getObjectMapper().writeValueAsString(request); var url = getEndpointUrl(request.getAppId()); - var resp = ExtensionServiceExecutions.extServicePostRequest(url, payload).execute(); - - return handleResponse(resp); + var response = extensionRequestManager.requestContainerProvidedOptions(url, payload); + return handleResponse(response.responseBody()); } catch (Exception e) { e.printStackTrace(); return new RuntimeOptionsResponse(); } } - private RuntimeOptionsResponse handleResponse(Response httpResp) throws JsonSyntaxException, IOException { - String resp = httpResp.returnContent().asString(StandardCharsets.UTF_8); - return JacksonSerializer.getObjectMapper().readValue(resp, RuntimeOptionsResponse.class); + private RuntimeOptionsResponse handleResponse(String responseBody) throws JsonSyntaxException, IOException { + return JacksonSerializer.getObjectMapper().readValue(responseBody, RuntimeOptionsResponse.class); } private String getEndpointUrl(String appId) throws NoServiceEndpointsAvailableException { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java index f7e14125a5..0cea95bf8d 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java @@ -20,6 +20,7 @@ package org.apache.streampipes.manager.setup; import org.apache.streampipes.commons.environment.Environment; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.model.client.setup.InitialSettings; import org.slf4j.Logger; @@ -35,19 +36,25 @@ public class AutoInstallation implements BackgroundTaskNotifier { private static final Logger LOG = LoggerFactory.getLogger(AutoInstallation.class); private final Environment env; + private final ExtensionServiceRequestManager extensionServiceRequestManager; private final AtomicInteger errorCount = new AtomicInteger(); private int numberOfBackgroundSteps = 0; private int executedBackgroundSteps = 0; - public AutoInstallation() { + public AutoInstallation(ExtensionServiceRequestManager extensionServiceRequestManager) { this.env = Environments.getEnvironment(); + this.extensionServiceRequestManager = extensionServiceRequestManager; } public void startAutoInstallation() { InitialSettings settings = collectInitialSettings(); List<InstallationStep> steps = InstallationConfiguration.getInstallationSteps(settings); - List<Runnable> backgroundSteps = InstallationConfiguration.getBackgroundInstallationSteps(settings, this); + List<Runnable> backgroundSteps = InstallationConfiguration.getBackgroundInstallationSteps( + settings, + this, + extensionServiceRequestManager + ); numberOfBackgroundSteps = backgroundSteps.size(); steps.forEach(step -> { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java index 524ee4f32a..2027f4a500 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java @@ -18,6 +18,7 @@ package org.apache.streampipes.manager.setup; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.extensions.AvailableExtensionsProvider; import org.apache.streampipes.model.client.setup.InitialSettings; import org.apache.streampipes.model.extensions.ExtensionItemDescription; @@ -41,13 +42,16 @@ public class ExtensionsInstallationTask implements Runnable { private final InitialSettings settings; private final BackgroundTaskNotifier callback; private final INoSqlStorage storage; + private final ExtensionServiceRequestManager extensionServiceRequestManager; public ExtensionsInstallationTask(InitialSettings settings, INoSqlStorage storage, - BackgroundTaskNotifier callback) { + BackgroundTaskNotifier callback, + ExtensionServiceRequestManager extensionServiceRequestManager) { this.settings = settings; this.storage = storage; this.callback = callback; + this.extensionServiceRequestManager = extensionServiceRequestManager; } @Override @@ -76,7 +80,8 @@ public class ExtensionsInstallationTask implements Runnable { for (ExtensionItemDescription extensionItem : availableExtensions) { steps.add(new PipelineElementInstallationStep( extensionItem, - settings.getInitialAdminUserSid()) + settings.getInitialAdminUserSid(), + extensionServiceRequestManager) ); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java index 294fd7073b..af4baa93c6 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java @@ -18,6 +18,7 @@ package org.apache.streampipes.manager.setup; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.model.client.setup.InitialSettings; import org.apache.streampipes.storage.management.StorageDispatcher; @@ -42,7 +43,13 @@ public class InstallationConfiguration { } public static List<Runnable> getBackgroundInstallationSteps(InitialSettings settings, - BackgroundTaskNotifier callback) { - return List.of(new ExtensionsInstallationTask(settings, StorageDispatcher.INSTANCE.getNoSqlStore(), callback)); + BackgroundTaskNotifier callback, + ExtensionServiceRequestManager extensionServiceRequestManager) { + return List.of(new ExtensionsInstallationTask( + settings, + StorageDispatcher.INSTANCE.getNoSqlStore(), + callback, + extensionServiceRequestManager + )); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java index 0bbb8f31b1..6a3e55f829 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java @@ -18,6 +18,7 @@ package org.apache.streampipes.manager.setup; import org.apache.streampipes.commons.exceptions.SepaParseException; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.extensions.ExtensionItemInstaller; import org.apache.streampipes.manager.extensions.ExtensionsResourceUrlProvider; import org.apache.streampipes.model.extensions.ExtensionItemDescription; @@ -30,12 +31,15 @@ public class PipelineElementInstallationStep extends InstallationStep { private final ExtensionItemDescription extensionItem; private final String principalSid; + private final ExtensionServiceRequestManager extensionServiceRequestManager; public PipelineElementInstallationStep(ExtensionItemDescription extensionItem, - String principalSid) { + String principalSid, + ExtensionServiceRequestManager extensionServiceRequestManager) { this.extensionItem = extensionItem; this.principalSid = principalSid; + this.extensionServiceRequestManager = extensionServiceRequestManager; } @Override @@ -43,7 +47,8 @@ public class PipelineElementInstallationStep extends InstallationStep { var installationReq = ExtensionItemInstallationRequest.fromDescription(extensionItem, true); var resourceUrlProvider = new ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery()); try { - new ExtensionItemInstaller(resourceUrlProvider).installExtension(installationReq, principalSid); + new ExtensionItemInstaller(resourceUrlProvider, extensionServiceRequestManager) + .installExtension(installationReq, principalSid); logSuccess(getTitle()); } catch (SepaParseException | IOException e) { logFailure(getTitle()); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java index 462294286c..1c72d3b94a 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java @@ -22,6 +22,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher; import org.apache.streampipes.manager.file.FileManager; import org.apache.streampipes.manager.pipeline.PipelineCacheManager; @@ -56,7 +57,7 @@ public class ResetManagement { * * @param username of the user to delte the resources */ - public static void reset(String username) { + public static void reset(String username, WorkerRestClient workerRestClient) { logger.info("Start resetting the system"); setHideTutorialToFalse(username); @@ -65,7 +66,7 @@ public class ResetManagement { stopAndDeleteAllPipelines(); - stopAndDeleteAllAdapters(); + stopAndDeleteAllAdapters(workerRestClient); deleteAllFiles(); @@ -101,13 +102,14 @@ public class ResetManagement { }); } - private static void stopAndDeleteAllAdapters() { + private static void stopAndDeleteAllAdapters(WorkerRestClient workerRestClient) { AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement( StorageDispatcher.INSTANCE.getNoSqlStore() .getAdapterInstanceStorage(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics() + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient ); List<AdapterDescription> allAdapters = adapterMasterManagement.getAllAdapterInstances(); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java index 1d41f47cc4..fa1b1d798b 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java @@ -17,6 +17,7 @@ */ package org.apache.streampipes.rest.impl; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; @@ -33,11 +34,17 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("/api/v2/pe/options") public class ContainerProvidedOptions extends AbstractRestResource { + private final ContainerProvidedOptionsHandler containerProvidedOptionsHandler; + + public ContainerProvidedOptions(ExtensionServiceRequestManager extensionServiceRequestManager) { + this.containerProvidedOptionsHandler = new ContainerProvidedOptionsHandler(extensionServiceRequestManager); + } + @PostMapping( produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE ) public ResponseEntity<RuntimeOptionsResponse> fetchRemoteOptions(@RequestBody RuntimeOptionsRequest request) { - return ok(new ContainerProvidedOptionsHandler().fetchRemoteOptions(request)); + return ok(containerProvidedOptionsHandler.fetchRemoteOptions(request)); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java index bbf7f2aa2c..66eb908b43 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java @@ -18,6 +18,7 @@ package org.apache.streampipes.rest.impl; +import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.model.client.user.Principal; import org.apache.streampipes.model.client.user.PrincipalType; import org.apache.streampipes.model.message.Notifications; @@ -38,10 +39,16 @@ import java.util.ArrayList; @RequestMapping("/api/v2/reset") public class ResetResource extends AbstractAuthGuardedRestResource { - @PostMapping(produces = MediaType.APPLICATION_JSON_VALUE) + private final WorkerRestClient workerRestClient; + + public ResetResource(WorkerRestClient workerRestClient) { + this.workerRestClient = workerRestClient; + } + + @PostMapping(produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Resets StreamPipes instance") public ResponseEntity<SuccessMessage> reset() { - ResetManagement.reset(getAuthenticatedUsername()); + ResetManagement.reset(getAuthenticatedUsername(), workerRestClient); var userStorage = getUserStorage(); @@ -50,7 +57,7 @@ public class ResetResource extends AbstractAuthGuardedRestResource { for (var user : allUsers) { if (user.getPrincipalType() == PrincipalType.USER_ACCOUNT && !user.getPrincipalId().equals(getAuthenticatedUserSid())) { - ResetManagement.reset(user.getUsername()); + ResetManagement.reset(user.getUsername(), workerRestClient); userStorage.deleteUser(user.getPrincipalId()); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java index 42bb1403e4..24a120bd09 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java @@ -19,6 +19,7 @@ package org.apache.streampipes.rest.impl.admin; import org.apache.streampipes.commons.exceptions.SepaParseException; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.assets.AssetManager; import org.apache.streampipes.manager.extensions.ExtensionItemInstaller; import org.apache.streampipes.manager.extensions.ExtensionsResourceUrlProvider; @@ -49,13 +50,19 @@ import java.io.IOException; @PreAuthorize(AuthConstants.IS_ADMIN_ROLE) public class ExtensionsInstallationResource extends AbstractAuthGuardedRestResource { + private final ExtensionServiceRequestManager extensionServiceRequestManager; + + public ExtensionsInstallationResource(ExtensionServiceRequestManager extensionServiceRequestManager) { + this.extensionServiceRequestManager = extensionServiceRequestManager; + } + @PostMapping( consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<Message> addElement(@RequestBody ExtensionItemInstallationRequest installationReq) { var descriptionUrlProvider = new ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery()); try { - return ok(new ExtensionItemInstaller(descriptionUrlProvider) + return ok(new ExtensionItemInstaller(descriptionUrlProvider, extensionServiceRequestManager) .installExtension(installationReq, getAuthenticatedUserSid())); } catch (IOException | SepaParseException e) { return constructErrorMessage(new Notification(NotificationType.PARSE_ERROR, e.getMessage())); @@ -68,7 +75,7 @@ public class ExtensionsInstallationResource extends AbstractAuthGuardedRestResou public ResponseEntity<Message> updateElement(@RequestBody ExtensionItemInstallationRequest installationReq) { var descriptionUrlProvider = new ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery()); try { - return ok(new ExtensionItemInstaller(descriptionUrlProvider) + return ok(new ExtensionItemInstaller(descriptionUrlProvider, extensionServiceRequestManager) .updateExtension(installationReq)); } catch (IOException | SepaParseException e) { return constructErrorMessage(new Notification(NotificationType.PARSE_ERROR, e.getMessage())); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java index 4af17cb901..5fc813d6de 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -19,7 +19,9 @@ package org.apache.streampipes.rest.impl.admin; import org.apache.streampipes.connect.management.management.AdapterMigrationManager; +import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.health.monitoring.ServiceRegistrationManager; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; import org.apache.streampipes.manager.migration.PipelineElementMigrationManager; @@ -73,6 +75,14 @@ public class MigrationResource extends AbstractAuthGuardedRestResource { private final CoreServiceStatusManager coreServiceStatusManager = new CoreServiceStatusManager( getNoSqlStorage().getSpCoreConfigurationStorage() ); + private final ExtensionServiceRequestManager extensionServiceRequestManager; + private final WorkerRestClient workerRestClient; + + public MigrationResource(ExtensionServiceRequestManager extensionServiceRequestManager, + WorkerRestClient workerRestClient) { + this.extensionServiceRequestManager = extensionServiceRequestManager; + this.workerRestClient = workerRestClient; + } @PostMapping(path = "{serviceId}", consumes = MediaType.APPLICATION_JSON_VALUE) @Operation( @@ -119,12 +129,17 @@ public class MigrationResource extends AbstractAuthGuardedRestResource { List.of(SpServiceTagPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_SINK) ); - new AdapterMigrationManager(adapterStorage, adapterDescriptionStorage) + new AdapterMigrationManager( + adapterStorage, + adapterDescriptionStorage, + workerRestClient, + extensionServiceRequestManager) .handleMigrations(extensionsServiceConfig, adapterMigrations); new PipelineElementMigrationManager( pipelineStorage, dataProcessorStorage, - dataSinkStorage) + dataSinkStorage, + extensionServiceRequestManager) .handleMigrations(extensionsServiceConfig, pipelineElementMigrations); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java index b9a79917e0..d07cf9c55a 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java @@ -23,6 +23,7 @@ import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; import org.apache.streampipes.connect.management.management.AdapterUpdateManagement; import org.apache.streampipes.connect.management.management.CompactAdapterManagement; +import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.manager.pipeline.PipelineManager; import org.apache.streampipes.model.client.user.DefaultRole; import org.apache.streampipes.model.client.user.Permission; @@ -69,13 +70,14 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage private static final Logger LOG = LoggerFactory.getLogger(AdapterResource.class); - public AdapterResource() { + public AdapterResource(WorkerRestClient workerRestClient) { super(() -> new AdapterMasterManagement( StorageDispatcher.INSTANCE.getNoSqlStore() .getAdapterInstanceStorage(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics())); + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient)); } @PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE) diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java index 2101cb3fe6..a4211f2a9e 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java @@ -25,6 +25,10 @@ import org.apache.streampipes.connect.management.compact.PersistPipelineHandler; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; import org.apache.streampipes.connect.management.management.AdapterUpdateManagement; import org.apache.streampipes.connect.management.management.CompactAdapterManagement; +import org.apache.streampipes.connect.management.management.GuessManagement; +import org.apache.streampipes.connect.management.management.WorkerRestClient; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement; import org.apache.streampipes.model.connect.adapter.compact.CompactAdapter; import org.apache.streampipes.model.message.Notifications; @@ -54,15 +58,23 @@ public class CompactAdapterResource extends AbstractAdapterResource<AdapterMaste private final CompactAdapterManagement compactAdapterManagement; private final AdapterUpdateManagement adapterUpdateManagement; - public CompactAdapterResource() { + public CompactAdapterResource(WorkerRestClient workerRestClient, + ExtensionServiceRequestManager extensionServiceRequestManager) { super(() -> new AdapterMasterManagement( StorageDispatcher.INSTANCE.getNoSqlStore() .getAdapterInstanceStorage(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics() + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient )); - this.compactAdapterManagement = new CompactAdapterManagement(new AdapterGenerationSteps().getGenerators()); + var guessManagement = new GuessManagement( + new ExtensionsServiceEndpointGenerator(), + extensionServiceRequestManager + ); + this.compactAdapterManagement = new CompactAdapterManagement( + new AdapterGenerationSteps(guessManagement).getGenerators() + ); this.adapterUpdateManagement = new AdapterUpdateManagement(managementService); } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java index 5d22800231..3dfc02014a 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java @@ -23,6 +23,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.commons.media.ImageMimeTypeDetector; import org.apache.streampipes.connect.management.management.DescriptionManagement; +import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; @@ -49,8 +50,8 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana private static final Logger LOG = LoggerFactory.getLogger(DescriptionResource.class); private final IExtensionsServiceEndpointGenerator endpointGenerator; - public DescriptionResource() { - super(DescriptionManagement::new); + public DescriptionResource(WorkerRestClient workerRestClient) { + super(() -> new DescriptionManagement(workerRestClient)); endpointGenerator = new ExtensionsServiceEndpointGenerator(); } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java index 921fb565cd..1b57e21491 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java @@ -22,6 +22,8 @@ import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableExce import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.management.GuessManagement; import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.schema.EventSchema; @@ -44,8 +46,8 @@ public class GuessResource extends AbstractAdapterResource<GuessManagement> { private static final Logger LOG = LoggerFactory.getLogger(GuessResource.class); - public GuessResource() { - super(GuessManagement::new); + public GuessResource(ExtensionServiceRequestManager extensionServiceRequestManager) { + super(() -> new GuessManagement(new ExtensionsServiceEndpointGenerator(), extensionServiceRequestManager)); } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index fddde0f7a9..1d471a1ce2 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -48,10 +48,12 @@ public class RuntimeResolvableResource extends AbstractAdapterResource<Void> { private static final Logger LOG = LoggerFactory.getLogger(RuntimeResolvableResource.class); private final IExtensionsServiceEndpointGenerator endpointGenerator; + private final WorkerRestClient workerRestClient; - public RuntimeResolvableResource() { + public RuntimeResolvableResource(WorkerRestClient workerRestClient) { super(); this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); + this.workerRestClient = workerRestClient; } @PostMapping( @@ -69,7 +71,7 @@ public class RuntimeResolvableResource extends AbstractAdapterResource<Void> { runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags() ); SecretProvider.getDecryptionService().applyConfig(runtimeOptionsRequest.getStaticProperties()); - RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest); + RuntimeOptionsResponse result = workerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest); return ok(result); } catch (AdapterException | NoServiceEndpointsAvailableException e) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java similarity index 50% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java copy to streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java index ac9b73705a..4e2bd33111 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java @@ -15,28 +15,25 @@ * limitations under the License. * */ +package org.apache.streampipes.service.core; -package org.apache.streampipes.manager.execution.http; +import org.apache.streampipes.connect.management.management.WorkerRestClient; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager; -import org.apache.streampipes.model.api.EndpointSelectable; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import org.apache.http.client.fluent.Request; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Configuration +public class ExtensionServiceRequestConfiguration { - -public class DetachHttpRequest extends PipelineElementHttpRequest { - - private static final Logger LOG = LoggerFactory.getLogger(DetachHttpRequest.class); - - @Override - protected Request initRequest(EndpointSelectable pipelineElement, String endpointUrl) { - LOG.info("Detaching element: " + endpointUrl); - return Request.Delete(endpointUrl); + @Bean + public ExtensionServiceRequestManager extensionServiceRequestManager() { + return new HttpExtensionServiceRequestManager(); } - @Override - protected void logError(String endpointUrl, String pipelineElementName, String exceptionMessage) { - LOG.error("Could not stop pipeline element {} at {}: {}", endpointUrl, pipelineElementName, exceptionMessage); + @Bean + public WorkerRestClient workerRestClient(ExtensionServiceRequestManager extensionServiceRequestManager) { + return new WorkerRestClient(extensionServiceRequestManager); } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java index c08739b1d2..465247cd53 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java @@ -21,10 +21,12 @@ package org.apache.streampipes.service.core; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; +import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.health.monitoring.ExtensionHealthCheck; import org.apache.streampipes.health.monitoring.PostStartupRecovery; import org.apache.streampipes.health.monitoring.ResourceProvider; import org.apache.streampipes.health.monitoring.ServiceHealthCheck; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.execution.PipelineExecutor; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.pipeline.Pipeline; @@ -56,11 +58,15 @@ public class PostStartupTask implements Runnable { private final ScheduledExecutorService executorService; private final WorkerAdministrationManagement workerAdministrationManagement; private final PostStartupRecovery postStartupRecovery; + private final ExtensionServiceRequestManager extensionServiceRequestManager; private final INoSqlStorage storage = StorageDispatcher.INSTANCE.getNoSqlStore(); - public PostStartupTask(IPipelineStorage pipelineStorage) { + public PostStartupTask(IPipelineStorage pipelineStorage, + ExtensionServiceRequestManager extensionServiceRequestManager, + WorkerRestClient workerRestClient) { this.pipelineStorage = pipelineStorage; + this.extensionServiceRequestManager = extensionServiceRequestManager; this.executorService = Executors.newSingleThreadScheduledExecutor(); var resourceManager = new SpResourceManager(); this.workerAdministrationManagement = new WorkerAdministrationManagement( @@ -77,16 +83,18 @@ public class PostStartupTask implements Runnable { StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics() + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient ) - ) + ), + extensionServiceRequestManager ) ); } @Override public void run() { - new ServiceHealthCheck(storage.getExtensionsServiceStorage()).run(); + new ServiceHealthCheck(storage.getExtensionsServiceStorage(), extensionServiceRequestManager).run(); performAdapterAssetUpdate(); startAllPreviouslyStoppedPipelines(); runHealthCheckOnce(); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index d7029f2bcf..541bb57850 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -20,6 +20,7 @@ package org.apache.streampipes.service.core; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.connect.transformer.api.TransformationEngine; import org.apache.streampipes.connect.transformer.api.TransformationEngines; import org.apache.streampipes.connect.transformer.groovy.GroovyScriptEngine; @@ -29,6 +30,7 @@ import org.apache.streampipes.health.monitoring.ResourceProvider; import org.apache.streampipes.health.monitoring.ServiceHealthCheck; import org.apache.streampipes.loadbalance.LoadManager; import org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.function.FunctionManager; import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; @@ -82,7 +84,7 @@ import java.util.function.Supplier; @EnableScheduling @Import({OpenApiConfiguration.class, StreamPipesPasswordEncoder.class, StreamPipesPrometheusConfig.class, WebSecurityConfig.class, WelcomePageController.class, - StorageApiConfiguration.class}) + StorageApiConfiguration.class, ExtensionServiceRequestConfiguration.class}) @ComponentScan({"org.apache.streampipes.rest.*", "org.apache.streampipes.service.core.oauth2", "org.apache.streampipes.service.core.scheduler"}) public class StreamPipesCoreApplication extends StreamPipesServiceBase { @@ -99,6 +101,12 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { @Autowired private IFunctionStateStorage functionStateStorage; + @Autowired + private ExtensionServiceRequestManager extensionServiceRequestManager; + + @Autowired + private WorkerRestClient workerRestClient; + public static void main(String[] args) { StreamPipesCoreApplication application = new StreamPipesCoreApplication(); application.initialize(() -> List.of( @@ -159,12 +167,17 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { new ApplyDefaultRolesAndPrivilegesTask().execute(); coreStatusManager.updateCoreStatus(SpCoreConfigurationStatus.READY); - executorService.schedule(new PostStartupTask(getPipelineStorage()), + executorService.schedule(new PostStartupTask( + getPipelineStorage(), + extensionServiceRequestManager, + workerRestClient), env.getInitialHealthCheckDelayInMillis().getValueOrDefault(), TimeUnit.MILLISECONDS); scheduleHealthChecks(env.getHealthCheckIntervalInMillis().getValueOrDefault(), List - .of(new ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()), + .of(new ServiceHealthCheck( + StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage(), + extensionServiceRequestManager), new ExtensionHealthCheck( new ResourceProvider( StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(), @@ -173,9 +186,10 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), - AdapterMetricsManager.INSTANCE.getAdapterMetrics() - ) - ) + AdapterMetricsManager.INSTANCE.getAdapterMetrics(), + workerRestClient + )), + extensionServiceRequestManager ))); var logFetchInterval = env.getLogFetchIntervalInMillis().getValueOrDefault(); @@ -212,7 +226,7 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { try { TimeUnit.MILLISECONDS.sleep(initialSleepBeforeInstallation); LOG.info("Starting installation procedure"); - new AutoInstallation().startAutoInstallation(); + new AutoInstallation(extensionServiceRequestManager).startAutoInstallation(); } catch (InterruptedException e) { LOG.error("Ooops, something went wrong during the installation", e); } @@ -242,7 +256,7 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { } }); - FunctionManager.stopAllFunctionsAndPersistState(functionStateStorage); + new FunctionManager(extensionServiceRequestManager).stopAllFunctionsAndPersistState(functionStateStorage); LOG.info("Thanks for using Apache StreamPipes - see you next time!"); }
