This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 7be2b887deb685870063ac34f2771dce13a38869 Author: Dominik Riemer <[email protected]> AuthorDate: Sun Jan 15 22:07:21 2023 +0100 Improve structure of pipeline execution management (#1096) --- .../org/apache/streampipes/model/SpDataSet.java | 15 +- .../streampipes/model/api/EndpointSelectable.java | 17 +- .../model/base/InvocableStreamPipesEntity.java | 18 +- .../model/message/PipelineStatusMessage.java | 9 +- .../model/pipeline/PipelineOperationStatus.java | 10 +- .../manager/execution/PipelineExecutionInfo.java | 106 +++++++++ .../execution/PipelineExecutionTaskFactory.java | 60 +++++ .../manager/execution/PipelineExecutor.java | 57 +++++ .../ExtensionsServiceEndpointProvider.java | 62 ++++++ .../http/DetachHttpRequest.java} | 26 ++- .../http/DetachPipelineElementSubmitter.java | 53 +++++ .../manager/execution/http/GraphSubmitter.java | 132 ----------- .../manager/execution/http/HttpRequestBuilder.java | 98 --------- .../manager/execution/http/InvokeHttpRequest.java | 54 +++++ .../http/InvokePipelineElementSubmitter.java | 81 +++++++ .../execution/http/PipelineElementHttpRequest.java | 73 +++++++ .../execution/http/PipelineElementSubmitter.java | 82 +++++++ .../manager/execution/http/PipelineExecutor.java | 243 --------------------- .../provider/CurrentPipelineElementProvider.java} | 19 +- .../provider/PipelineElementProvider.java} | 12 +- .../provider/StoredPipelineElementProvider.java} | 20 +- .../execution/task/AfterInvocationTask.java | 65 ++++++ .../execution/task/DiscoverEndpointsTask.java | 88 ++++++++ .../task/PipelineExecutionTask.java} | 20 +- .../task/SecretEncryptionTask.java} | 24 +- .../execution/task/StorePipelineStatusTask.java | 76 +++++++ .../manager/execution/task/SubmitRequestTask.java | 51 +++++ .../manager/execution/task/UpdateGroupIdTask.java | 46 ++++ .../manager/health/PipelineHealthCheck.java | 10 +- .../streampipes/manager/operations/Operations.java | 22 +- .../manager/preview/PipelinePreview.java | 10 +- .../http => storage}/PipelineStorageService.java | 2 +- .../RunningPipelineElementStorage.java} | 8 +- .../streampipes/manager/storage/UserService.java | 101 --------- .../rest/core/base/impl/AbstractRestResource.java | 5 - 35 files changed, 1093 insertions(+), 682 deletions(-) diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java index b0a9dff08..cfcbf77e3 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataSet.java @@ -17,10 +17,13 @@ */ package org.apache.streampipes.model; +import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.schema.EventSchema; -public class SpDataSet extends SpDataStream { +import com.fasterxml.jackson.annotation.JsonIgnore; + +public class SpDataSet extends SpDataStream implements EndpointSelectable { private EventGrounding supportedGrounding; @@ -90,18 +93,28 @@ public class SpDataSet extends SpDataStream { this.datasetInvocationId = datasetInvocationId; } + @Override public String getCorrespondingPipeline() { return correspondingPipeline; } + @Override public void setCorrespondingPipeline(String correspondingPipeline) { this.correspondingPipeline = correspondingPipeline; } + @Override + @JsonIgnore + public String getDetachPath() { + return "/" + getCorrespondingAdapterId() + "/" + getDatasetInvocationId(); + } + + @Override public String getSelectedEndpointUrl() { return selectedEndpointUrl; } + @Override public void setSelectedEndpointUrl(String selectedEndpointUrl) { this.selectedEndpointUrl = selectedEndpointUrl; } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java similarity index 64% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java copy to streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java index 085d17e44..585400010 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/api/EndpointSelectable.java @@ -16,19 +16,18 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.model.api; -import org.apache.streampipes.model.SpDataSet; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +public interface EndpointSelectable { -import java.util.HashMap; -import java.util.List; -import java.util.Map; + String getName(); -public class TemporaryGraphStorage { + String getSelectedEndpointUrl(); + void setSelectedEndpointUrl(String selectedEndpointUrl); - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); + String getCorrespondingPipeline(); - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); + void setCorrespondingPipeline(String pipelineId); + String getDetachPath(); } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java index 3a87cc5ba..0d64297fc 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java @@ -18,19 +18,21 @@ package org.apache.streampipes.model.base; +import org.apache.streampipes.commons.constants.InstanceIdExtractor; import org.apache.streampipes.logging.LoggerFactory; import org.apache.streampipes.logging.api.Logger; import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings; import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.model.util.Cloner; -import java.util.List; +import com.fasterxml.jackson.annotation.JsonIgnore; -public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity { +import java.util.List; - private static final long serialVersionUID = 2727573914765473470L; +public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity implements EndpointSelectable { protected List<SpDataStream> inputStreams; @@ -120,10 +122,12 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity this.supportedGrounding = supportedGrounding; } + @Override public String getCorrespondingPipeline() { return correspondingPipeline; } + @Override public void setCorrespondingPipeline(String correspondingPipeline) { this.correspondingPipeline = correspondingPipeline; } @@ -168,14 +172,22 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity this.uncompleted = uncompleted; } + @Override public String getSelectedEndpointUrl() { return selectedEndpointUrl; } + @Override public void setSelectedEndpointUrl(String selectedEndpointUrl) { this.selectedEndpointUrl = selectedEndpointUrl; } + @Override + @JsonIgnore + public String getDetachPath() { + return "/" + InstanceIdExtractor.extractId(getElementId()); + } + //public Logger getLogger(Class clazz, PeConfig peConfig) { public Logger getLogger(Class clazz) { //return LoggerFactory.getPeLogger(clazz, getCorrespondingPipeline(), getUri(), peConfig); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java index a688a101c..6fa8f7816 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/PipelineStatusMessage.java @@ -28,13 +28,14 @@ public class PipelineStatusMessage { private String messageType; private String message; - public PipelineStatusMessage(String pipelineId, long timestamp, - String messageType, String message) { + public PipelineStatusMessage(String pipelineId, + long timestamp, + PipelineStatusMessageType message) { super(); this.pipelineId = pipelineId; this.timestamp = timestamp; - this.messageType = messageType; - this.message = message; + this.messageType = message.title(); + this.message = message.description(); } public String getPipelineId() { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java index 25b3c5b27..8e42372e2 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineOperationStatus.java @@ -33,7 +33,9 @@ public class PipelineOperationStatus { private List<PipelineElementStatus> elementStatus; - public PipelineOperationStatus(String pipelineId, String pipelineName, String title, + public PipelineOperationStatus(String pipelineId, + String pipelineName, + String title, List<PipelineElementStatus> elementStatus) { super(); this.title = title; @@ -46,6 +48,12 @@ public class PipelineOperationStatus { this.elementStatus = new ArrayList<>(); } + public PipelineOperationStatus(String pipelineId, String pipelineName) { + this(); + this.pipelineId = pipelineId; + this.pipelineName = pipelineName; + } + public String getPipelineId() { return pipelineId; } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java new file mode 100644 index 000000000..ba13ab7b0 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionInfo.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution; + +import org.apache.streampipes.model.SpDataSet; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineOperationStatus; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class PipelineExecutionInfo { + + private final List<NamedStreamPipesEntity> failedServices; + private final List<InvocableStreamPipesEntity> processorsAndSinks; + private final List<SpDataSet> dataSets; + + private PipelineOperationStatus pipelineOperationStatus; + + private final String pipelineId; + + public static PipelineExecutionInfo create(Pipeline pipeline) { + return new PipelineExecutionInfo(pipeline); + } + + private PipelineExecutionInfo(Pipeline pipeline) { + this.failedServices = new ArrayList<>(); + this.processorsAndSinks = findProcessorsAndSinks(pipeline); + this.dataSets = findDataSets(pipeline); + this.pipelineOperationStatus = new PipelineOperationStatus(); + this.pipelineId = pipeline.getPipelineId(); + } + + private List<InvocableStreamPipesEntity> findProcessorsAndSinks(Pipeline pipeline) { + return Stream + .concat( + pipeline.getSepas().stream(), + pipeline.getActions().stream() + ).collect(Collectors.toList()); + } + + private List<SpDataSet> findDataSets(Pipeline pipeline) { + return pipeline + .getStreams() + .stream() + .filter(s -> s instanceof SpDataSet) + .map(s -> new SpDataSet((SpDataSet) s)) + .toList(); + } + + public void addFailedPipelineElement(NamedStreamPipesEntity failedElement) { + this.failedServices.add(failedElement); + } + + public void addDataSets(List<SpDataSet> dataSets) { + this.dataSets.addAll(dataSets); + } + + public List<SpDataSet> getDataSets() { + return dataSets; + } + + public List<NamedStreamPipesEntity> getFailedServices() { + return failedServices; + } + + public List<InvocableStreamPipesEntity> getProcessorsAndSinks() { + return processorsAndSinks; + } + + public void applyPipelineOperationStatus(PipelineOperationStatus status) { + this.pipelineOperationStatus = status; + } + + public PipelineOperationStatus getPipelineOperationStatus() { + return this.pipelineOperationStatus; + } + + public String getPipelineId() { + return pipelineId; + } + + public boolean isOperationSuccessful() { + return failedServices.size() == 0 && pipelineOperationStatus.isSuccess(); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java new file mode 100644 index 000000000..e290c485d --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutionTaskFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution; + +import org.apache.streampipes.manager.execution.http.DetachPipelineElementSubmitter; +import org.apache.streampipes.manager.execution.http.InvokePipelineElementSubmitter; +import org.apache.streampipes.manager.execution.provider.CurrentPipelineElementProvider; +import org.apache.streampipes.manager.execution.provider.StoredPipelineElementProvider; +import org.apache.streampipes.manager.execution.task.AfterInvocationTask; +import org.apache.streampipes.manager.execution.task.DiscoverEndpointsTask; +import org.apache.streampipes.manager.execution.task.SubmitRequestTask; +import org.apache.streampipes.manager.execution.task.PipelineExecutionTask; +import org.apache.streampipes.manager.execution.task.SecretEncryptionTask; +import org.apache.streampipes.manager.execution.task.StorePipelineStatusTask; +import org.apache.streampipes.manager.execution.task.UpdateGroupIdTask; +import org.apache.streampipes.model.message.PipelineStatusMessageType; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.resource.management.secret.SecretProvider; + +import java.util.List; + +public class PipelineExecutionTaskFactory { + + public static List<PipelineExecutionTask> makeStartPipelineTasks(Pipeline pipeline) { + return List.of( + new UpdateGroupIdTask(), + new SecretEncryptionTask(SecretProvider.getDecryptionService()), + new DiscoverEndpointsTask(), + new SubmitRequestTask(new InvokePipelineElementSubmitter(pipeline), new CurrentPipelineElementProvider()), + new SecretEncryptionTask(SecretProvider.getEncryptionService()), + new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STARTED), + new StorePipelineStatusTask(true, false) + ); + } + + public static List<PipelineExecutionTask> makeStopPipelineTasks(Pipeline pipeline, + boolean forceStop) { + return List.of( + new SubmitRequestTask(new DetachPipelineElementSubmitter(pipeline), new StoredPipelineElementProvider()), + new AfterInvocationTask(PipelineStatusMessageType.PIPELINE_STOPPED), + new StorePipelineStatusTask(false, forceStop) + ); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java new file mode 100644 index 000000000..bc14e46f5 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution; + +import org.apache.streampipes.manager.execution.task.PipelineExecutionTask; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineOperationStatus; + +import java.util.List; + +public class PipelineExecutor { + + private final Pipeline pipeline; + private final boolean forceStop; + + public PipelineExecutor(Pipeline pipeline, + boolean forceStop) { + this.pipeline = pipeline; + this.forceStop = forceStop; + } + + public PipelineOperationStatus startPipeline() { + return executeOperation(PipelineExecutionTaskFactory.makeStartPipelineTasks(pipeline)); + } + + public PipelineOperationStatus stopPipeline() { + return executeOperation(PipelineExecutionTaskFactory.makeStopPipelineTasks(pipeline, forceStop)); + } + + private PipelineOperationStatus executeOperation(List<PipelineExecutionTask> executionTasks) { + var executionInfo = PipelineExecutionInfo.create(pipeline); + executionTasks + .forEach(task -> { + if (task.shouldExecute(executionInfo)) { + task.executeTask(pipeline, executionInfo); + } + }); + return executionInfo.getPipelineOperationStatus(); + } + +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java new file mode 100644 index 000000000..51760b252 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.endpoint; + +import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants; +import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.model.SpDataSet; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; +import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups; +import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + +import java.util.Collections; +import java.util.List; + +public class ExtensionsServiceEndpointProvider { + + public String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { + return new ExtensionsServiceEndpointGenerator( + g.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(g)) + .getEndpointResourceUrl(); + } + + public String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException { + String appId = ds.getAppId() != null ? ds.getAppId() : ds.getCorrespondingAdapterId(); + if (ds.isInternallyManaged()) { + return getConnectMasterSourcesUrl(); + } else { + return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET) + .getEndpointResourceUrl(); + } + } + + private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException { + List<String> connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery() + .getServiceEndpoints(DefaultSpServiceGroups.CORE, true, + Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString())); + if (connectMasterEndpoints.size() > 0) { + return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT; + } else { + throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint"); + } + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java similarity index 50% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java index 085d17e44..ac9b73705 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java @@ -16,19 +16,27 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.manager.execution.http; -import org.apache.streampipes.model.SpDataSet; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.api.EndpointSelectable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.http.client.fluent.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class TemporaryGraphStorage { - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); +public class DetachHttpRequest extends PipelineElementHttpRequest { - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(DetachHttpRequest.class); + @Override + protected Request initRequest(EndpointSelectable pipelineElement, String endpointUrl) { + LOG.info("Detaching element: " + endpointUrl); + return Request.Delete(endpointUrl); + } + + @Override + protected void logError(String endpointUrl, String pipelineElementName, String exceptionMessage) { + LOG.error("Could not stop pipeline element {} at {}: {}", endpointUrl, pipelineElementName, exceptionMessage); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java new file mode 100644 index 000000000..47651cda3 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachPipelineElementSubmitter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.http; + +import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineElementStatus; + +import java.util.List; + +public class DetachPipelineElementSubmitter extends PipelineElementSubmitter { + + public DetachPipelineElementSubmitter(Pipeline pipeline) { + super(pipeline); + } + + @Override + protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) { + return performDetach(pipelineElement); + } + + @Override + protected boolean shouldSubmitDataSets() { + return true; + } + + @Override + protected void onSuccess() { + status.setTitle("Pipeline " + pipelineName + " successfully stopped"); + } + + @Override + protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) { + status.setTitle("Could not stop all pipeline elements of pipeline " + pipelineName + "."); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java deleted file mode 100644 index 59598fa14..000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.execution.http; - -import org.apache.streampipes.commons.constants.InstanceIdExtractor; -import org.apache.streampipes.model.SpDataSet; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.pipeline.PipelineElementStatus; -import org.apache.streampipes.model.pipeline.PipelineOperationStatus; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -public class GraphSubmitter { - - private List<InvocableStreamPipesEntity> graphs; - private List<SpDataSet> dataSets; - - private String pipelineId; - private String pipelineName; - - private static final Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class); - - public GraphSubmitter(String pipelineId, - String pipelineName, - List<InvocableStreamPipesEntity> graphs, - List<SpDataSet> dataSets) { - this.graphs = graphs != null ? graphs : new ArrayList<>(); - this.pipelineId = pipelineId; - this.pipelineName = pipelineName; - this.dataSets = dataSets != null ? dataSets : new ArrayList<>(); - } - - public PipelineOperationStatus invokeGraphs() { - PipelineOperationStatus status = new PipelineOperationStatus(); - status.setPipelineId(pipelineId); - status.setPipelineName(pipelineName); - - - graphs.forEach(g -> status.addPipelineElementStatus(performInvocation(g))); - if (status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)) { - dataSets.forEach(dataSet -> - status.addPipelineElementStatus - (performInvocation(dataSet))); - } - status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)); - - if (status.isSuccess()) { - status.setTitle("Pipeline " + pipelineName + " successfully started"); - } else { - LOG.info("Could not start pipeline, initializing rollback..."); - rollbackInvokedPipelineElements(status); - status.setTitle("Could not start pipeline " + pipelineName + "."); - } - return status; - } - - private void rollbackInvokedPipelineElements(PipelineOperationStatus status) { - for (PipelineElementStatus s : status.getElementStatus()) { - if (s.isSuccess()) { - Optional<InvocableStreamPipesEntity> graph = findGraph(s.getElementId()); - graph.ifPresent(g -> { - LOG.info("Rolling back element " + g.getElementId()); - performDetach(g); - }); - } - } - } - - private Optional<InvocableStreamPipesEntity> findGraph(String elementId) { - return graphs.stream().filter(g -> g.getBelongsTo().equals(elementId)).findFirst(); - } - - public PipelineOperationStatus detachGraphs() { - PipelineOperationStatus status = new PipelineOperationStatus(); - status.setPipelineId(pipelineId); - status.setPipelineName(pipelineName); - - graphs.forEach(g -> status.addPipelineElementStatus(performDetach(g))); - dataSets.forEach(dataSet -> status.addPipelineElementStatus(performDetach(dataSet))); - status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)); - - if (status.isSuccess()) { - status.setTitle("Pipeline " + pipelineName + " successfully stopped"); - } else { - status.setTitle("Could not stop all pipeline elements of pipeline " + pipelineName + "."); - } - - return status; - } - - private PipelineElementStatus performInvocation(InvocableStreamPipesEntity entity) { - String endpointUrl = entity.getSelectedEndpointUrl(); - return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).invoke(); - } - - private PipelineElementStatus performInvocation(SpDataSet dataset) { - String endpointUrl = dataset.getSelectedEndpointUrl(); - return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).invoke(); - } - - private PipelineElementStatus performDetach(InvocableStreamPipesEntity entity) { - String endpointUrl = entity.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(entity.getElementId()); - return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).detach(); - } - - private PipelineElementStatus performDetach(SpDataSet dataset) { - String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/" - + dataset.getDatasetInvocationId(); - return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).detach(); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java deleted file mode 100644 index d9dd5a313..000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.execution.http; - -import org.apache.streampipes.manager.util.AuthTokenUtils; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.pipeline.PipelineElementStatus; -import org.apache.streampipes.serializers.json.JacksonSerializer; - -import com.google.gson.JsonSyntaxException; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.entity.ContentType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HttpRequestBuilder { - - private final NamedStreamPipesEntity payload; - private final String endpointUrl; - private String pipelineId; - - private static final Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class); - - public HttpRequestBuilder(NamedStreamPipesEntity payload, - String endpointUrl, - String pipelineId) { - this.payload = payload; - this.endpointUrl = endpointUrl; - this.pipelineId = pipelineId; - } - - public PipelineElementStatus invoke() { - LOG.info("Invoking element: " + endpointUrl); - try { - String jsonDocument = toJson(); - Response httpResp = - Request.Post(endpointUrl) - .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId)) - .bodyString(jsonDocument, ContentType.APPLICATION_JSON) - .connectTimeout(10000) - .execute(); - return handleResponse(httpResp); - } catch (Exception e) { - LOG.error("Could not perform invocation request", e); - return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage()); - } - } - - public PipelineElementStatus detach() { - try { - Response httpResp = Request.Delete(endpointUrl) - .addHeader("Authorization", AuthTokenUtils.getAuthToken(this.pipelineId)) - .connectTimeout(10000).execute(); - return handleResponse(httpResp); - } catch (Exception e) { - LOG.error("Could not stop pipeline {}", endpointUrl, e); - return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage()); - } - } - - private PipelineElementStatus handleResponse(Response httpResp) throws JsonSyntaxException, IOException { - String resp = httpResp.returnContent().asString(); - org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer - .getObjectMapper() - .readValue(resp, org.apache.streampipes.model.Response.class); - return convert(streamPipesResp); - } - - private String toJson() throws Exception { - return JacksonSerializer.getObjectMapper().writeValueAsString(payload); - } - - private PipelineElementStatus convert(org.apache.streampipes.model.Response response) { - return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(), - response.getOptionalMessage()); - } - - -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java new file mode 100644 index 000000000..1656e677b --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.http; + +import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.http.client.fluent.Request; +import org.apache.http.entity.ContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InvokeHttpRequest extends PipelineElementHttpRequest { + + private static final Logger LOG = LoggerFactory.getLogger(InvokeHttpRequest.class); + + @Override + protected Request initRequest(EndpointSelectable pipelineElement, + String endpointUrl) throws JsonProcessingException { + LOG.info("Invoking element: " + endpointUrl); + return Request + .Post(endpointUrl) + .bodyString(toJson(pipelineElement), ContentType.APPLICATION_JSON); + } + + @Override + protected void logError(String endpointUrl, + String pipelineElementName, + String exceptionMessage) { + LOG.error("Could not perform invocation request at {} for pipeline element {}: {}", + endpointUrl, pipelineElementName, exceptionMessage); + } + + private String toJson(EndpointSelectable pipelineElement) throws JsonProcessingException { + return JacksonSerializer.getObjectMapper().writeValueAsString(pipelineElement); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java new file mode 100644 index 000000000..d746d2319 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokePipelineElementSubmitter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.http; + +import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineElementStatus; +import org.apache.streampipes.model.pipeline.PipelineOperationStatus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +public class InvokePipelineElementSubmitter extends PipelineElementSubmitter { + + private static final Logger LOG = LoggerFactory.getLogger(InvokePipelineElementSubmitter.class); + + public InvokePipelineElementSubmitter(Pipeline pipeline) { + super(pipeline); + } + + @Override + protected PipelineElementStatus submitElement(EndpointSelectable pipelineElement) { + String endpointUrl = pipelineElement.getSelectedEndpointUrl(); + return new InvokeHttpRequest().execute(pipelineElement, endpointUrl, this.pipelineId); + } + + @Override + protected boolean shouldSubmitDataSets() { + return isSuccess(); + } + + @Override + protected void onSuccess() { + status.setTitle("Pipeline " + pipelineName + " successfully started"); + } + + @Override + protected void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks) { + LOG.info("Could not start pipeline, initializing rollback..."); + rollbackInvokedPipelineElements(status, processorsAndSinks); + status.setTitle("Could not start pipeline " + pipelineName + "."); + } + + private void rollbackInvokedPipelineElements(PipelineOperationStatus status, + List<InvocableStreamPipesEntity> pe) { + for (PipelineElementStatus s : status.getElementStatus()) { + if (s.isSuccess()) { + Optional<InvocableStreamPipesEntity> graph = findPipelineElements(s.getElementId(), pe); + graph.ifPresent(this::performDetach); + } + } + } + + private Optional<InvocableStreamPipesEntity> findPipelineElements(String elementId, + List<InvocableStreamPipesEntity> pe) { + return pe + .stream() + .filter(g -> g.getBelongsTo().equals(elementId)) + .findFirst(); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java new file mode 100644 index 000000000..400626e54 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.http; + +import org.apache.streampipes.manager.util.AuthTokenUtils; +import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.model.pipeline.PipelineElementStatus; +import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.gson.JsonSyntaxException; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; + +import java.io.IOException; + +public abstract class PipelineElementHttpRequest { + + public PipelineElementStatus execute(EndpointSelectable pipelineElement, + String endpointUrl, + String pipelineId) { + try { + Response httpResp = initRequest(pipelineElement, endpointUrl) + .addHeader("Authorization", AuthTokenUtils.getAuthToken(pipelineId)) + .connectTimeout(10000) + .execute(); + return handleResponse(httpResp, pipelineElement, endpointUrl); + } catch (Exception e) { + logError(endpointUrl, pipelineElement.getName(), e.getMessage()); + return new PipelineElementStatus(endpointUrl, pipelineElement.getName(), false, e.getMessage()); + } + } + + protected abstract Request initRequest(EndpointSelectable pipelineElement, + String endpointUrl) throws JsonProcessingException; + + protected abstract void logError(String endpointUrl, + String pipelineElementName, + String exceptionMessage); + + protected PipelineElementStatus handleResponse(Response httpResp, + EndpointSelectable pipelineElement, + String endpointUrl) throws JsonSyntaxException, IOException { + String resp = httpResp.returnContent().asString(); + org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer + .getObjectMapper() + .readValue(resp, org.apache.streampipes.model.Response.class); + return convert(streamPipesResp, endpointUrl, pipelineElement.getName()); + } + + private PipelineElementStatus convert(org.apache.streampipes.model.Response response, + String endpointUrl, + String pipelineElementName) { + return new PipelineElementStatus(endpointUrl, pipelineElementName, response.isSuccess(), + response.getOptionalMessage()); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java new file mode 100644 index 000000000..8be496b13 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementSubmitter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.http; + +import org.apache.streampipes.model.SpDataSet; +import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineElementStatus; +import org.apache.streampipes.model.pipeline.PipelineOperationStatus; + +import java.util.List; + +public abstract class PipelineElementSubmitter { + + protected final String pipelineId; + protected final String pipelineName; + + protected final PipelineOperationStatus status; + + public PipelineElementSubmitter(Pipeline pipeline) { + this.pipelineId = pipeline.getPipelineId(); + this.pipelineName = pipeline.getName(); + this.status = new PipelineOperationStatus(pipelineId, pipelineName); + } + + public PipelineOperationStatus submit(List<InvocableStreamPipesEntity> processorsAndSinks, + List<SpDataSet> dataSets) { + // First, try handling all data processors and sinks + processorsAndSinks.forEach(g -> status.addPipelineElementStatus(submitElement(g))); + + // Then,submit data sets always for detach operation and otherwise only in case of success + if (shouldSubmitDataSets()) { + dataSets.forEach(dataSet -> status.addPipelineElementStatus(submitElement(dataSet))); + } + + applySuccess(processorsAndSinks); + return status; + } + + protected boolean isSuccess() { + return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess); + } + + protected void applySuccess(List<InvocableStreamPipesEntity> processorsAndSinks) { + status.setSuccess(isSuccess()); + if (status.isSuccess()) { + this.onSuccess(); + } else { + this.onFailure(processorsAndSinks); + } + } + + protected PipelineElementStatus performDetach(EndpointSelectable pipelineElement) { + String endpointUrl = pipelineElement.getSelectedEndpointUrl() + pipelineElement.getDetachPath(); + return new DetachHttpRequest().execute(pipelineElement, endpointUrl, this.pipelineId); + } + + protected abstract PipelineElementStatus submitElement(EndpointSelectable pipelineElement); + + protected abstract boolean shouldSubmitDataSets(); + + protected abstract void onSuccess(); + + protected abstract void onFailure(List<InvocableStreamPipesEntity> processorsAndSinks); +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java deleted file mode 100644 index 9ee3b8467..000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.execution.http; - -import org.apache.streampipes.commons.MD5; -import org.apache.streampipes.commons.Utils; -import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants; -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; -import org.apache.streampipes.manager.execution.status.PipelineStatusManager; -import org.apache.streampipes.manager.util.TemporaryGraphStorage; -import org.apache.streampipes.model.SpDataSet; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.graph.DataProcessorInvocation; -import org.apache.streampipes.model.graph.DataSinkInvocation; -import org.apache.streampipes.model.grounding.KafkaTransportProtocol; -import org.apache.streampipes.model.message.PipelineStatusMessage; -import org.apache.streampipes.model.message.PipelineStatusMessageType; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.pipeline.PipelineElementStatus; -import org.apache.streampipes.model.pipeline.PipelineHealthStatus; -import org.apache.streampipes.model.pipeline.PipelineOperationStatus; -import org.apache.streampipes.resource.management.secret.SecretProvider; -import org.apache.streampipes.storage.api.IPipelineStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; -import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; -import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups; -import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import org.lightcouch.DocumentConflictException; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.stream.Collectors; - -public class PipelineExecutor { - - private final Pipeline pipeline; - private final boolean storeStatus; - private final boolean forceStop; - - public PipelineExecutor(Pipeline pipeline, - boolean storeStatus, - boolean forceStop) { - this.pipeline = pipeline; - this.storeStatus = storeStatus; - this.forceStop = forceStop; - } - - public PipelineOperationStatus startPipeline() { - - pipeline.getSepas().forEach(this::updateGroupIds); - pipeline.getActions().forEach(this::updateGroupIds); - - List<DataProcessorInvocation> sepas = pipeline.getSepas(); - List<DataSinkInvocation> secs = pipeline.getActions(); - - List<SpDataSet> dataSets = pipeline - .getStreams() - .stream() - .filter(s -> s instanceof SpDataSet) - .map(s -> new SpDataSet((SpDataSet) s)) - .collect(Collectors.toList()); - - List<NamedStreamPipesEntity> failedServices = new ArrayList<>(); - - dataSets.forEach(ds -> { - ds.setCorrespondingPipeline(pipeline.getPipelineId()); - try { - ds.setSelectedEndpointUrl(findSelectedEndpoint(ds)); - } catch (NoServiceEndpointsAvailableException e) { - failedServices.add(ds); - } - }); - - List<InvocableStreamPipesEntity> graphs = new ArrayList<>(); - graphs.addAll(sepas); - graphs.addAll(secs); - - decryptSecrets(graphs); - - graphs.forEach(g -> { - try { - g.setSelectedEndpointUrl(findSelectedEndpoint(g)); - g.setCorrespondingPipeline(pipeline.getPipelineId()); - } catch (NoServiceEndpointsAvailableException e) { - failedServices.add(g); - } - }); - - PipelineOperationStatus status; - if (failedServices.size() == 0) { - - status = new GraphSubmitter(pipeline.getPipelineId(), - pipeline.getName(), graphs, dataSets) - .invokeGraphs(); - - encryptSecrets(graphs); - - if (status.isSuccess()) { - storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets); - - PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(), - new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), - PipelineStatusMessageType.PIPELINE_STARTED.title(), - PipelineStatusMessageType.PIPELINE_STARTED.description())); - - if (storeStatus) { - pipeline.setHealthStatus(PipelineHealthStatus.OK); - setPipelineStarted(pipeline); - } - } - } else { - List<PipelineElementStatus> pe = failedServices.stream().map(fs -> - new PipelineElementStatus(fs.getElementId(), - fs.getName(), - false, - "No active supporting service found")).collect(Collectors.toList()); - status = new PipelineOperationStatus(pipeline.getPipelineId(), - pipeline.getName(), - "Could not start pipeline " + pipeline.getName() + ".", - pe); - } - return status; - } - - private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); - } - - private String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException { - String appId = ds.getAppId() != null ? ds.getAppId() : ds.getCorrespondingAdapterId(); - if (ds.isInternallyManaged()) { - return getConnectMasterSourcesUrl(); - } else { - return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET) - .getEndpointResourceUrl(); - } - } - - private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException { - List<String> connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery() - .getServiceEndpoints(DefaultSpServiceGroups.CORE, true, - Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString())); - if (connectMasterEndpoints.size() > 0) { - return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT; - } else { - throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint"); - } - } - - private void updateGroupIds(InvocableStreamPipesEntity entity) { - entity.getInputStreams() - .stream() - .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) - .map(is -> is.getEventGrounding().getTransportProtocol()) - .map(KafkaTransportProtocol.class::cast) - .forEach(tp -> tp.setGroupId(Utils.filterSpecialChar(pipeline.getName()) + MD5.crypt(tp.getElementId()))); - } - - private void decryptSecrets(List<InvocableStreamPipesEntity> graphs) { - SecretProvider.getDecryptionService().apply(graphs); - } - - private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) { - SecretProvider.getEncryptionService().apply(graphs); - } - - public PipelineOperationStatus stopPipeline() { - List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId()); - List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId()); - - PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), - pipeline.getName(), graphs, dataSets) - .detachGraphs(); - - if (status.isSuccess()) { - PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(), - new PipelineStatusMessage(pipeline.getPipelineId(), - System.currentTimeMillis(), - PipelineStatusMessageType.PIPELINE_STOPPED.title(), - PipelineStatusMessageType.PIPELINE_STOPPED.description())); - - } - - if (status.isSuccess() || forceStop) { - if (storeStatus) { - setPipelineStopped(pipeline); - } - } - return status; - } - - private void setPipelineStarted(Pipeline pipeline) { - pipeline.setRunning(true); - pipeline.setStartedAt(new Date().getTime()); - try { - getPipelineStorageApi().updatePipeline(pipeline); - } catch (DocumentConflictException dce) { - //dce.printStackTrace(); - } - } - - private void setPipelineStopped(Pipeline pipeline) { - pipeline.setRunning(false); - getPipelineStorageApi().updatePipeline(pipeline); - } - - private void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs, - List<SpDataSet> dataSets) { - TemporaryGraphStorage.graphStorage.put(pipelineId, graphs); - TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets); - } - - private IPipelineStorage getPipelineStorageApi() { - return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(); - } - -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java similarity index 64% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java index 085d17e44..282f08537 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/CurrentPipelineElementProvider.java @@ -16,19 +16,22 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.manager.execution.provider; +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; import org.apache.streampipes.model.SpDataSet; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import java.util.HashMap; import java.util.List; -import java.util.Map; -public class TemporaryGraphStorage { - - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); - - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); +public class CurrentPipelineElementProvider implements PipelineElementProvider { + @Override + public List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo) { + return executionInfo.getProcessorsAndSinks(); + } + @Override + public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) { + return executionInfo.getDataSets(); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java similarity index 73% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java index 085d17e44..d8f724801 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/PipelineElementProvider.java @@ -16,19 +16,17 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.manager.execution.provider; +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; import org.apache.streampipes.model.SpDataSet; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import java.util.HashMap; import java.util.List; -import java.util.Map; -public class TemporaryGraphStorage { +public interface PipelineElementProvider { - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); - - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); + List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo); + List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java similarity index 57% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java index 085d17e44..a41adbde7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/provider/StoredPipelineElementProvider.java @@ -16,19 +16,23 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.manager.execution.provider; +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.manager.storage.RunningPipelineElementStorage; import org.apache.streampipes.model.SpDataSet; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import java.util.HashMap; import java.util.List; -import java.util.Map; -public class TemporaryGraphStorage { - - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); - - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); +public class StoredPipelineElementProvider implements PipelineElementProvider { + @Override + public List<InvocableStreamPipesEntity> getProcessorsAndSinks(PipelineExecutionInfo executionInfo) { + return RunningPipelineElementStorage.runningProcessorsAndSinks.get(executionInfo.getPipelineId()); + } + @Override + public List<SpDataSet> getDataSets(PipelineExecutionInfo executionInfo) { + return RunningPipelineElementStorage.runningDataSets.get(executionInfo.getPipelineId()); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java new file mode 100644 index 000000000..a7d967d2a --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/AfterInvocationTask.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.task; + +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.manager.execution.status.PipelineStatusManager; +import org.apache.streampipes.manager.storage.RunningPipelineElementStorage; +import org.apache.streampipes.model.SpDataSet; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.message.PipelineStatusMessage; +import org.apache.streampipes.model.message.PipelineStatusMessageType; +import org.apache.streampipes.model.pipeline.Pipeline; + +import java.util.List; + +public class AfterInvocationTask implements PipelineExecutionTask { + + private final PipelineStatusMessageType statusMessageType; + + public AfterInvocationTask(PipelineStatusMessageType statusMessageType) { + this.statusMessageType = statusMessageType; + } + + @Override + public boolean shouldExecute(PipelineExecutionInfo executionInfo) { + return executionInfo.isOperationSuccessful(); + } + + @Override + public void executeTask(Pipeline pipeline, + PipelineExecutionInfo executionInfo) { + var graphs = executionInfo.getProcessorsAndSinks(); + var dataSets = executionInfo.getDataSets(); + storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets); + addPipelineStatus(pipeline); + } + + private void storeInvocationGraphs(String pipelineId, + List<InvocableStreamPipesEntity> graphs, + List<SpDataSet> dataSets) { + RunningPipelineElementStorage.runningProcessorsAndSinks.put(pipelineId, graphs); + RunningPipelineElementStorage.runningDataSets.put(pipelineId, dataSets); + } + + private void addPipelineStatus(Pipeline pipeline) { + PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(), + new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), statusMessageType)); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java new file mode 100644 index 000000000..713644650 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.task; + +import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointProvider; +import org.apache.streampipes.model.SpDataSet; +import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineElementStatus; +import org.apache.streampipes.model.pipeline.PipelineOperationStatus; + +import java.util.List; +import java.util.stream.Collectors; + +public class DiscoverEndpointsTask implements PipelineExecutionTask { + @Override + public void executeTask(Pipeline pipeline, + PipelineExecutionInfo executionInfo) { + var processorsAndSinks = executionInfo.getProcessorsAndSinks(); + var dataSets = executionInfo.getDataSets(); + + processorsAndSinks.forEach(el -> { + try { + var endpointUrl = findSelectedEndpoint(el); + applyEndpointAndPipeline(pipeline.getPipelineId(), el, endpointUrl); + } catch (NoServiceEndpointsAvailableException e) { + executionInfo.addFailedPipelineElement(el); + } + }); + dataSets.forEach(ds -> { + try { + var endpointUrl = findSelectedDsEndpoint(ds); + applyEndpointAndPipeline(pipeline.getPipelineId(), ds, endpointUrl); + } catch (NoServiceEndpointsAvailableException e) { + executionInfo.addFailedPipelineElement(ds); + } + }); + + var failedServices = executionInfo.getFailedServices(); + if (executionInfo.getFailedServices().size() > 0) { + List<PipelineElementStatus> pe = failedServices + .stream() + .map(fs -> new PipelineElementStatus(fs.getElementId(), fs.getName(), false, + "No active extensions service found which provides this pipeline element")) + .collect(Collectors.toList()); + var status = new PipelineOperationStatus(pipeline.getPipelineId(), + pipeline.getName(), + "Could not start pipeline " + pipeline.getName() + ".", + pe); + executionInfo.applyPipelineOperationStatus(status); + } + } + + private void applyEndpointAndPipeline(String pipelineId, + EndpointSelectable pipelineElement, + String endpointUrl) { + pipelineElement.setSelectedEndpointUrl(endpointUrl); + pipelineElement.setCorrespondingPipeline(pipelineId); + } + + private String findSelectedEndpoint(InvocableStreamPipesEntity pipelineElement) + throws NoServiceEndpointsAvailableException { + return new ExtensionsServiceEndpointProvider().findSelectedEndpoint(pipelineElement); + } + + private String findSelectedDsEndpoint(SpDataSet dataSet) throws NoServiceEndpointsAvailableException { + return new ExtensionsServiceEndpointProvider().findSelectedEndpoint(dataSet); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java similarity index 64% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java index 085d17e44..096456061 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/PipelineExecutionTask.java @@ -16,19 +16,17 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.manager.execution.task; -import org.apache.streampipes.model.SpDataSet; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.model.pipeline.Pipeline; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +public interface PipelineExecutionTask { -public class TemporaryGraphStorage { - - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); - - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); + default boolean shouldExecute(PipelineExecutionInfo executionInfo) { + return true; + } + void executeTask(Pipeline pipeline, + PipelineExecutionInfo executionInfo); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java similarity index 54% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java index 085d17e44..b530f93c8 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SecretEncryptionTask.java @@ -16,19 +16,23 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.manager.execution.task; -import org.apache.streampipes.model.SpDataSet; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.resource.management.secret.SecretService; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +public class SecretEncryptionTask implements PipelineExecutionTask { -public class TemporaryGraphStorage { + private final SecretService secretService; - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); - - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); + public SecretEncryptionTask(SecretService secretService) { + this.secretService = secretService; + } + @Override + public void executeTask(Pipeline pipeline, + PipelineExecutionInfo executionInfo) { + this.secretService.apply(executionInfo.getProcessorsAndSinks()); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java new file mode 100644 index 000000000..f01f99005 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/StorePipelineStatusTask.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.task; + +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineHealthStatus; +import org.apache.streampipes.storage.api.IPipelineStorage; +import org.apache.streampipes.storage.management.StorageDispatcher; + +import org.lightcouch.DocumentConflictException; + +import java.util.Date; + +public class StorePipelineStatusTask implements PipelineExecutionTask { + + private final boolean start; + private final boolean forceStop; + + public StorePipelineStatusTask(boolean start, + boolean forceStop) { + this.start = start; + this.forceStop = forceStop; + } + + @Override + public boolean shouldExecute(PipelineExecutionInfo executionInfo) { + return executionInfo.isOperationSuccessful() || forceStop; + } + + @Override + public void executeTask(Pipeline pipeline, + PipelineExecutionInfo executionInfo) { + if (this.start) { + pipeline.setHealthStatus(PipelineHealthStatus.OK); + setPipelineStarted(pipeline); + } else { + setPipelineStopped(pipeline); + } + } + + private void setPipelineStarted(Pipeline pipeline) { + pipeline.setRunning(true); + pipeline.setStartedAt(new Date().getTime()); + try { + getPipelineStorageApi().updatePipeline(pipeline); + } catch (DocumentConflictException dce) { + //dce.printStackTrace(); + } + } + + private void setPipelineStopped(Pipeline pipeline) { + pipeline.setRunning(false); + getPipelineStorageApi().updatePipeline(pipeline); + } + + private IPipelineStorage getPipelineStorageApi() { + return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java new file mode 100644 index 000000000..c29d74909 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/SubmitRequestTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.task; + +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.manager.execution.http.PipelineElementSubmitter; +import org.apache.streampipes.manager.execution.provider.PipelineElementProvider; +import org.apache.streampipes.model.pipeline.Pipeline; + +public class SubmitRequestTask implements PipelineExecutionTask { + + private final PipelineElementProvider elementProvider; + private final PipelineElementSubmitter submitter; + + public SubmitRequestTask(PipelineElementSubmitter submitter, + PipelineElementProvider elementProvider) { + this.elementProvider = elementProvider; + this.submitter = submitter; + } + + @Override + public boolean shouldExecute(PipelineExecutionInfo executionInfo) { + return executionInfo.getFailedServices().size() == 0; + } + + @Override + public void executeTask(Pipeline pipeline, PipelineExecutionInfo executionInfo) { + var processorsAndSinks = elementProvider.getProcessorsAndSinks(executionInfo); + var dataSets = elementProvider.getDataSets(executionInfo); + + var status = submitter.submit(processorsAndSinks, dataSets); + + executionInfo.applyPipelineOperationStatus(status); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java new file mode 100644 index 000000000..19fd87910 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/UpdateGroupIdTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution.task; + +import org.apache.streampipes.commons.MD5; +import org.apache.streampipes.commons.Utils; +import org.apache.streampipes.manager.execution.PipelineExecutionInfo; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.grounding.KafkaTransportProtocol; +import org.apache.streampipes.model.pipeline.Pipeline; + +public class UpdateGroupIdTask implements PipelineExecutionTask { + @Override + public void executeTask(Pipeline pipeline, + PipelineExecutionInfo executionInfo) { + var sanitizedPipelineName = Utils.filterSpecialChar(pipeline.getName()); + pipeline.getSepas().forEach(processor -> updateGroupIds(processor, sanitizedPipelineName)); + pipeline.getActions().forEach(sink -> updateGroupIds(sink, sanitizedPipelineName)); + } + + private void updateGroupIds(InvocableStreamPipesEntity entity, + String sanitizedPipelineName) { + entity.getInputStreams() + .stream() + .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) + .map(is -> is.getEventGrounding().getTransportProtocol()) + .map(KafkaTransportProtocol.class::cast) + .forEach(tp -> tp.setGroupId(sanitizedPipelineName + MD5.crypt(tp.getElementId()))); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java index 335d76c46..c0e159df0 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java @@ -22,8 +22,8 @@ import org.apache.streampipes.commons.constants.InstanceIdExtractor; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; -import org.apache.streampipes.manager.execution.http.HttpRequestBuilder; -import org.apache.streampipes.manager.util.TemporaryGraphStorage; +import org.apache.streampipes.manager.execution.http.InvokeHttpRequest; +import org.apache.streampipes.manager.storage.RunningPipelineElementStorage; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineHealthStatus; @@ -67,7 +67,7 @@ public class PipelineHealthCheck implements Runnable { List<String> failedInstances = new ArrayList<>(); List<String> recoveredInstances = new ArrayList<>(); List<String> pipelineNotifications = new ArrayList<>(); - List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId()); + List<InvocableStreamPipesEntity> graphs = RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipeline.getPipelineId()); graphs.forEach(graph -> { String instanceId = extractInstanceId(graph); if (allRunningInstances.stream().noneMatch(runningInstanceId -> runningInstanceId.equals(instanceId))) { @@ -77,7 +77,7 @@ public class PipelineHealthCheck implements Runnable { boolean success; try { endpointUrl = findEndpointUrl(graph); - success = new HttpRequestBuilder(graph, endpointUrl, pipeline.getPipelineId()).invoke().isSuccess(); + success = new InvokeHttpRequest().execute(graph, endpointUrl, pipeline.getPipelineId()).isSuccess(); } catch (NoServiceEndpointsAvailableException e) { success = false; } @@ -182,7 +182,7 @@ public class PipelineHealthCheck implements Runnable { private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() { Map<String, List<InvocableStreamPipesEntity>> endpointMap = new HashMap<>(); - TemporaryGraphStorage.graphStorage.forEach((pipelineId, graphs) -> + RunningPipelineElementStorage.runningProcessorsAndSinks.forEach((pipelineId, graphs) -> graphs.forEach(graph -> addEndpoint(endpointMap, graph))); return endpointMap; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java index 6e45801ca..3457d63ea 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java @@ -22,8 +22,8 @@ import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableExcepti import org.apache.streampipes.commons.exceptions.SepaParseException; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.manager.endpoint.EndpointItemFetcher; -import org.apache.streampipes.manager.execution.http.PipelineExecutor; -import org.apache.streampipes.manager.execution.http.PipelineStorageService; +import org.apache.streampipes.manager.execution.PipelineExecutor; +import org.apache.streampipes.manager.storage.PipelineStorageService; import org.apache.streampipes.manager.matching.DataSetGroundingSelector; import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; import org.apache.streampipes.manager.recommender.ElementRecommender; @@ -96,19 +96,8 @@ public class Operations { new PipelineStorageService(pipeline).updatePipeline(); } - public static PipelineOperationStatus startPipeline( - Pipeline pipeline) { - return startPipeline(pipeline, true); - } - - public static PipelineOperationStatus startPipeline( - Pipeline pipeline, boolean storeStatus) { - return new PipelineExecutor(pipeline, storeStatus, false).startPipeline(); - } - - public static PipelineOperationStatus stopPipeline( - Pipeline pipeline, boolean forceStop) { - return stopPipeline(pipeline, true, forceStop); + public static PipelineOperationStatus startPipeline(Pipeline pipeline) { + return new PipelineExecutor(pipeline, false).startPipeline(); } public static List<PipelineOperationStatus> stopAllPipelines(boolean forceStop) { @@ -125,9 +114,8 @@ public class Operations { } public static PipelineOperationStatus stopPipeline(Pipeline pipeline, - boolean storeStatus, boolean forceStop) { - return new PipelineExecutor(pipeline, storeStatus, forceStop).stopPipeline(); + return new PipelineExecutor(pipeline, forceStop).stopPipeline(); } public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java index bb37f1e73..7b5e8e67d 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java @@ -17,11 +17,11 @@ */ package org.apache.streampipes.manager.preview; -import org.apache.streampipes.commons.constants.InstanceIdExtractor; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; -import org.apache.streampipes.manager.execution.http.HttpRequestBuilder; +import org.apache.streampipes.manager.execution.http.DetachHttpRequest; +import org.apache.streampipes.manager.execution.http.InvokeHttpRequest; import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; import org.apache.streampipes.manager.operations.Operations; import org.apache.streampipes.model.SpDataSet; @@ -92,7 +92,7 @@ public class PipelinePreview { graphs.forEach(g -> { try { g.setSelectedEndpointUrl(findSelectedEndpoint(g)); - new HttpRequestBuilder(g, g.getSelectedEndpointUrl(), null).invoke(); + new InvokeHttpRequest().execute(g, g.getSelectedEndpointUrl(), null); } catch (NoServiceEndpointsAvailableException e) { e.printStackTrace(); } @@ -101,8 +101,8 @@ public class PipelinePreview { private void detachGraphs(List<InvocableStreamPipesEntity> graphs) { graphs.forEach(g -> { - String endpointUrl = g.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(g.getElementId()); - new HttpRequestBuilder(g, endpointUrl, null).detach(); + String endpointUrl = g.getSelectedEndpointUrl() + g.getDetachPath(); + new DetachHttpRequest().execute(g, endpointUrl, null); }); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java similarity index 98% rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java index 5c42b5eb0..9d0f2b582 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/PipelineStorageService.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.manager.execution.http; +package org.apache.streampipes.manager.storage; import org.apache.streampipes.manager.data.PipelineGraph; import org.apache.streampipes.manager.data.PipelineGraphBuilder; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java similarity index 78% rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java index 085d17e44..360e6db90 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TemporaryGraphStorage.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/RunningPipelineElementStorage.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.manager.util; +package org.apache.streampipes.manager.storage; import org.apache.streampipes.model.SpDataSet; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; @@ -25,10 +25,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class TemporaryGraphStorage { +public class RunningPipelineElementStorage { - public static Map<String, List<InvocableStreamPipesEntity>> graphStorage = new HashMap<>(); + public static Map<String, List<InvocableStreamPipesEntity>> runningProcessorsAndSinks = new HashMap<>(); - public static Map<String, List<SpDataSet>> datasetStorage = new HashMap<>(); + public static Map<String, List<SpDataSet>> runningDataSets = new HashMap<>(); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java deleted file mode 100644 index 0523158ff..000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/storage/UserService.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.storage; - -import org.apache.streampipes.model.client.user.Principal; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.storage.api.INoSqlStorage; -import org.apache.streampipes.storage.api.IUserStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -public class UserService { - - private IUserStorage userStorage; - - public UserService(IUserStorage userStorage) { - this.userStorage = userStorage; - } - - public List<Pipeline> getOwnPipelines(String email) { - return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines().stream().filter(p -> p - .getCreatedByUser() - .equals(email)) - .collect(Collectors.toList()); - } - - public void deleteOwnSource(String username, String sourceId) { - if (checkUser(username)) { - Principal user = getPrincipal(username); - //user.getOwnSources().removeIf(a -> a.getElementId().equals(sourceId)); - userStorage.updateUser(user); - } - } - - /** - * Get actions/sepas/sources - */ - - public List<String> getOwnActionUris(String username) { - // TODO permissions - return new ArrayList<>(); - //return userStorage.getUser(username) - // .getOwnActions().stream().map(r -> r.getElementId()).collect(Collectors.toList()); - } - - public List<String> getOwnSepaUris(String username) { - // TODO Permissions - return new ArrayList<>(); - //return userStorage.getUser(username) - // .getOwnSepas().stream().map(r -> r.getElementId()).collect(Collectors.toList()); - } - - - public List<String> getOwnSourceUris(String email) { - // TODO permissions - return new ArrayList<>(); -// return userStorage -// .getUser(email) -// .getOwnSources() -// .stream() -// .map(r -> r.getElementId()) -// .collect(Collectors.toList()); - } - - private Principal getPrincipal(String username) { - return userStorage.getUser(username); - } - - - /** - * @param username - * @return True if user exists exactly once, false otherwise - */ - public boolean checkUser(String username) { - return userStorage.checkUser(username); - } - - private INoSqlStorage getStorageManager() { - return StorageDispatcher.INSTANCE.getNoSqlStore(); - } - -} diff --git a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java index 33b2607b9..10c74f619 100644 --- a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java +++ b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java @@ -19,7 +19,6 @@ package org.apache.streampipes.rest.core.base.impl; import org.apache.streampipes.manager.endpoint.HttpJsonParser; -import org.apache.streampipes.manager.storage.UserService; import org.apache.streampipes.model.message.ErrorMessage; import org.apache.streampipes.model.message.Message; import org.apache.streampipes.model.message.Notification; @@ -68,10 +67,6 @@ public abstract class AbstractRestResource extends AbstractSharedRestInterface { return getNoSqlStorage().getUserStorageAPI(); } - protected UserService getUserService() { - return new UserService(getUserStorage()); - } - protected IVisualizationStorage getVisualizationStorage() { return getNoSqlStorage().getVisualizationStorageApi(); }
