This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch 3280-add-support-for-pipeline-templates in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 3f5c07ce287eb51501b283c7ec3381613510a81f Author: Dominik Riemer <[email protected]> AuthorDate: Thu Oct 3 18:57:48 2024 +0200 feat(#3280): Add support for pipeline templates --- .../management/compact/PersistPipelineHandler.java | 113 ++++++++------ .../model/pipeline/compact/CompactPipeline.java | 2 + .../model/template/BoundPipelineElement.java | 57 ------- .../model/template/CompactPipelineTemplate.java | 106 +++++++++++++ .../template/PipelineElementTemplateConfig.java | 67 -------- .../PipelinePlaceholderConfig.java} | 16 +- ...ionContainer.java => PipelinePlaceholders.java} | 25 +-- .../template/PipelineTemplateDescription.java | 87 ----------- .../PipelineTemplateGenerationRequest.java | 16 +- .../model/template/PipelineTemplateInvocation.java | 97 ------------ .../org/apache/streampipes/model/util/Cloner.java | 8 - .../InvocablePipelineElementGenerator.java | 2 +- .../manager/template/PipelineGenerator.java | 139 ----------------- .../template/PipelineTemplateGenerator.java | 75 --------- .../PipelineTemplateInvocationGenerator.java | 66 -------- .../PipelineTemplateInvocationHandler.java | 101 ------------- .../template/PipelineTemplateManagement.java | 80 ---------- .../compact/CompactPipelineTemplateManagement.java | 98 ++++++++++++ .../template/compact/MatchingStreamFinder.java | 77 ++++++++++ .../instances/DataLakePipelineTemplate.java | 44 ------ ...e.java => DefaultPipelineTemplateProvider.java} | 10 +- .../instances/PersistDataLakePipelineTemplate.java | 70 +++++++++ .../rest/core/base/impl/CRUDResource.java | 29 +++- .../rest/shared/constants/SpMediaType.java | 16 +- .../streampipes/rest/impl/PipelineTemplate.java | 116 ++++++++------ .../rest/impl/connect/CompactAdapterResource.java | 7 +- .../sdk/builder/BoundPipelineElementBuilder.java | 63 -------- .../sdk/builder/PipelineTemplateBuilder.java | 61 -------- .../core/migrations/AvailableMigrations.java | 4 +- .../v970/AddDataLakePipelineTemplateMigration.java | 51 +++++++ .../streampipes/storage/api/INoSqlStorage.java | 3 + .../storage/couchdb/CouchDbStorageManager.java | 9 ++ .../standalone/StreamPipesNotificationSink.java | 2 +- .../src/lib/apis/compact-pipeline.service.ts | 43 +++--- .../src/lib/apis/pipeline-template.service.ts | 105 ++----------- .../src/lib/model/gen/streampipes-model.ts | 168 ++++++++++++++++++++- .../platform-services/src/public-api.ts | 1 + .../adapter-started-dialog.component.ts | 106 +++++++------ .../pipeline-assembly-options.component.html | 18 ++- .../pipeline-assembly-options.component.ts | 25 ++- .../pipeline-assembly.component.html | 2 + .../pipeline-assembly.component.ts | 15 +- .../add-template-dialog.component.html | 41 +++++ .../add-template-dialog.component.scss | 0 .../add-template-dialog.component.ts | 63 ++++++++ .../template-selection.component.html | 39 +++++ .../template-selection.component.scss | 22 ++- .../template-selection.component.ts | 38 ++--- ui/src/app/editor/editor.module.ts | 4 + .../services/pipeline-positioning.service.ts | 1 + 50 files changed, 1108 insertions(+), 1300 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java index 7a262fca26..dd0afb7ef7 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java @@ -18,72 +18,93 @@ package org.apache.streampipes.connect.management.compact; -import org.apache.streampipes.manager.template.PipelineTemplateManagement; +import org.apache.streampipes.manager.pipeline.PipelineManager; +import org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement; import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.connect.adapter.compact.CreateOptions; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; +import org.apache.streampipes.model.pipeline.compact.CompactPipeline; +import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyPrimitive; -import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty; -import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; -import org.apache.streampipes.model.staticproperty.OneOfStaticProperty; -import org.apache.streampipes.model.template.PipelineTemplateInvocation; +import org.apache.streampipes.model.schema.PropertyScope; +import org.apache.streampipes.model.template.CompactPipelineTemplate; +import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.streampipes.vocabulary.SO; -public class PersistPipelineHandler { +import java.util.List; +import java.util.Map; + +import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_CONNECTOR_ID; +import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_DIMENSIONS_FIELD; +import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_MEASUREMENT_FIELD; +import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_TEMPLATE_ID; +import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_TIMESTAMP_FIELD; - private static final String templateId = "org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate"; - private static final String configPrefix = "jsplumb_domId2"; +public class PersistPipelineHandler { - private final PipelineTemplateManagement pipelineTemplateManagement; + private final CRUDStorage<CompactPipelineTemplate> templateStorage; + private final CompactPipelineManagement pipelineManagement; private final String authenticatedUserSid; - public PersistPipelineHandler(PipelineTemplateManagement pipelineTemplateManagement, + public PersistPipelineHandler(CRUDStorage<CompactPipelineTemplate> templateStorage, + CompactPipelineManagement pipelineManagement, String authenticatedUserSid) { - this.pipelineTemplateManagement = pipelineTemplateManagement; + this.templateStorage = templateStorage; + this.pipelineManagement = pipelineManagement; this.authenticatedUserSid = authenticatedUserSid; } - public PipelineOperationStatus createAndStartPersistPipeline(AdapterDescription adapterDescription) { - var pipelineTemplateInvocation = pipelineTemplateManagement.prepareInvocation( - adapterDescription.getCorrespondingDataStreamElementId(), - templateId - ); - - applyPipelineName(pipelineTemplateInvocation, adapterDescription.getName()); - applyDataLakeConfig(pipelineTemplateInvocation, adapterDescription); - - return pipelineTemplateManagement.createAndStartPipeline(pipelineTemplateInvocation, authenticatedUserSid); + public PipelineOperationStatus createAndStartPersistPipeline(AdapterDescription adapterDescription) throws Exception { + var template = getTemplate(); + if (template != null) { + var compactPipeline = new CompactPipeline( + String.format("persist-%s", adapterDescription.getName().replaceAll(" ", "-")), + String.format("Persist %s", adapterDescription.getName()), + null, + makeTemplateConfig(adapterDescription, template.getPipeline()), + new CreateOptions(false, true) + ); + var pipelineGenerationResult = pipelineManagement.makePipeline(compactPipeline); + if (pipelineGenerationResult.allPipelineElementsValid()) { + String pipelineId = PipelineManager.addPipeline(authenticatedUserSid, pipelineGenerationResult.pipeline()); + if (compactPipeline.createOptions().start()) { + return PipelineManager.startPipeline(pipelineId); + } + } + } + throw new IllegalArgumentException("Could not start persist pipeline"); } - private void applyPipelineName(PipelineTemplateInvocation pipelineTemplateInvocation, - String adapterName) { - pipelineTemplateInvocation.setPipelineTemplateId(templateId); - pipelineTemplateInvocation.setKviName(adapterName); + private CompactPipelineTemplate getTemplate() { + return this.templateStorage.getElementById(DATA_LAKE_TEMPLATE_ID); } - private void applyDataLakeConfig(PipelineTemplateInvocation pipelineTemplateInvocation, - AdapterDescription adapterDescription) { - pipelineTemplateInvocation.getStaticProperties().forEach(sp -> { - if (sp.getInternalName().equalsIgnoreCase(withPrefix("db_measurement"))) { - ((FreeTextStaticProperty) sp).setValue(adapterDescription.getName()); - } - if (sp.getInternalName().equalsIgnoreCase(withPrefix("timestamp_mapping"))) { - ((MappingPropertyUnary) sp).setSelectedProperty( - String.format("s0::%s", getTimestampField(adapterDescription) - )); - } - if (sp.getInternalName().equalsIgnoreCase(withPrefix("schema_update"))) { - ((OneOfStaticProperty) sp).getOptions().forEach(o -> { - if (o.getName().equals("Update schema")) { - o.setSelected(true); - } - }); - } - }); + private List<CompactPipelineElement> makeTemplateConfig(AdapterDescription adapterDescription, + List<CompactPipelineElement> pipelineElements) { + pipelineElements.get(0).configuration().addAll( + List.of( + Map.of(DATA_LAKE_MEASUREMENT_FIELD, adapterDescription.getName()), + Map.of(DATA_LAKE_TIMESTAMP_FIELD, String.format("s0::%s", getTimestampField(adapterDescription))), + Map.of(DATA_LAKE_DIMENSIONS_FIELD, getDimensions(adapterDescription)) + ) + ); + pipelineElements.add(new CompactPipelineElement( + "stream", + DATA_LAKE_CONNECTOR_ID, + adapterDescription.getCorrespondingDataStreamElementId(), + null, + null + )); + return pipelineElements; } - private String withPrefix(String config) { - return configPrefix + config; + private List<String> getDimensions(AdapterDescription adapterDescription) { + return adapterDescription.getEventSchema().getEventProperties() + .stream() + .filter(ep -> ep.getPropertyScope().equalsIgnoreCase(PropertyScope.DIMENSION_PROPERTY.name())) + .map(EventProperty::getRuntimeName) + .toList(); } private String getTimestampField(AdapterDescription adapterDescription) { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java index 59948aeb03..006a9cda12 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java @@ -19,9 +19,11 @@ package org.apache.streampipes.model.pipeline.compact; import org.apache.streampipes.model.connect.adapter.compact.CreateOptions; +import org.apache.streampipes.model.shared.annotation.TsModel; import java.util.List; +@TsModel public record CompactPipeline( String id, String name, diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/BoundPipelineElement.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/BoundPipelineElement.java deleted file mode 100644 index 267b3511be..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/BoundPipelineElement.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.model.template; - -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.util.Cloner; - -import java.util.ArrayList; -import java.util.List; - -public class BoundPipelineElement { - - private InvocableStreamPipesEntity pipelineElementTemplate; - - private List<BoundPipelineElement> connectedTo; - - public BoundPipelineElement() { - super(); - this.connectedTo = new ArrayList<>(); - } - - public BoundPipelineElement(BoundPipelineElement other) { - this.pipelineElementTemplate = other.getPipelineElementTemplate(); - this.connectedTo = new Cloner().boundPipelineElements(other.getConnectedTo()); - } - - public InvocableStreamPipesEntity getPipelineElementTemplate() { - return pipelineElementTemplate; - } - - public void setPipelineElementTemplate(InvocableStreamPipesEntity pipelineElementTemplate) { - this.pipelineElementTemplate = pipelineElementTemplate; - } - - public List<BoundPipelineElement> getConnectedTo() { - return connectedTo; - } - - public void setConnectedTo(List<BoundPipelineElement> connectedTo) { - this.connectedTo = connectedTo; - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/CompactPipelineTemplate.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/CompactPipelineTemplate.java new file mode 100644 index 0000000000..6b662c33bc --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/template/CompactPipelineTemplate.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.template; + +import org.apache.streampipes.model.pipeline.compact.CompactPipeline; +import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement; +import org.apache.streampipes.model.shared.annotation.TsModel; +import org.apache.streampipes.model.shared.api.Storable; + +import com.fasterxml.jackson.annotation.JsonAlias; +import com.google.gson.annotations.SerializedName; + +import java.util.List; + +@TsModel +public class CompactPipelineTemplate implements Storable { + + @JsonAlias("id") + protected @SerializedName("_id") String elementId; + + @JsonAlias("_rev") + protected @SerializedName("_rev") String rev; + + private String name; + private String description; + private List<CompactPipelineElement> pipeline; + private PipelinePlaceholders placeholders; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public List<CompactPipelineElement> getPipeline() { + return pipeline; + } + + public void setPipeline(List<CompactPipelineElement> pipeline) { + this.pipeline = pipeline; + } + + public PipelinePlaceholders getPlaceholders() { + return placeholders; + } + + public void setPlaceholders(PipelinePlaceholders placeholders) { + this.placeholders = placeholders; + } + + @Override + public String getRev() { + return rev; + } + + @Override + public void setRev(String rev) { + this.rev = rev; + } + + @Override + public String getElementId() { + return elementId; + } + + @Override + public void setElementId(String elementId) { + this.elementId = elementId; + } + + public CompactPipeline toCompactPipeline() { + return new CompactPipeline( + null, + null, + null, + this.getPipeline(), + null + ); + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplateConfig.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplateConfig.java deleted file mode 100644 index 0d52913e46..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplateConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.model.template; - -public class PipelineElementTemplateConfig { - - private boolean editable; - private boolean displayed; - - private Object value; - - public PipelineElementTemplateConfig(boolean editable, boolean displayed, Object value) { - this.editable = editable; - this.displayed = displayed; - this.value = value; - } - - public PipelineElementTemplateConfig() { - } - - private PipelineElementTemplateConfig(Object value) { - this.value = value; - } - - public static PipelineElementTemplateConfig from(Object value) { - return new PipelineElementTemplateConfig(value); - } - - public boolean isEditable() { - return editable; - } - - public void setEditable(boolean editable) { - this.editable = editable; - } - - public boolean isDisplayed() { - return displayed; - } - - public void setDisplayed(boolean displayed) { - this.displayed = displayed; - } - - public Object getValue() { - return value; - } - - public void setValue(Object value) { - this.value = value; - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelinePlaceholderConfig.java similarity index 70% copy from streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java copy to streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelinePlaceholderConfig.java index 59948aeb03..b1fea0d342 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelinePlaceholderConfig.java @@ -16,16 +16,8 @@ * */ -package org.apache.streampipes.model.pipeline.compact; +package org.apache.streampipes.model.template; -import org.apache.streampipes.model.connect.adapter.compact.CreateOptions; - -import java.util.List; - -public record CompactPipeline( - String id, - String name, - String description, - List<CompactPipelineElement> pipelineElements, - CreateOptions createOptions -) {} +public record PipelinePlaceholderConfig(String ref, + String id) { +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelinePlaceholders.java similarity index 61% copy from streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java copy to streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelinePlaceholders.java index e523fa0bb6..9f247f312b 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelinePlaceholders.java @@ -15,30 +15,11 @@ * limitations under the License. * */ + package org.apache.streampipes.model.template; -import java.util.ArrayList; import java.util.List; -public class PipelineTemplateDescriptionContainer { - - private List<PipelineTemplateDescription> list; - - public PipelineTemplateDescriptionContainer() { - super(); - this.list = new ArrayList<>(); - } - - public PipelineTemplateDescriptionContainer(List<PipelineTemplateDescription> dataStreams) { - super(); - this.list = dataStreams; - } - - public List<PipelineTemplateDescription> getList() { - return list; - } - - public void setList(List<PipelineTemplateDescription> list) { - this.list = list; - } +public record PipelinePlaceholders(List<String> requiredStreamInputs, + List<PipelinePlaceholderConfig> requiredConfigs) { } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescription.java deleted file mode 100644 index 5a00f6c754..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescription.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.model.template; - -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.shared.annotation.TsModel; -import org.apache.streampipes.model.util.Cloner; - -import java.util.ArrayList; -import java.util.List; - -@TsModel -public class PipelineTemplateDescription extends NamedStreamPipesEntity { - - private List<BoundPipelineElement> boundTo; - - public PipelineTemplateDescription() { - super(); - this.boundTo = new ArrayList<>(); - } - - public PipelineTemplateDescription(PipelineTemplateDescription other) { - super(other); - // TODO use cloner - if (other.getBoundTo() != null) { - this.boundTo = new Cloner().boundPipelineElements(other.getBoundTo()); - } - //this.pipelineTemplateName = other.getPipelineTemplateName(); - //this.pipelineTemplateDescription = other.getPipelineTemplateDescription(); - //this.pipelineTemplateId = other.getPipelineTemplateId(); - } - - public PipelineTemplateDescription(String elementName, SpDataStream requiredStream, - List<BoundPipelineElement> connectedTo) { - super(elementName); - this.boundTo = connectedTo; - } - - public List<BoundPipelineElement> getBoundTo() { - return boundTo; - } - - public void setBoundTo(List<BoundPipelineElement> boundTo) { - this.boundTo = boundTo; - } - - public String getPipelineTemplateName() { - return super.getName(); - } - - public void setPipelineTemplateName(String pipelineTemplateName) { - super.setName(pipelineTemplateName); - } - - public String getPipelineTemplateDescription() { - return super.getDescription(); - } - - public void setPipelineTemplateDescription(String pipelineTemplateDescription) { - super.setDescription(pipelineTemplateDescription); - } - - public String getPipelineTemplateId() { - return super.getElementId(); - } - - public void setPipelineTemplateId(String pipelineTemplateId) { - super.setElementId(pipelineTemplateId); - super.setAppId(pipelineTemplateId); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PipelineTemplate.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateGenerationRequest.java similarity index 64% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PipelineTemplate.java copy to streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateGenerationRequest.java index 9e115cb212..b25b1caf27 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PipelineTemplate.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateGenerationRequest.java @@ -16,15 +16,15 @@ * */ -package org.apache.streampipes.manager.template.instances; +package org.apache.streampipes.model.template; -import org.apache.streampipes.commons.exceptions.ElementNotFoundException; -import org.apache.streampipes.model.template.PipelineTemplateDescription; +import org.apache.streampipes.model.shared.annotation.TsModel; -import java.net.URISyntaxException; - -public interface PipelineTemplate { - - PipelineTemplateDescription declareModel() throws URISyntaxException, ElementNotFoundException; +import java.util.Map; +@TsModel +public record PipelineTemplateGenerationRequest(CompactPipelineTemplate template, + Map<String, String> streams, + String pipelineName, + String pipelineDescription) { } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateInvocation.java deleted file mode 100644 index e36de80bcd..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateInvocation.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.model.template; - -import org.apache.streampipes.model.shared.annotation.TsModel; -import org.apache.streampipes.model.staticproperty.StaticProperty; -import org.apache.streampipes.model.util.Cloner; - -import com.fasterxml.jackson.annotation.JsonTypeInfo; - -import java.util.ArrayList; -import java.util.List; - -@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "@class") -@TsModel -public class PipelineTemplateInvocation { - - private String kviName; - - private String dataStreamId; - - private String pipelineTemplateId; - - private PipelineTemplateDescription pipelineTemplateDescription; - - private List<StaticProperty> staticProperties; - - public PipelineTemplateInvocation() { - super(); - this.staticProperties = new ArrayList<>(); - } - - public PipelineTemplateInvocation(PipelineTemplateInvocation other) { - this.kviName = other.getKviName(); - this.dataStreamId = other.getDataStreamId(); - this.pipelineTemplateId = other.getPipelineTemplateId(); - - if (other.getStaticProperties() != null) { - this.staticProperties = new Cloner().staticProperties(other.getStaticProperties()); - } - } - - public String getKviName() { - return kviName; - } - - public void setKviName(String kviName) { - this.kviName = kviName; - } - - public String getDataStreamId() { - return dataStreamId; - } - - public void setDataStreamId(String dataStreamId) { - this.dataStreamId = dataStreamId; - } - - public List<StaticProperty> getStaticProperties() { - return staticProperties; - } - - public void setStaticProperties(List<StaticProperty> staticProperties) { - this.staticProperties = staticProperties; - } - - public PipelineTemplateDescription getPipelineTemplateDescription() { - return pipelineTemplateDescription; - } - - public void setPipelineTemplateDescription(PipelineTemplateDescription pipelineTemplateDescription) { - this.pipelineTemplateDescription = pipelineTemplateDescription; - } - - public String getPipelineTemplateId() { - return pipelineTemplateId; - } - - public void setPipelineTemplateId(String pipelineTemplateId) { - this.pipelineTemplateId = pipelineTemplateId; - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java index 853415270c..bce767d46c 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java @@ -73,7 +73,6 @@ import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; import org.apache.streampipes.model.staticproperty.StaticPropertyGroup; import org.apache.streampipes.model.staticproperty.SupportedProperty; -import org.apache.streampipes.model.template.BoundPipelineElement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,13 +253,6 @@ public class Cloner { } } - public List<BoundPipelineElement> boundPipelineElements(List<BoundPipelineElement> boundPipelineElements) { - return boundPipelineElements - .stream() - .map(BoundPipelineElement::new) - .collect(Collectors.toList()); - } - public List<NamedStreamPipesEntity> cloneDescriptions(List<NamedStreamPipesEntity> pipelineElementDescriptions) { return pipelineElementDescriptions .stream() diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java index 0f8d135830..8686a4b0a6 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java @@ -27,7 +27,7 @@ import java.util.ArrayList; public class InvocablePipelineElementGenerator<T extends InvocableStreamPipesEntity> { - private static final String ID_PREFIX = "jsplumb_"; + public static final String ID_PREFIX = "jsplumb_"; public void apply(T element, CompactPipelineElement compatPipelineElement) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineGenerator.java deleted file mode 100644 index 134e37a1d2..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineGenerator.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.manager.template; - -import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.graph.DataProcessorInvocation; -import org.apache.streampipes.model.graph.DataSinkInvocation; -import org.apache.streampipes.model.message.PipelineModificationMessage; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.pipeline.PipelineModification; -import org.apache.streampipes.model.template.BoundPipelineElement; -import org.apache.streampipes.model.template.PipelineTemplateDescription; -import org.apache.streampipes.storage.management.StorageDispatcher; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -public class PipelineGenerator { - - private final PipelineTemplateDescription pipelineTemplateDescription; - private final String datasetId; - private final Pipeline pipeline; - private final String pipelineName; - - private int count = 0; - - public PipelineGenerator(String datasetId, - PipelineTemplateDescription pipelineTemplateDescription, - String pipelineName) { - this.pipelineTemplateDescription = pipelineTemplateDescription; - this.datasetId = datasetId; - this.pipelineName = pipelineName; - this.pipeline = new Pipeline(); - } - - public Pipeline makePipeline() { - - pipeline.setName(pipelineName); - pipeline.setPipelineId(UUID.randomUUID().toString()); - - pipeline.setStreams(Collections.singletonList(prepareStream(datasetId))); - pipeline.setSepas(new ArrayList<>()); - pipeline.setActions(new ArrayList<>()); - collectInvocations("jsplumb_domId" + count, pipelineTemplateDescription.getBoundTo()); - - return pipeline; - } - - private SpDataStream prepareStream(String streamId) { - SpDataStream stream = getStream(streamId); - stream = new SpDataStream(stream); - stream.setDom(getDom()); - return stream; - } - - private void collectInvocations(String currentDomId, - List<BoundPipelineElement> boundPipelineElements) { - for (BoundPipelineElement pipelineElement : boundPipelineElements) { - InvocableStreamPipesEntity entity = clonePe(pipelineElement.getPipelineElementTemplate()); - entity.setConnectedTo(Collections.singletonList(currentDomId)); - entity.setDom(getDom()); - if (entity instanceof DataProcessorInvocation) { - pipeline.getSepas().add((DataProcessorInvocation) entity); - if (pipelineElement.getConnectedTo().size() > 0) { - collectInvocations(entity.getDom(), pipelineElement.getConnectedTo()); - } - } else { - pipeline.getActions().add((DataSinkInvocation) entity); - } - } - PipelineModificationMessage message = new PipelineVerificationHandlerV2(pipeline).verifyPipeline(); - handleModifications(message); - } - - private void handleModifications(PipelineModificationMessage message) { - pipeline.getSepas().forEach(processor -> { - PipelineModification modification = getModification(message, processor.getDom()); - processor.setOutputStream(modification.getOutputStream()); - processor.setOutputStrategies(modification.getOutputStrategies()); - processor.setStaticProperties(modification.getStaticProperties()); - processor.setInputStreams(modification.getInputStreams()); - processor.setElementId(modification.getElementId()); - }); - pipeline.getActions().forEach(sink -> { - PipelineModification modification = getModification(message, sink.getDom()); - sink.setStaticProperties(modification.getStaticProperties()); - sink.setInputStreams(modification.getInputStreams()); - sink.setElementId(modification.getElementId()); - }); - } - - private PipelineModification getModification(PipelineModificationMessage message, - String domId) { - return message.getPipelineModifications() - .stream() - .filter(pm -> pm.getDomId().equals(domId)) - .findFirst() - .orElseThrow(IllegalArgumentException::new); - } - - private InvocableStreamPipesEntity clonePe(InvocableStreamPipesEntity pipelineElementTemplate) { - if (pipelineElementTemplate instanceof DataProcessorInvocation) { - return new DataProcessorInvocation((DataProcessorInvocation) pipelineElementTemplate); - } else { - return new DataSinkInvocation((DataSinkInvocation) pipelineElementTemplate); - } - } - - private SpDataStream getStream(String datasetId) { - return StorageDispatcher.INSTANCE.getNoSqlStore() - .getPipelineElementDescriptionStorage() - .getEventStreamById(datasetId); - } - - - private String getDom() { - count++; - return "jsplumb_domId" + count; - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java deleted file mode 100644 index 86c46bc742..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.manager.template; - -import org.apache.streampipes.commons.exceptions.ElementNotFoundException; -import org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate; -import org.apache.streampipes.manager.template.instances.PipelineTemplate; -import org.apache.streampipes.model.graph.DataSinkDescription; -import org.apache.streampipes.model.template.PipelineTemplateDescription; -import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; - -public class PipelineTemplateGenerator { - - - Logger logger = LoggerFactory.getLogger(PipelineTemplateGenerator.class); - - private final List<PipelineTemplateDescription> availableDescriptions = new ArrayList<>(); - - public List<PipelineTemplateDescription> getAllPipelineTemplates() { - - List<PipelineTemplate> allPipelineTemplates = new ArrayList<>(); - - allPipelineTemplates.add(new DataLakePipelineTemplate()); - - - for (PipelineTemplate pt : allPipelineTemplates) { - try { - availableDescriptions.add(pt.declareModel()); - } catch (URISyntaxException e) { - e.printStackTrace(); - } catch (ElementNotFoundException e) { - logger.warn("Adapter template can not be used because some elements are not installed", e); - } - } - - return availableDescriptions; - } - - protected DataSinkDescription getSink(String id) throws ElementNotFoundException { - try { - return getStorage() - .getDataSinkByAppId(id); - } catch (IllegalArgumentException e) { - throw new ElementNotFoundException("Data stream " + id + " is not installed!"); - } - } - - protected IPipelineElementDescriptionStorage getStorage() { - return StorageDispatcher.INSTANCE.getNoSqlStore() - .getPipelineElementDescriptionStorage(); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationGenerator.java deleted file mode 100644 index ac2e1ac221..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationGenerator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.manager.template; - -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.staticproperty.StaticProperty; -import org.apache.streampipes.model.template.PipelineTemplateDescription; -import org.apache.streampipes.model.template.PipelineTemplateInvocation; - -import java.util.ArrayList; -import java.util.List; - -public class PipelineTemplateInvocationGenerator { - - private final SpDataStream spDataStream; - private final PipelineTemplateDescription pipelineTemplateDescription; - - public PipelineTemplateInvocationGenerator(SpDataStream dataStream, - PipelineTemplateDescription pipelineTemplateDescription) { - this.spDataStream = dataStream; - this.pipelineTemplateDescription = pipelineTemplateDescription; - } - - public PipelineTemplateInvocation generateInvocation() { - - Pipeline pipeline = - new PipelineGenerator(spDataStream.getElementId(), pipelineTemplateDescription, "test").makePipeline(); - - PipelineTemplateInvocation pipelineTemplateInvocation = new PipelineTemplateInvocation(); - pipelineTemplateInvocation.setStaticProperties(collectStaticProperties(pipeline)); - pipelineTemplateInvocation.setDataStreamId(spDataStream.getElementId()); - pipelineTemplateInvocation.setPipelineTemplateId(pipelineTemplateDescription.getPipelineTemplateId()); - return pipelineTemplateInvocation; - } - - private List<StaticProperty> collectStaticProperties(Pipeline pipeline) { - List<StaticProperty> staticProperties = new ArrayList<>(); - - pipeline.getSepas().forEach(pe -> { - pe.getStaticProperties().forEach(sp -> sp.setInternalName(pe.getDom() + sp.getInternalName())); - staticProperties.addAll(pe.getStaticProperties()); - }); - pipeline.getActions().forEach(pe -> { - pe.getStaticProperties().forEach(sp -> sp.setInternalName(pe.getDom() + sp.getInternalName())); - staticProperties.addAll(pe.getStaticProperties()); - }); - - return staticProperties; - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java deleted file mode 100644 index 7b1748a9ef..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.manager.template; - -import org.apache.streampipes.manager.execution.PipelineExecutor; -import org.apache.streampipes.manager.permission.PermissionManager; -import org.apache.streampipes.manager.storage.PipelineStorageService; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.client.user.Permission; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.pipeline.PipelineOperationStatus; -import org.apache.streampipes.model.staticproperty.StaticProperty; -import org.apache.streampipes.model.template.PipelineTemplateDescription; -import org.apache.streampipes.model.template.PipelineTemplateInvocation; -import org.apache.streampipes.storage.management.StorageDispatcher; - -import java.util.ArrayList; -import java.util.List; - -public class PipelineTemplateInvocationHandler { - - private final PipelineTemplateInvocation pipelineTemplateInvocation; - private final PipelineTemplateDescription pipelineTemplateDescription; - private final String username; - - public PipelineTemplateInvocationHandler(String username, PipelineTemplateInvocation pipelineTemplateInvocation) { - this.username = username; - this.pipelineTemplateInvocation = pipelineTemplateInvocation; - this.pipelineTemplateDescription = getTemplateById(pipelineTemplateInvocation.getPipelineTemplateId()); - } - - public PipelineOperationStatus handlePipelineInvocation() { - Pipeline pipeline = new PipelineGenerator(pipelineTemplateInvocation.getDataStreamId(), pipelineTemplateDescription, - pipelineTemplateInvocation.getKviName()).makePipeline(); - pipeline.setCreatedByUser(username); - pipeline.setCreatedAt(System.currentTimeMillis()); - replaceStaticProperties(pipeline); - new PipelineStorageService(pipeline).addPipeline(); - Permission permission = new PermissionManager().makePermission(pipeline, username); - StorageDispatcher.INSTANCE.getNoSqlStore().getPermissionStorage().persist(permission); - Pipeline storedPipeline = - StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getElementById(pipeline.getPipelineId()); - return new PipelineExecutor(storedPipeline).startPipeline(); - } - - private void replaceStaticProperties(Pipeline pipeline) { - pipeline.getSepas().forEach(this::replace); - pipeline.getActions().forEach(this::replace); - } - - private void replace(InvocableStreamPipesEntity pe) { - List<StaticProperty> newProperties = new ArrayList<>(); - pe.getStaticProperties().forEach(sp -> { - if (existsInCustomizedElements(pe.getDom(), sp)) { - newProperties.add(getCustomizedElement(pe.getDom(), pe.getDom() + sp.getInternalName())); - } else { - newProperties.add(sp); - } - }); - pe.setStaticProperties(newProperties); - } - - - private StaticProperty getCustomizedElement(String dom, String internalName) { - StaticProperty staticProperty = pipelineTemplateInvocation - .getStaticProperties() - .stream() - .filter(sp -> sp.getInternalName().equals(internalName)).findFirst().get(); - - staticProperty.setInternalName(staticProperty.getInternalName().replace(dom, "")); - return staticProperty; - } - - private boolean existsInCustomizedElements(String dom, StaticProperty staticProperty) { - return pipelineTemplateInvocation - .getStaticProperties() - .stream() - .anyMatch(sp -> sp.getInternalName().equals(dom + staticProperty.getInternalName())); - } - - - private PipelineTemplateDescription getTemplateById(String pipelineTemplateId) { - return new PipelineTemplateGenerator().getAllPipelineTemplates().stream() - .filter(template -> template.getAppId().equals(pipelineTemplateId)).findFirst().get(); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateManagement.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateManagement.java deleted file mode 100644 index a561c5093b..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateManagement.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.template; - -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.pipeline.PipelineOperationStatus; -import org.apache.streampipes.model.template.PipelineTemplateDescription; -import org.apache.streampipes.model.template.PipelineTemplateInvocation; -import org.apache.streampipes.storage.management.StorageDispatcher; - -import java.util.List; -import java.util.Optional; - -public class PipelineTemplateManagement { - - public PipelineTemplateInvocation prepareInvocation(String streamId, - String pipelineTemplateId) { - SpDataStream dataStream = getDataStream(streamId); - - var pipelineTemplateDescriptionOpt = getPipelineTemplateDescription(pipelineTemplateId); - if (pipelineTemplateDescriptionOpt.isPresent()) { - PipelineTemplateInvocation invocation = - new PipelineTemplateInvocationGenerator( - dataStream, - pipelineTemplateDescriptionOpt.get() - ).generateInvocation(); - PipelineTemplateInvocation clonedInvocation = new PipelineTemplateInvocation(invocation); - return new PipelineTemplateInvocation(clonedInvocation); - } else { - throw new IllegalArgumentException(String.format( - "Could not find pipeline template with ID %s", - pipelineTemplateId) - ); - } - } - - public PipelineOperationStatus createAndStartPipeline(PipelineTemplateInvocation pipelineTemplateInvocation, - String authenticatedUserSid) { - return new PipelineTemplateInvocationHandler( - authenticatedUserSid, - pipelineTemplateInvocation - ).handlePipelineInvocation(); - } - - private Optional<PipelineTemplateDescription> getPipelineTemplateDescription(String pipelineTemplateId) { - return new PipelineTemplateGenerator() - .getAllPipelineTemplates() - .stream() - .filter(pt -> pt.getAppId().equals(pipelineTemplateId)) - .findFirst(); - } - - private SpDataStream getDataStream(String streamId) { - return getAllDataStreams() - .stream() - .filter(sp -> sp.getElementId().equals(streamId)) - .findFirst() - .get(); - } - - private List<SpDataStream> getAllDataStreams() { - return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineElementDescriptionStorage().getAllDataStreams(); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java new file mode 100644 index 0000000000..0bf308cfc4 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.template.compact; + +import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; +import org.apache.streampipes.manager.pipeline.compact.generation.PipelineElementConfigurationStep; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineModificationResult; +import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement; +import org.apache.streampipes.model.template.CompactPipelineTemplate; +import org.apache.streampipes.model.template.PipelineTemplateGenerationRequest; +import org.apache.streampipes.storage.api.CRUDStorage; +import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.streampipes.manager.pipeline.compact.generation.InvocablePipelineElementGenerator.ID_PREFIX; + +public class CompactPipelineTemplateManagement { + + private final IPipelineElementDescriptionStorage storage; + private final CRUDStorage<CompactPipelineTemplate> templateStorage; + + public CompactPipelineTemplateManagement(CRUDStorage<CompactPipelineTemplate> templateStorage, + IPipelineElementDescriptionStorage descriptionStorage) { + this.templateStorage = templateStorage; + this.storage = descriptionStorage; + } + + public PipelineModificationResult makePipeline(PipelineTemplateGenerationRequest request) throws Exception { + var template = request.template(); + if (request.streams() != null && !request.streams().isEmpty()) { + request.streams().forEach((key, value) -> { + var stream = storage.getDataStreamById(value); + template.getPipeline().add(new CompactPipelineElement( + "stream", + key, + stream.getElementId(), + List.of(), + List.of() + )); + }); + } + var pipeline = makePipeline(template); + + return new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline(); + } + + private Pipeline makePipeline(CompactPipelineTemplate template) throws Exception { + var pipeline = new Pipeline(); + template.getPipeline().forEach(pe -> { + List<String> connectedToList = pe.connectedTo(); + Set<String> validIds = template.getPipeline().stream() + .map(CompactPipelineElement::ref) + .collect(Collectors.toSet()); + connectedToList.removeIf(connectedId -> !validIds.contains(connectedId)); + }); + new PipelineElementConfigurationStep(storage).apply(pipeline, template.toCompactPipeline()); + + return pipeline; + } + + public Map<String, List<List<String>>> getStreamsForTemplate(String pipelineTemplateId) throws Exception { + var template = Optional.ofNullable(templateStorage.getElementById(pipelineTemplateId)) + .orElseThrow(() -> new IllegalArgumentException("Template " + pipelineTemplateId + " not found")); + + var pipeline = makePipeline(template); + var allDataStreams = storage.getAllDataStreams(); + var requiredStreamInputs = template + .getPlaceholders() + .requiredStreamInputs() + .stream() + .map(c -> ID_PREFIX + c) + .toList(); + + return new MatchingStreamFinder().findMatchedStreams(pipeline, requiredStreamInputs, allDataStreams); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/MatchingStreamFinder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/MatchingStreamFinder.java new file mode 100644 index 0000000000..4bc5ef18f3 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/MatchingStreamFinder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.template.compact; + +import org.apache.streampipes.manager.matching.v2.SchemaMatch; +import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.model.pipeline.Pipeline; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +public class MatchingStreamFinder { + + public Map<String, List<List<String>>> findMatchedStreams(Pipeline pipeline, + List<String> requiredStreamInputs, + List<SpDataStream> allDataStreams) { + var matchedStreams = new HashMap<String, List<List<String>>>(); + + Stream.concat(pipeline.getSepas().stream(), pipeline.getActions().stream()) + .filter(pe -> hasRequiredInputs(pe, requiredStreamInputs)) + .forEach(pe -> matchedStreams.put( + pe.getElementId(), + findSupportedStreamsForPe(pe, requiredStreamInputs, allDataStreams)) + ); + + return matchedStreams; + } + + private boolean hasRequiredInputs(InvocableStreamPipesEntity pe, List<String> requiredStreamInputs) { + return requiredStreamInputs + .stream() + .anyMatch(pe.getConnectedTo()::contains); + } + + private List<List<String>> findSupportedStreamsForPe(InvocableStreamPipesEntity pe, + List<String> requiredStreamInputs, + List<SpDataStream> allDataStreams) { + var streams = new ArrayList<List<String>>(); + + for (int i = 0; i < pe.getConnectedTo().size(); i++) { + String connectedInput = pe.getConnectedTo().get(i); + if (requiredStreamInputs.contains(connectedInput)) { + streams.add(getSupportedStreams(allDataStreams, pe.getInputStreams().get(i))); + } + } + + return streams; + } + + private List<String> getSupportedStreams(List<SpDataStream> allDataStreams, SpDataStream inputStreamReq) { + return allDataStreams.stream() + .filter(ds -> new SchemaMatch().match(ds.getEventSchema(), inputStreamReq.getEventSchema(), List.of())) + .map(NamedStreamPipesEntity::getElementId) + .toList(); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/DataLakePipelineTemplate.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/DataLakePipelineTemplate.java deleted file mode 100644 index c52854a299..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/DataLakePipelineTemplate.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.template.instances; - -import org.apache.streampipes.commons.exceptions.ElementNotFoundException; -import org.apache.streampipes.manager.template.PipelineTemplateGenerator; -import org.apache.streampipes.model.template.PipelineTemplateDescription; -import org.apache.streampipes.sdk.builder.BoundPipelineElementBuilder; -import org.apache.streampipes.sdk.builder.PipelineTemplateBuilder; - -public class DataLakePipelineTemplate extends PipelineTemplateGenerator implements PipelineTemplate { - - private static final String ID = "org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate"; - - @Override - public PipelineTemplateDescription declareModel() throws ElementNotFoundException { - return new PipelineTemplateDescription( - PipelineTemplateBuilder.create("http://streampipes.org/DataLakePipelineTemplate", "DataLake", - "") - .setAppId(ID) - .boundPipelineElementTemplate( - BoundPipelineElementBuilder - .create(getSink("org.apache.streampipes.sinks.internal.jvm.datalake")) - .build()) - .build()); - - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PipelineTemplate.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/DefaultPipelineTemplateProvider.java similarity index 73% rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PipelineTemplate.java rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/DefaultPipelineTemplateProvider.java index 9e115cb212..2c3e53b8f3 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PipelineTemplate.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/DefaultPipelineTemplateProvider.java @@ -18,13 +18,9 @@ package org.apache.streampipes.manager.template.instances; -import org.apache.streampipes.commons.exceptions.ElementNotFoundException; -import org.apache.streampipes.model.template.PipelineTemplateDescription; +import org.apache.streampipes.model.template.CompactPipelineTemplate; -import java.net.URISyntaxException; - -public interface PipelineTemplate { - - PipelineTemplateDescription declareModel() throws URISyntaxException, ElementNotFoundException; +public interface DefaultPipelineTemplateProvider { + CompactPipelineTemplate getTemplate(); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java new file mode 100644 index 0000000000..caa6e7bf94 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.template.instances; + +import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement; +import org.apache.streampipes.model.template.CompactPipelineTemplate; +import org.apache.streampipes.model.template.PipelinePlaceholderConfig; +import org.apache.streampipes.model.template.PipelinePlaceholders; + +import java.util.List; +import java.util.Map; + +public class PersistDataLakePipelineTemplate implements DefaultPipelineTemplateProvider { + + public static final String DATA_LAKE_SINK_REF = "lake"; + public static final String DATA_LAKE_SINK_ID = "org.apache.streampipes.sinks.internal.jvm.datalake"; + public static final String DATA_LAKE_CONNECTOR_ID = "stream1"; + public static final String DATA_LAKE_TEMPLATE_ID = "sp-internal-persist"; + + public static final String DATA_LAKE_MEASUREMENT_FIELD = "db_measurement"; + public static final String DATA_LAKE_TIMESTAMP_FIELD = "timestamp_mapping"; + public static final String DATA_LAKE_DIMENSIONS_FIELD = "dimensions_selection"; + + @Override + public CompactPipelineTemplate getTemplate() { + var template = new CompactPipelineTemplate(); + template.setElementId(DATA_LAKE_TEMPLATE_ID); + template.setName("Persist Data"); + template.setDescription("Use this template to persist an input data stream to the time-series storage"); + template.setPipeline(List.of( + new CompactPipelineElement( + "sink", + DATA_LAKE_SINK_REF, + DATA_LAKE_SINK_ID, + List.of(DATA_LAKE_CONNECTOR_ID), + List.of( + Map.of("schema_update", "Update schema"), + Map.of("ignore_duplicates", false) + ) + ) + ) + ); + template.setPlaceholders(new PipelinePlaceholders( + List.of("stream1"), + List.of( + new PipelinePlaceholderConfig(DATA_LAKE_SINK_REF, DATA_LAKE_MEASUREMENT_FIELD), + new PipelinePlaceholderConfig(DATA_LAKE_SINK_REF, DATA_LAKE_TIMESTAMP_FIELD), + new PipelinePlaceholderConfig(DATA_LAKE_SINK_REF, DATA_LAKE_DIMENSIONS_FIELD) + ) + )); + + return template; + } +} diff --git a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/CRUDResource.java b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/CRUDResource.java index 4a67fe1511..8e0818aac7 100644 --- a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/CRUDResource.java +++ b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/CRUDResource.java @@ -30,19 +30,38 @@ import java.util.List; public interface CRUDResource<T, ReT> { - @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE) + @GetMapping(produces = { + MediaType.APPLICATION_JSON_VALUE, + "application/yaml", + "application/yml"}) List<T> findAll(); - @GetMapping(path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE) + @GetMapping(path = "/{id}", produces = { + MediaType.APPLICATION_JSON_VALUE, + "application/yaml", + "application/yml"}) T findById(@PathVariable("id") String id); @PostMapping( - produces = MediaType.APPLICATION_JSON_VALUE, - consumes = MediaType.APPLICATION_JSON_VALUE + produces = { + MediaType.APPLICATION_JSON_VALUE, + "application/yaml", + "application/yml"}, + consumes = { + MediaType.APPLICATION_JSON_VALUE, + "application/yaml", + "application/yml"} ) void create(@RequestBody T entity); - @PutMapping(path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE) + @PutMapping(path = "/{id}", produces = { + MediaType.APPLICATION_JSON_VALUE, + "application/yaml", + "application/yml" + }, consumes = { + MediaType.APPLICATION_JSON_VALUE, + "application/yaml", + "application/yml"}) ReT update(@RequestBody T entity); @DeleteMapping(path = "/{id}") diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java b/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/constants/SpMediaType.java similarity index 71% copy from streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java copy to streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/constants/SpMediaType.java index 59948aeb03..558f8dd721 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java +++ b/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/constants/SpMediaType.java @@ -16,16 +16,10 @@ * */ -package org.apache.streampipes.model.pipeline.compact; +package org.apache.streampipes.rest.shared.constants; -import org.apache.streampipes.model.connect.adapter.compact.CreateOptions; +public class SpMediaType { -import java.util.List; - -public record CompactPipeline( - String id, - String name, - String description, - List<CompactPipelineElement> pipelineElements, - CreateOptions createOptions -) {} + public static final String YAML = "application/yaml"; + public static final String YML = "application/yml"; +} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java index ee041f5232..ab36c1d0ed 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java @@ -15,79 +15,109 @@ * limitations under the License. * */ + package org.apache.streampipes.rest.impl; -import org.apache.streampipes.manager.template.PipelineTemplateManagement; -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.SpDataStreamContainer; -import org.apache.streampipes.model.message.Notifications; -import org.apache.streampipes.model.pipeline.PipelineOperationStatus; -import org.apache.streampipes.model.template.PipelineTemplateInvocation; +import org.apache.streampipes.manager.template.compact.CompactPipelineTemplateManagement; +import org.apache.streampipes.model.template.CompactPipelineTemplate; +import org.apache.streampipes.model.template.PipelineTemplateGenerationRequest; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; -import org.apache.streampipes.rest.shared.exception.SpMessageException; +import org.apache.streampipes.rest.shared.constants.SpMediaType; +import org.apache.streampipes.rest.shared.exception.BadRequestException; +import org.apache.streampipes.storage.api.CRUDStorage; +import org.apache.streampipes.storage.management.StorageDispatcher; -import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.Map; @RestController @RequestMapping("/api/v2/pipeline-templates") public class PipelineTemplate extends AbstractAuthGuardedRestResource { - private final PipelineTemplateManagement pipelineTemplateManagement; + private final CRUDStorage<CompactPipelineTemplate> storage; + private final CompactPipelineTemplateManagement templateManagement; public PipelineTemplate() { - this.pipelineTemplateManagement = new PipelineTemplateManagement(); + storage = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineTemplateStorage(); + templateManagement = new CompactPipelineTemplateManagement( + storage, + StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineElementDescriptionStorage() + ); + } + + @GetMapping( + produces = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}) + public List<CompactPipelineTemplate> findAll() { + return storage.findAll() + .stream() + .sorted(Comparator.comparing(CompactPipelineTemplate::getName)) + .toList(); } - @GetMapping(path = "/streams", produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity<SpDataStreamContainer> getAvailableDataStreams() { - List<SpDataStream> sources = getPipelineElementRdfStorage().getAllDataStreams(); - List<SpDataStream> datasets = new ArrayList<>(); + @GetMapping( + path = "/{id}", + produces = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}) + public ResponseEntity<?> findById(@PathVariable("id") String id) { + return ok(storage.getElementById(id)); + } - sources.stream() - .map(SpDataStream::new) - .forEach(datasets::add); - return ok((new SpDataStreamContainer(datasets))); + @PostMapping( + produces = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}, + consumes = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}) + public void create(@RequestBody CompactPipelineTemplate entity) { + storage.persist(entity); } - @GetMapping( - path = "/invocation", - produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity<PipelineTemplateInvocation> getPipelineTemplateInvocation( - @RequestParam(value = "streamId", required = false) String streamId, - @RequestParam(value = "templateId") String pipelineTemplateId) { + @PutMapping(path = "/{id}", + produces = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}, + consumes = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}) + public void update(@PathVariable("id") String id, @RequestBody CompactPipelineTemplate entity) { + storage.updateElement(entity); + } + + @DeleteMapping(path = "/{id}") + public void delete(@PathVariable("id") String id) { + storage.deleteElementById(id); + } + + + @PostMapping(path = "/{id}/pipeline", + produces = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}, + consumes = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}) + public ResponseEntity<?> makePipelineFromTemplate(@RequestBody PipelineTemplateGenerationRequest request) { try { - return ok(pipelineTemplateManagement.prepareInvocation(streamId, pipelineTemplateId)); + return ok(templateManagement.makePipeline(request).pipeline()); } catch (IllegalArgumentException e) { - throw new SpMessageException(HttpStatus.BAD_REQUEST, Notifications.error( - String.format( - "Could not create pipeline template %s - did you install all pipeline elements?", - pipelineTemplateId.substring(pipelineTemplateId.lastIndexOf(".") + 1)) - )); + return badRequest(e.getMessage()); + } catch (Exception e) { + throw new RuntimeException(e); } } - @PostMapping( - consumes = MediaType.APPLICATION_JSON_VALUE, - produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity<PipelineOperationStatus> generatePipeline( - @RequestBody PipelineTemplateInvocation pipelineTemplateInvocation) { - - var status = pipelineTemplateManagement.createAndStartPipeline( - pipelineTemplateInvocation, - getAuthenticatedUserSid() - ); - return ok(status); + @GetMapping( + path = "/{id}/streams", + produces = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}) + public ResponseEntity<Map<String, List<List<String>>>> getAvailableStreamsForTemplate( + @PathVariable("id") String pipelineTemplateId) { + try { + return ok(templateManagement.getStreamsForTemplate(pipelineTemplateId)); + } catch (IllegalArgumentException e) { + throw new BadRequestException(e.getMessage()); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java index da76a0eff3..97dbb7912a 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java @@ -25,7 +25,7 @@ import org.apache.streampipes.connect.management.compact.PersistPipelineHandler; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; import org.apache.streampipes.connect.management.management.AdapterUpdateManagement; import org.apache.streampipes.connect.management.management.CompactAdapterManagement; -import org.apache.streampipes.manager.template.PipelineTemplateManagement; +import org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.adapter.compact.CompactAdapter; import org.apache.streampipes.model.message.Notifications; @@ -88,7 +88,10 @@ public class CompactAdapterResource extends AbstractAdapterResource<AdapterMaste if (compactAdapter.createOptions().persist()) { var storedAdapter = managementService.getAdapter(adapterId); var status = new PersistPipelineHandler( - new PipelineTemplateManagement(), + getNoSqlStorage().getPipelineTemplateStorage(), + new CompactPipelineManagement( + getNoSqlStorage().getPipelineElementDescriptionStorage() + ), getAuthenticatedUserSid() ).createAndStartPersistPipeline(storedAdapter); } diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/BoundPipelineElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/BoundPipelineElementBuilder.java deleted file mode 100644 index 8382c9346c..0000000000 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/BoundPipelineElementBuilder.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.sdk.builder; - -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.graph.DataProcessorDescription; -import org.apache.streampipes.model.graph.DataProcessorInvocation; -import org.apache.streampipes.model.graph.DataSinkDescription; -import org.apache.streampipes.model.graph.DataSinkInvocation; -import org.apache.streampipes.model.template.BoundPipelineElement; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -public class BoundPipelineElementBuilder { - - private BoundPipelineElement boundPipelineElement; - private InvocableStreamPipesEntity streamPipesEntity; - private List<BoundPipelineElement> connectedTo; - - private BoundPipelineElementBuilder(InvocableStreamPipesEntity streamPipesEntity) { - this.streamPipesEntity = streamPipesEntity; - // TODO fix this hack - this.streamPipesEntity.setElementId(this.streamPipesEntity.getBelongsTo() + ":" + UUID.randomUUID().toString()); - this.boundPipelineElement = new BoundPipelineElement(); - this.connectedTo = new ArrayList<>(); - } - - public static BoundPipelineElementBuilder create(DataProcessorDescription dataProcessorDescription) { - return new BoundPipelineElementBuilder(new DataProcessorInvocation(dataProcessorDescription)); - } - - public static BoundPipelineElementBuilder create(DataSinkDescription dataSinkDescription) { - return new BoundPipelineElementBuilder(new DataSinkInvocation(dataSinkDescription)); - } - - public BoundPipelineElementBuilder connectTo(BoundPipelineElement boundPipelineElement) { - this.connectedTo.add(boundPipelineElement); - return this; - } - - public BoundPipelineElement build() { - this.boundPipelineElement.setPipelineElementTemplate(streamPipesEntity); - this.boundPipelineElement.setConnectedTo(connectedTo); - return boundPipelineElement; - } -} diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/PipelineTemplateBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/PipelineTemplateBuilder.java deleted file mode 100644 index 432e2fd5c6..0000000000 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/PipelineTemplateBuilder.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.sdk.builder; - -import org.apache.streampipes.model.template.BoundPipelineElement; -import org.apache.streampipes.model.template.PipelineTemplateDescription; - -import java.util.ArrayList; -import java.util.List; - -public class PipelineTemplateBuilder { - - private PipelineTemplateDescription pipelineTemplateDescription; - private List<BoundPipelineElement> boundPipelineElements; - private String appId; - - private PipelineTemplateBuilder(String internalId, String pipelineTemplateName, String pipelineTemplateDescription) { - this.pipelineTemplateDescription = new PipelineTemplateDescription(); - this.pipelineTemplateDescription.setPipelineTemplateName(pipelineTemplateName); - this.pipelineTemplateDescription.setPipelineTemplateId(internalId); - this.pipelineTemplateDescription.setPipelineTemplateDescription(pipelineTemplateDescription); - this.boundPipelineElements = new ArrayList<>(); - } - - public static PipelineTemplateBuilder create(String internalId, String pipelineTemplateName, - String pipelineTemplateDescription) { - return new PipelineTemplateBuilder(internalId, pipelineTemplateName, pipelineTemplateDescription); - } - - public PipelineTemplateBuilder setAppId(String id) { - this.pipelineTemplateDescription.setAppId(id); - return this; - } - - - public PipelineTemplateBuilder boundPipelineElementTemplate(BoundPipelineElement boundPipelineElement) { - this.boundPipelineElements.add(boundPipelineElement); - return this; - } - - public PipelineTemplateDescription build() { - this.pipelineTemplateDescription.setBoundTo(boundPipelineElements); - return pipelineTemplateDescription; - } - -} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java index fda4cd49ec..ed731d7ce8 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java @@ -26,6 +26,7 @@ import org.apache.streampipes.service.core.migrations.v090.UpdateUsernameViewMig import org.apache.streampipes.service.core.migrations.v093.AdapterMigration; import org.apache.streampipes.service.core.migrations.v093.StoreEmailTemplatesMigration; import org.apache.streampipes.service.core.migrations.v095.MergeFilenamesAndRenameDuplicatesMigration; +import org.apache.streampipes.service.core.migrations.v970.AddDataLakePipelineTemplateMigration; import org.apache.streampipes.service.core.migrations.v970.AddLinkSettingsMigration; import org.apache.streampipes.service.core.migrations.v970.AddRolesToUserDbMigration; import org.apache.streampipes.service.core.migrations.v970.DataExplorerDataViewMigration; @@ -50,7 +51,8 @@ public class AvailableMigrations { new DataExplorerDataViewMigration(), new ModifyAssetLinkTypeMigration(), new RemoveNodesFromOpcUaAdaptersMigration(), - new AddRolesToUserDbMigration() + new AddRolesToUserDbMigration(), + new AddDataLakePipelineTemplateMigration() ); } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v970/AddDataLakePipelineTemplateMigration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v970/AddDataLakePipelineTemplateMigration.java new file mode 100644 index 0000000000..fd7fced44b --- /dev/null +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v970/AddDataLakePipelineTemplateMigration.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.core.migrations.v970; + +import org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate; +import org.apache.streampipes.model.template.CompactPipelineTemplate; +import org.apache.streampipes.service.core.migrations.Migration; +import org.apache.streampipes.storage.api.CRUDStorage; +import org.apache.streampipes.storage.management.StorageDispatcher; + +import java.io.IOException; + +public class AddDataLakePipelineTemplateMigration implements Migration { + + private final CRUDStorage<CompactPipelineTemplate> storage; + + public AddDataLakePipelineTemplateMigration() { + this.storage = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineTemplateStorage(); + } + + @Override + public boolean shouldExecute() { + return storage.getElementById(PersistDataLakePipelineTemplate.DATA_LAKE_TEMPLATE_ID) == null; + } + + @Override + public void executeMigration() throws IOException { + storage.persist(new PersistDataLakePipelineTemplate().getTemplate()); + } + + @Override + public String getDescription() { + return "Adding data lake pipeline template to template storage"; + } +} diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java index 482051e353..0a64296929 100644 --- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java @@ -29,6 +29,7 @@ import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.file.FileMetadata; +import org.apache.streampipes.model.template.CompactPipelineTemplate; public interface INoSqlStorage { @@ -87,4 +88,6 @@ public interface INoSqlStorage { CRUDStorage<Role> getRoleStorage(); CRUDStorage<Privilege> getPrivilegeStorage(); + + CRUDStorage<CompactPipelineTemplate> getPipelineTemplateStorage(); } diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java index ab618c9512..ab2ce7e4b1 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java @@ -29,6 +29,7 @@ import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.file.FileMetadata; +import org.apache.streampipes.model.template.CompactPipelineTemplate; import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.streampipes.storage.api.IAdapterStorage; import org.apache.streampipes.storage.api.IDataProcessorStorage; @@ -245,4 +246,12 @@ public enum CouchDbStorageManager implements INoSqlStorage { public CRUDStorage<Privilege> getPrivilegeStorage() { return new PrivilegeStorageImpl(); } + + @Override + public CRUDStorage<CompactPipelineTemplate> getPipelineTemplateStorage() { + return new DefaultCrudStorage<>( + () -> Utils.getCouchDbGsonClient("pipeline-templates"), + CompactPipelineTemplate.class + ); + } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java index eaa0917287..d4fc478e55 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesNotificationSink.java @@ -84,7 +84,7 @@ public abstract class StreamPipesNotificationSink extends StreamPipesDataSink { // convert input given in minutes to seconds // this is later used to determine if a notification should be sent this.silentPeriodInSeconds = parameters.extractor() - .singleValueParameter(KEY_SILENT_PERIOD, Long.class) * 60 + .singleValueParameter(KEY_SILENT_PERIOD, Integer.class) * 60 ; } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java b/ui/projects/streampipes/platform-services/src/lib/apis/compact-pipeline.service.ts similarity index 54% copy from streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java copy to ui/projects/streampipes/platform-services/src/lib/apis/compact-pipeline.service.ts index e523fa0bb6..2639ca3440 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java +++ b/ui/projects/streampipes/platform-services/src/lib/apis/compact-pipeline.service.ts @@ -15,30 +15,29 @@ * limitations under the License. * */ -package org.apache.streampipes.model.template; -import java.util.ArrayList; -import java.util.List; +import { Injectable } from '@angular/core'; +import { HttpClient } from '@angular/common/http'; +import { + CompactPipeline, + PlatformServicesCommons, +} from '@streampipes/platform-services'; +import { Observable } from 'rxjs'; -public class PipelineTemplateDescriptionContainer { +@Injectable({ + providedIn: 'root', +}) +export class CompactPipelineService { + constructor( + private http: HttpClient, + private platformServicesCommons: PlatformServicesCommons, + ) {} - private List<PipelineTemplateDescription> list; + create(pipeline: CompactPipeline): Observable<any> { + return this.http.post(this.baseUrl, pipeline); + } - public PipelineTemplateDescriptionContainer() { - super(); - this.list = new ArrayList<>(); - } - - public PipelineTemplateDescriptionContainer(List<PipelineTemplateDescription> dataStreams) { - super(); - this.list = dataStreams; - } - - public List<PipelineTemplateDescription> getList() { - return list; - } - - public void setList(List<PipelineTemplateDescription> list) { - this.list = list; - } + get baseUrl(): string { + return `${this.platformServicesCommons.apiBasePath}/compact-pipelines`; + } } diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/pipeline-template.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/pipeline-template.service.ts index 2df674c729..abd4ccf764 100644 --- a/ui/projects/streampipes/platform-services/src/lib/apis/pipeline-template.service.ts +++ b/ui/projects/streampipes/platform-services/src/lib/apis/pipeline-template.service.ts @@ -19,10 +19,9 @@ import { Injectable } from '@angular/core'; import { HttpClient } from '@angular/common/http'; import { - FreeTextStaticProperty, - PipelineOperationStatus, - PipelineTemplateInvocation, - StaticPropertyUnion, + CompactPipelineTemplate, + Pipeline, + PipelineTemplateGenerationRequest, } from '../model/gen/streampipes-model'; import { map } from 'rxjs/operators'; import { Observable } from 'rxjs'; @@ -37,96 +36,24 @@ export class PipelineTemplateService { return '/streampipes-backend'; } - // getDataSets(): Observable<DataSetDescription[]> { - // return this.http - // .get(this.getServerUrl() + '/api/v2/users/'+ this.authStatusService.email + '/pipeline-templates/streams') - // .pipe(map(response => { - // - // - // - // // TODO remove this - // // quick fix to deserialize URIs - // response['@graph'].forEach(function (object) { - // if (object['sp:domainProperty'] != undefined) { - // // object['sp:domainProperty']['@type'] = "sp:URI"; - // object['sp:domainProperty'] = object['sp:domainProperty']['@id']; - // delete object['sp:domainProperty']['@id']; - // } - // }); - // - // const res = this.tsonLdSerializerService.fromJsonLd(response, 'sp:DataStreamContainer'); - // return res.list; - // })); - // } - // - // getOperators(dataSet: DataSetDescription): Observable<PipelineTemplateDescription[]> { - // return this.http - // .get(this.getServerUrl() + '/api/v2/users/'+ this.authStatusService.email + '/pipeline-templates?dataset=' + dataSet.id) - // .pipe(map(response => { - // const res = this.tsonLdSerializerService.fromJsonLd(response, 'sp:PipelineTemplateDescriptionContainer'); - // return res.list; - // })); - // } - - getPipelineTemplateInvocation( - dataSetId: string, - templateId: string, - ): Observable<PipelineTemplateInvocation> { - return this.http - .get( - `${this.getServerUrl()}/api/v2/pipeline-templates/invocation?streamId=${dataSetId}&templateId=${templateId}`, - ) - .pipe( - map(data => { - return PipelineTemplateInvocation.fromData( - data as PipelineTemplateInvocation, - ); - }), - ); - - // .pipe(map(response: PipelineTemplateInvocation => { - - // Currently tsonld dows not support objects that just contain one root object without an enclosing @graph array - // const res = new PipelineTemplateInvocation(response['@id']); - // res.dataSetId = response['sp:hasDataSetId']; - // res.name = response['hasElementName']; - // res.pipelineTemplateId = response['sp:hasInternalName']; - - // TODO find better solution - // This will remove preconfigured values from the UI - // res.list.forEach(property => { - // if (this.isFreeTextStaticProperty(property)) { - // if (this.asFreeTextStaticProperty(property).value !== undefined) { - // this.asFreeTextStaticProperty(property).render = false; - // } - // } - // }); - // return res; - // })); + findAll(): Observable<CompactPipelineTemplate[]> { + return this.http.get<CompactPipelineTemplate[]>(`${this.baseUrl}`); } - isFreeTextStaticProperty(val) { - return val instanceof FreeTextStaticProperty; + findById(id: string): Observable<CompactPipelineTemplate> { + return this.http.get<CompactPipelineTemplate>(`${this.baseUrl}/${id}`); } - asFreeTextStaticProperty(val: StaticPropertyUnion): FreeTextStaticProperty { - return val as FreeTextStaticProperty; + getPipelineForTemplate( + id: string, + request: PipelineTemplateGenerationRequest, + ): Observable<Pipeline> { + return this.http + .post(`${this.baseUrl}/${id}/pipeline`, request) + .pipe(map(res => Pipeline.fromData(res as any))); } - createPipelineTemplateInvocation( - invocation: PipelineTemplateInvocation, - ): Observable<PipelineOperationStatus> { - return this.http - .post( - `${this.getServerUrl()}/api/v2/pipeline-templates`, - invocation, - ) - .pipe( - map(result => - PipelineOperationStatus.fromData( - result as PipelineOperationStatus, - ), - ), - ); + get baseUrl(): string { + return `${this.getServerUrl()}/api/v2/pipeline-templates`; } } diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 1ae8e4e32d..235a32498e 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-09-30 14:11:40. +// Generated using typescript-generator version 3.2.1263 on 2024-10-02 14:54:56. export class NamedStreamPipesEntity implements Storable { '@class': @@ -773,6 +773,90 @@ export class ColorPickerStaticProperty extends StaticProperty { } } +export class CompactPipeline { + createOptions: CreateOptions; + description: string; + id: string; + name: string; + pipelineElements: CompactPipelineElement[]; + + static fromData( + data: CompactPipeline, + target?: CompactPipeline, + ): CompactPipeline { + if (!data) { + return data; + } + const instance = target || new CompactPipeline(); + instance.createOptions = CreateOptions.fromData(data.createOptions); + instance.description = data.description; + instance.id = data.id; + instance.name = data.name; + instance.pipelineElements = __getCopyArrayFn( + CompactPipelineElement.fromData, + )(data.pipelineElements); + return instance; + } +} + +export class CompactPipelineElement { + configuration: { [index: string]: any }[]; + connectedTo: string[]; + id: string; + ref: string; + type: string; + + static fromData( + data: CompactPipelineElement, + target?: CompactPipelineElement, + ): CompactPipelineElement { + if (!data) { + return data; + } + const instance = target || new CompactPipelineElement(); + instance.configuration = __getCopyArrayFn( + __getCopyObjectFn(__identity<any>()), + )(data.configuration); + instance.connectedTo = __getCopyArrayFn(__identity<string>())( + data.connectedTo, + ); + instance.id = data.id; + instance.ref = data.ref; + instance.type = data.type; + return instance; + } +} + +export class CompactPipelineTemplate implements Storable { + description: string; + elementId: string; + name: string; + pipeline: CompactPipelineElement[]; + placeholders: PipelinePlaceholders; + rev: string; + + static fromData( + data: CompactPipelineTemplate, + target?: CompactPipelineTemplate, + ): CompactPipelineTemplate { + if (!data) { + return data; + } + const instance = target || new CompactPipelineTemplate(); + instance.description = data.description; + instance.elementId = data.elementId; + instance.name = data.name; + instance.pipeline = __getCopyArrayFn(CompactPipelineElement.fromData)( + data.pipeline, + ); + instance.placeholders = PipelinePlaceholders.fromData( + data.placeholders, + ); + instance.rev = data.rev; + return instance; + } +} + export class ConfigItem { configurationScope: ConfigurationScope; description: string; @@ -899,6 +983,24 @@ export class CreateNestedRuleDescription extends SchemaTransformationRuleDescrip } } +export class CreateOptions { + persist: boolean; + start: boolean; + + static fromData( + data: CreateOptions, + target?: CreateOptions, + ): CreateOptions { + if (!data) { + return data; + } + const instance = target || new CreateOptions(); + instance.persist = data.persist; + instance.start = data.start; + return instance; + } +} + export class CustomOutputStrategy extends OutputStrategy { '@class': 'org.apache.streampipes.model.output.CustomOutputStrategy'; 'availablePropertyKeys': string[]; @@ -2840,6 +2942,46 @@ export class PipelineOperationStatus { } } +export class PipelinePlaceholderConfig { + id: string; + ref: string; + + static fromData( + data: PipelinePlaceholderConfig, + target?: PipelinePlaceholderConfig, + ): PipelinePlaceholderConfig { + if (!data) { + return data; + } + const instance = target || new PipelinePlaceholderConfig(); + instance.id = data.id; + instance.ref = data.ref; + return instance; + } +} + +export class PipelinePlaceholders { + requiredConfigs: PipelinePlaceholderConfig[]; + requiredStreamInputs: string[]; + + static fromData( + data: PipelinePlaceholders, + target?: PipelinePlaceholders, + ): PipelinePlaceholders { + if (!data) { + return data; + } + const instance = target || new PipelinePlaceholders(); + instance.requiredConfigs = __getCopyArrayFn( + PipelinePlaceholderConfig.fromData, + )(data.requiredConfigs); + instance.requiredStreamInputs = __getCopyArrayFn(__identity<string>())( + data.requiredStreamInputs, + ); + return instance; + } +} + export class PipelinePreviewModel { elementIdMappings: { [index: string]: string }; previewId: string; @@ -2908,6 +3050,30 @@ export class PipelineTemplateDescription extends NamedStreamPipesEntity { } } +export class PipelineTemplateGenerationRequest { + pipelineDescription: string; + pipelineName: string; + streams: { [index: string]: string }; + template: CompactPipelineTemplate; + + static fromData( + data: PipelineTemplateGenerationRequest, + target?: PipelineTemplateGenerationRequest, + ): PipelineTemplateGenerationRequest { + if (!data) { + return data; + } + const instance = target || new PipelineTemplateGenerationRequest(); + instance.pipelineDescription = data.pipelineDescription; + instance.pipelineName = data.pipelineName; + instance.streams = __getCopyObjectFn(__identity<string>())( + data.streams, + ); + instance.template = CompactPipelineTemplate.fromData(data.template); + return instance; + } +} + export class PipelineTemplateInvocation { '@class': 'org.apache.streampipes.model.template.PipelineTemplateInvocation'; 'dataStreamId': string; diff --git a/ui/projects/streampipes/platform-services/src/public-api.ts b/ui/projects/streampipes/platform-services/src/public-api.ts index be270b9fa6..caa7068565 100644 --- a/ui/projects/streampipes/platform-services/src/public-api.ts +++ b/ui/projects/streampipes/platform-services/src/public-api.ts @@ -26,6 +26,7 @@ export * from './lib/apis/commons.service'; export * from './lib/apis/adapter.service'; export * from './lib/apis/adapter-monitoring.service'; export * from './lib/apis/asset-management.service'; +export * from './lib/apis/compact-pipeline.service'; export * from './lib/apis/data-view-data-explorer.service'; export * from './lib/apis/datalake-rest.service'; export * from './lib/apis/dashboard.service'; diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts index 9d385b4516..598dae37ae 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts @@ -21,6 +21,8 @@ import { ShepherdService } from '../../../services/tour/shepherd.service'; import { AdapterDescription, AdapterService, + CompactPipeline, + CompactPipelineElement, ErrorMessage, PipelineOperationStatus, PipelineTemplateService, @@ -28,7 +30,7 @@ import { SpLogMessage, } from '@streampipes/platform-services'; import { DialogRef } from '@streampipes/shared-ui'; -import { PipelineInvocationBuilder } from '../../../core-services/template/PipelineInvocationBuilder'; +import { CompactPipelineService } from '../../../../../projects/streampipes/platform-services/src/lib/apis/compact-pipeline.service'; @Component({ selector: 'sp-dialog-adapter-started-dialog', @@ -80,6 +82,7 @@ export class AdapterStartedDialog implements OnInit { private adapterService: AdapterService, private shepherdService: ShepherdService, private pipelineTemplateService: PipelineTemplateService, + private compactPipelineService: CompactPipelineService, ) {} ngOnInit() { @@ -201,57 +204,68 @@ export class AdapterStartedDialog implements OnInit { private startSaveInDataLakePipeline(adapterElementId: string) { this.loadingText = 'Creating pipeline to persist data stream'; this.adapterService.getAdapter(adapterElementId).subscribe(adapter => { - const pipelineId = - 'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate'; this.pipelineTemplateService - .getPipelineTemplateInvocation( - adapter.correspondingDataStreamElementId, - pipelineId, - ) + .findById('sp-internal-persist') .subscribe( - res => { - const pipelineName = 'Persist ' + this.adapter.name; - - const indexName = this.adapter.name; - - const pipelineInvocation = - PipelineInvocationBuilder.create(res) - .setName(pipelineName) - .setTemplateId(pipelineId) - .setFreeTextStaticProperty( - 'db_measurement', - indexName, - ) - .setMappingPropertyUnary( - 'timestamp_mapping', - 's0::' + this.dataLakeTimestampField, - ) - .setOneOfStaticProperty( - 'schema_update', - 'Update schema', - ) - .build(); - - this.pipelineTemplateService - .createPipelineTemplateInvocation( - pipelineInvocation, - ) - .subscribe( - pipelineOperationStatus => { - this.pipelineOperationStatus = - pipelineOperationStatus; - this.startAdapter(adapterElementId, true); - }, - error => { - this.onAdapterFailure(error.error); - }, - ); + template => { + const pipeline: CompactPipeline = { + id: + 'persist-' + + this.adapter.name.replaceAll(' ', '-'), + name: 'Persist ' + this.adapter.name, + description: '', + pipelineElements: this.makeTemplateConfigs( + template.pipeline, + adapter, + ), + createOptions: { + persist: false, + start: true, + }, + }; + this.compactPipelineService.create(pipeline).subscribe( + pipelineOperationStatus => { + this.pipelineOperationStatus = + pipelineOperationStatus; + this.startAdapter(adapterElementId, true); + }, + error => { + this.onAdapterFailure(error.error); + }, + ); }, - res => { - this.templateErrorMessage = res.error; + error => { + this.templateErrorMessage = error.error; this.startAdapter(adapterElementId); }, ); }); } + + makeTemplateConfigs( + template: CompactPipelineElement[], + adapter: AdapterDescription, + ): CompactPipelineElement[] { + template[0].configuration.push( + { + db_measurement: this.adapter.name, + }, + { + timestamp_mapping: 's0::' + this.dataLakeTimestampField, + }, + { + dimensions_selection: adapter.eventSchema.eventProperties + .filter(ep => ep.propertyScope === 'DIMENSION_PROPERTY') + .map(ep => ep.runtimeName), + }, + ); + template.push({ + type: 'stream', + ref: 'stream1', + configuration: undefined, + id: adapter.correspondingDataStreamElementId, + connectedTo: undefined, + }); + return template; + } } diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.html b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.html index bbef21c3be..f15c990c06 100644 --- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.html +++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.html @@ -50,23 +50,37 @@ </button> <span class="assembly-options-divider"></span> <button + mat-button color="accent" - mat-icon-button matTooltip="Auto Layout" [matTooltipPosition]="'above'" (click)="autoLayout()" > <i class="material-icons">settings_overscan</i> + <span> Auto Layout</span> </button> + <span class="assembly-options-divider"></span> <button color="accent" - mat-icon-button + mat-button matTooltip="Add pipeline element" [matTooltipPosition]="'above'" (click)="openDiscoverDialog()" data-cy="sp-editor-add-pipeline-element" > <i class="material-icons">add</i> + <span> Add element</span> + </button> + <button + color="accent" + mat-button + matTooltip="Add template" + [matTooltipPosition]="'above'" + (click)="openAddTemplateDialog()" + data-cy="sp-editor-add-template" + > + <i class="material-icons">add</i> + <span> Add template</span> </button> <sp-pipeline-assembly-options-pipeline-cache [rawPipelineModel]="rawPipelineModel" diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.ts b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.ts index 00ecb8e93b..7c0b17e8b6 100644 --- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.ts +++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-options/pipeline-assembly-options.component.ts @@ -39,7 +39,11 @@ import { PipelineElementUnion, } from '../../../model/editor.model'; import { PipelineAssemblyOptionsPipelineCacheComponent } from './pipeline-assembly-options-pipeline-cache/pipeline-assembly-options-pipeline-cache.component'; -import { PipelineCanvasMetadata } from '@streampipes/platform-services'; +import { + Pipeline, + PipelineCanvasMetadata, +} from '@streampipes/platform-services'; +import { AddTemplateDialogComponent } from '../../../dialog/add-template-dialog/add-template-dialog.component'; @Component({ selector: 'sp-pipeline-assembly-options', @@ -71,6 +75,10 @@ export class PipelineAssemblyOptionsComponent { @Output() togglePreviewEmitter: EventEmitter<void> = new EventEmitter<void>(); + @Output() + displayPipelineTemplateEmitter: EventEmitter<Pipeline> = + new EventEmitter<Pipeline>(); + @ViewChild('assemblyOptionsPipelineCacheComponent') assemblyOptionsCacheComponent: PipelineAssemblyOptionsPipelineCacheComponent; @@ -104,6 +112,21 @@ export class PipelineAssemblyOptionsComponent { }); } + openAddTemplateDialog() { + const dialogRef = this.dialogService.open(AddTemplateDialogComponent, { + panelType: PanelType.SLIDE_IN_PANEL, + title: 'Add template', + width: '50vw', + data: {}, + }); + dialogRef.afterClosed().subscribe(pipeline => { + if (pipeline !== undefined) { + this.clearAssemblyEmitter.emit(); + this.displayPipelineTemplateEmitter.emit(pipeline); + } + }); + } + showClearAssemblyConfirmDialog(event: any) { const dialogRef = this.dialog.open(ConfirmDialogComponent, { width: '500px', diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.html b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.html index e7d49b22b8..249eb2b489 100644 --- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.html +++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.html @@ -28,11 +28,13 @@ (clearAssemblyEmitter)="clearAssembly()" (togglePreviewEmitter)="togglePreview()" (savePipelineEmitter)="submit()" + (displayPipelineTemplateEmitter)="displayPipelineTemplate($event)" > </sp-pipeline-assembly-options> </div> <div id="outerAssemblyArea" class="outerAssembly assembly-border"> <sp-pipeline-assembly-drawing-area + *ngIf="rawPipelineModel" #drawingAreaComponent fxFlex="100" style="position: relative" diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts index bda501b04a..84eb67c593 100644 --- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts +++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts @@ -37,6 +37,7 @@ import { forkJoin } from 'rxjs'; import { Router } from '@angular/router'; import { PipelineAssemblyDrawingAreaComponent } from './pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component'; import { PipelineAssemblyOptionsComponent } from './pipeline-assembly-options/pipeline-assembly-options.component'; +import { JsplumbService } from '../../services/jsplumb.service'; @Component({ selector: 'sp-pipeline-assembly', @@ -77,6 +78,7 @@ export class PipelineAssemblyComponent implements AfterViewInit { public pipelineValidationService: PipelineValidationService, private dialogService: DialogService, private router: Router, + private jsplumbService: JsplumbService, ) {} ngAfterViewInit() { @@ -110,7 +112,6 @@ export class PipelineAssemblyComponent implements AfterViewInit { * Sends the pipeline to the server */ submit() { - //const pipelineModel = this.pipelineComponent.rawPipelineModel; const pipelineModel = this.rawPipelineModel; const pipeline = this.objectProvider.makePipeline(pipelineModel); this.pipelinePositioningService.collectPipelineElementPositions( @@ -153,4 +154,16 @@ export class PipelineAssemblyComponent implements AfterViewInit { triggerCacheUpdate(): void { this.assemblyOptionsComponent.triggerCacheUpdate(); } + + displayPipelineTemplate(pipeline: Pipeline) { + this.originalPipeline = pipeline; + this.rawPipelineModel = undefined; + setTimeout(() => { + this.rawPipelineModel = this.jsplumbService.makeRawPipeline( + pipeline, + false, + ); + this.drawingAreaComponent.displayPipelineInEditor(true, undefined); + }); + } } diff --git a/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.html b/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.html new file mode 100644 index 0000000000..981ebee58d --- /dev/null +++ b/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.html @@ -0,0 +1,41 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<div class="sp-dialog-container"> + <div class="sp-dialog-content p-15"> + <div fxFlex="100" fxLayout="column"> + <sp-template-selection + [pipelineTemplates]="pipelineTemplates" + (selectTemplateEmitter)="selectTemplate($event)" + > + </sp-template-selection> + </div> + </div> + <mat-divider></mat-divider> + <div class="sp-dialog-actions"> + <button mat-raised-button color="accent">Next</button> + <button + mat-button + mat-raised-button + class="mat-basic" + (click)="close()" + > + Cancel + </button> + </div> +</div> diff --git a/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.scss b/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.scss new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.ts b/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.ts new file mode 100644 index 0000000000..1e3b4c05fe --- /dev/null +++ b/ui/src/app/editor/dialog/add-template-dialog/add-template-dialog.component.ts @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, OnInit } from '@angular/core'; +import { + CompactPipelineTemplate, + PipelineTemplateGenerationRequest, + PipelineTemplateService, +} from '@streampipes/platform-services'; +import { DialogRef } from '@streampipes/shared-ui'; + +@Component({ + selector: 'sp-add-template-dialog', + templateUrl: './add-template-dialog.component.html', + styleUrls: ['./add-template-dialog.component.scss'], +}) +export class AddTemplateDialogComponent implements OnInit { + pipelineTemplates: CompactPipelineTemplate[] = []; + + constructor( + private pipelineTemplateService: PipelineTemplateService, + private dialogRef: DialogRef<AddTemplateDialogComponent>, + ) {} + + ngOnInit() { + this.pipelineTemplateService + .findAll() + .subscribe(t => (this.pipelineTemplates = t)); + } + + selectTemplate(template: CompactPipelineTemplate): void { + const req: PipelineTemplateGenerationRequest = { + streams: {}, + template, + pipelineName: undefined, + pipelineDescription: undefined, + }; + this.pipelineTemplateService + .getPipelineForTemplate(template.elementId, req) + .subscribe(pipeline => { + this.dialogRef.close(pipeline); + }); + } + + close(): void { + this.dialogRef.close(); + } +} diff --git a/ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.html b/ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.html new file mode 100644 index 0000000000..58f92cc707 --- /dev/null +++ b/ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.html @@ -0,0 +1,39 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<div fxLayout="column"> + @for (template of pipelineTemplates; track template.elementId) { + <div + fxLayout="row" + class="p-10 m-10 template-item" + (click)="selectTemplate(template)" + fxLayoutAlign="start center" + [attr.data-cy]="template.name.toLowerCase().replaceAll(' ', '_')" + > + <div fxLayout="column" fxLayoutAlign="center start"> + <div class="element-name"> + <b>{{ template.name }}</b> + </div> + <div class="element-description"> + <small>{{ template.description }}</small> + </div> + </div> + </div> + <mat-divider></mat-divider> + } +</div> diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java b/ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.scss similarity index 70% copy from streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java copy to ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.scss index 59948aeb03..23ba393d68 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java +++ b/ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.scss @@ -1,4 +1,4 @@ -/* +/*! * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,16 +16,14 @@ * */ -package org.apache.streampipes.model.pipeline.compact; +.template-item { +} -import org.apache.streampipes.model.connect.adapter.compact.CreateOptions; +.template-item:nth-child(even) { + background: var(--color-bg-1); +} -import java.util.List; - -public record CompactPipeline( - String id, - String name, - String description, - List<CompactPipelineElement> pipelineElements, - CreateOptions createOptions -) {} +.template-item:hover { + background: var(--color-bg-2); + cursor: pointer; +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java b/ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.ts similarity index 55% rename from streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java rename to ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.ts index e523fa0bb6..3cbd628ed1 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineTemplateDescriptionContainer.java +++ b/ui/src/app/editor/dialog/add-template-dialog/template-selection/template-selection.component.ts @@ -15,30 +15,24 @@ * limitations under the License. * */ -package org.apache.streampipes.model.template; -import java.util.ArrayList; -import java.util.List; +import { Component, EventEmitter, Input, Output } from '@angular/core'; +import { CompactPipelineTemplate } from '@streampipes/platform-services'; -public class PipelineTemplateDescriptionContainer { +@Component({ + selector: 'sp-template-selection', + templateUrl: './template-selection.component.html', + styleUrls: ['./template-selection.component.scss'], +}) +export class TemplateSelectionComponent { + @Input() + pipelineTemplates: CompactPipelineTemplate[] = []; - private List<PipelineTemplateDescription> list; + @Output() + selectTemplateEmitter: EventEmitter<CompactPipelineTemplate> = + new EventEmitter(); - public PipelineTemplateDescriptionContainer() { - super(); - this.list = new ArrayList<>(); - } - - public PipelineTemplateDescriptionContainer(List<PipelineTemplateDescription> dataStreams) { - super(); - this.list = dataStreams; - } - - public List<PipelineTemplateDescription> getList() { - return list; - } - - public void setList(List<PipelineTemplateDescription> list) { - this.list = list; - } + selectTemplate(template: CompactPipelineTemplate): void { + this.selectTemplateEmitter.emit(template); + } } diff --git a/ui/src/app/editor/editor.module.ts b/ui/src/app/editor/editor.module.ts index 0995cda75a..c2742c7199 100644 --- a/ui/src/app/editor/editor.module.ts +++ b/ui/src/app/editor/editor.module.ts @@ -84,6 +84,8 @@ import { DroppedPipelineElementComponent } from './components/pipeline/dropped-p import { InputSchemaPanelComponent } from './dialog/customize/input-schema-panel/input-schema-panel.component'; import { InputSchemaPropertyComponent } from './dialog/customize/input-schema-panel/input-schema-property/input-schema-property.component'; import { SortByRuntimeNamePipe } from './pipes/sort-by-runtime-name.pipe'; +import { AddTemplateDialogComponent } from './dialog/add-template-dialog/add-template-dialog.component'; +import { TemplateSelectionComponent } from './dialog/add-template-dialog/template-selection/template-selection.component'; @NgModule({ imports: [ @@ -125,6 +127,7 @@ import { SortByRuntimeNamePipe } from './pipes/sort-by-runtime-name.pipe'; PlatformServicesModule, ], declarations: [ + AddTemplateDialogComponent, CompatibleElementsComponent, CustomizeComponent, CustomOutputStrategyComponent, @@ -155,6 +158,7 @@ import { SortByRuntimeNamePipe } from './pipes/sort-by-runtime-name.pipe'; PipelineElementTypeFilterPipe, PipelineComponent, PropertySelectionComponent, + TemplateSelectionComponent, SavePipelineComponent, SavePipelineSettingsComponent, SortByRuntimeNamePipe, diff --git a/ui/src/app/editor/services/pipeline-positioning.service.ts b/ui/src/app/editor/services/pipeline-positioning.service.ts index 52342e9e66..853568d11f 100644 --- a/ui/src/app/editor/services/pipeline-positioning.service.ts +++ b/ui/src/app/editor/services/pipeline-positioning.service.ts @@ -165,6 +165,7 @@ export class PipelinePositioningService { elementRef.css('left', g.node(v).x + 'px'); elementRef.css('top', g.node(v).y + 'px'); }); + jsPlumbBridge.repaintEverything(); } layoutGraphFromCanvasMetadata(
