This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch simplify-element-verification in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 257cb6b0ba2a56e012e231460e8c48ead56d9dc4 Author: Dominik Riemer <[email protected]> AuthorDate: Thu Mar 5 08:25:17 2026 +0100 chore: Simplify code to verify pipeline elements --- .../management/AdapterMasterManagement.java | 20 +-- .../manager/verification/AdapterVerifier.java | 65 ---------- .../verification/DataProcessorVerifier.java | 66 ---------- .../manager/verification/DataSinkVerifier.java | 66 ---------- .../manager/verification/DataStreamVerifier.java | 66 ---------- .../manager/verification/ElementVerifier.java | 139 ++++++++------------- .../manager/verification/TypedElementVerifier.java | 89 +++++++++++++ .../verification/extractor/TypeExtractor.java | 125 ++++++++++++------ .../verification/messages/VerificationError.java | 29 ----- .../verification/messages/VerificationResult.java | 36 ------ .../verification/messages/VerificationWarning.java | 29 ----- .../verification/structure/AbstractVerifier.java | 44 ------- .../verification/structure/GeneralVerifier.java | 48 ------- .../manager/verification/structure/Verifier.java | 28 ----- .../manager/verification/TestTypeExtractor.java | 114 +++++++++++++++++ 15 files changed, 352 insertions(+), 612 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index 4aa3625f0d..925f2447ef 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -18,20 +18,20 @@ package org.apache.streampipes.connect.management.management; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.commons.exceptions.SepaParseException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics; import org.apache.streampipes.connect.management.util.GroundingUtils; import org.apache.streampipes.loadbalance.LoadManager; import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider; import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.manager.verification.DataStreamVerifier; +import org.apache.streampipes.manager.verification.TypedElementVerifier; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.util.ElementIdGenerator; import org.apache.streampipes.resource.management.AdapterResourceManager; import org.apache.streampipes.resource.management.DataStreamResourceManager; import org.apache.streampipes.storage.api.connect.IAdapterStorage; +import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; @@ -199,11 +199,15 @@ public class AdapterMasterManagement { } private void installDataSource(SpDataStream stream, String principalSid) throws AdapterException { - try { - new DataStreamVerifier(stream).verifyAndAdd(principalSid, false); - } catch (SepaParseException e) { - LOG.error("Error while installing data source: {}", stream.getElementId(), e); - throw new AdapterException(); - } + var storageApi = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineElementDescriptionStorage(); + var verifier = new TypedElementVerifier<>( + stream, + storageApi, + storageApi::exists, + storageApi::storeDataStream, + storageApi::update, + SpServiceUrlProvider.DATA_STREAM + ); + verifier.verifyAndAdd(principalSid, false); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/AdapterVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/AdapterVerifier.java deleted file mode 100644 index e7545675d7..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/AdapterVerifier.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.commons.exceptions.SepaParseException; -import org.apache.streampipes.manager.assets.AssetManager; -import org.apache.streampipes.model.connect.adapter.AdapterDescription; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import java.io.IOException; - -public class AdapterVerifier extends ElementVerifier<AdapterDescription> { - - public AdapterVerifier(String graphData) throws SepaParseException { - super(graphData, AdapterDescription.class); - } - - @Override - protected StorageState store() { - var storageState = StorageState.STORED; - if (!storageApi.exists(elementDescription)) { - storageApi.storeAdapterDescription(elementDescription); - } else { - storageState = StorageState.ALREADY_STORED; - } - return storageState; - } - - @Override - protected void update() { - storageApi.update(elementDescription); - } - - @Override - protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { - if (elementDescription.isIncludesAssets()) { - AssetManager.storeAsset(SpServiceUrlProvider.ADAPTER, elementDescription.getAppId()); - } - } - - @Override - protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException { - if (elementDescription.isIncludesAssets()) { - AssetManager.deleteAsset(elementDescription.getAppId()); - storeAssets(); - } - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java deleted file mode 100644 index c5a86d8ed7..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.commons.exceptions.SepaParseException; -import org.apache.streampipes.manager.assets.AssetManager; -import org.apache.streampipes.model.graph.DataProcessorDescription; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import java.io.IOException; - -public class DataProcessorVerifier extends ElementVerifier<DataProcessorDescription> { - - public DataProcessorVerifier(String graphData) - throws SepaParseException { - super(graphData, DataProcessorDescription.class); - // TODO Auto-generated constructor stub - } - - @Override - protected void collectValidators() { - super.collectValidators(); - } - - @Override - protected StorageState store() { - StorageState storageState = StorageState.STORED; - - if (!storageApi.exists(elementDescription)) { - storageApi.storeDataProcessor(elementDescription); - } else { - storageState = StorageState.ALREADY_STORED; - } - return storageState; - } - - @Override - protected void update() { - storageApi.update(elementDescription); - } - - @Override - protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { - if (elementDescription.isIncludesAssets()) { - AssetManager.storeAsset(SpServiceUrlProvider.DATA_PROCESSOR, elementDescription.getAppId()); - } - } - -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java deleted file mode 100644 index 6bbd41e29e..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.commons.exceptions.SepaParseException; -import org.apache.streampipes.manager.assets.AssetManager; -import org.apache.streampipes.model.graph.DataSinkDescription; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import java.io.IOException; - -public class DataSinkVerifier extends ElementVerifier<DataSinkDescription> { - - - public DataSinkVerifier(String graphData) - throws SepaParseException { - super(graphData, DataSinkDescription.class); - } - - - @Override - protected StorageState store() { - StorageState storageState = StorageState.STORED; - if (!storageApi.exists(elementDescription)) { - storageApi.storeDataSink(elementDescription); - } else { - storageState = StorageState.ALREADY_STORED; - } - return storageState; - } - - @Override - protected void collectValidators() { - super.collectValidators(); - } - - - @Override - protected void update() { - storageApi.update(elementDescription); - } - - @Override - protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { - if (elementDescription.isIncludesAssets()) { - AssetManager.storeAsset(SpServiceUrlProvider.DATA_SINK, elementDescription.getAppId()); - } - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java deleted file mode 100644 index f29c999c14..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.assets.AssetManager; -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import java.io.IOException; - -public class DataStreamVerifier extends ElementVerifier<SpDataStream> { - - public DataStreamVerifier(String graphData) { - super(graphData, SpDataStream.class); - } - - public DataStreamVerifier(SpDataStream stream) { - super(stream); - } - - @Override - protected void collectValidators() { - super.collectValidators(); - } - - @Override - protected StorageState store() { - StorageState storageState = StorageState.STORED; - - if (!storageApi.exists(elementDescription)) { - storageApi.storeDataStream(elementDescription); - } else { - storageState = StorageState.ALREADY_STORED; - } - return storageState; - } - - @Override - protected void update() { - storageApi.update(elementDescription); - } - - @Override - protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { - if (elementDescription.isIncludesAssets()) { - AssetManager.storeAsset(SpServiceUrlProvider.DATA_STREAM, elementDescription.getAppId()); - } - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java index a7215b938f..4b080f8b67 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java @@ -19,12 +19,7 @@ package org.apache.streampipes.manager.verification; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.commons.exceptions.SepaParseException; import org.apache.streampipes.manager.assets.AssetManager; -import org.apache.streampipes.manager.verification.messages.VerificationError; -import org.apache.streampipes.manager.verification.messages.VerificationResult; -import org.apache.streampipes.manager.verification.structure.GeneralVerifier; -import org.apache.streampipes.manager.verification.structure.Verifier; import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.client.user.Permission; import org.apache.streampipes.model.client.user.PermissionBuilder; @@ -36,9 +31,10 @@ import org.apache.streampipes.model.message.SuccessMessage; import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.serializers.json.JacksonSerializer; import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; import com.fasterxml.jackson.core.JsonProcessingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -46,96 +42,73 @@ import java.util.List; public abstract class ElementVerifier<T extends NamedStreamPipesEntity> { - private String graphData; - private Class<T> elementClass; + private static final Logger LOG = LoggerFactory.getLogger(ElementVerifier.class); + + private final String graphData; + private final Class<T> elementClass; private final boolean shouldTransform; protected T elementDescription; - protected List<VerificationResult> validationResults; - protected List<Verifier> validators; - - protected IPipelineElementDescriptionStorage storageApi = StorageDispatcher - .INSTANCE - .getNoSqlStore() - .getPipelineElementDescriptionStorage(); + protected final IPipelineElementDescriptionStorage storageApi; - public ElementVerifier(String graphData, Class<T> elementClass) { + public ElementVerifier( + String graphData, + Class<T> elementClass, + IPipelineElementDescriptionStorage storageApi + ) { this.elementClass = elementClass; this.graphData = graphData; + this.storageApi = storageApi; this.shouldTransform = true; - this.validators = new ArrayList<>(); - this.validationResults = new ArrayList<>(); } - public ElementVerifier(T elementDescription) { + public ElementVerifier(T elementDescription, IPipelineElementDescriptionStorage storageApi) { this.elementDescription = elementDescription; + this.storageApi = storageApi; + this.graphData = null; + this.elementClass = null; this.shouldTransform = false; - this.validators = new ArrayList<>(); - this.validationResults = new ArrayList<>(); - } - - protected void collectValidators() { - validators.add(new GeneralVerifier<>(elementDescription)); } protected abstract StorageState store(); protected abstract void update(); - protected void verify() { - collectValidators(); - validators.forEach(validator -> validationResults.addAll(validator.validate())); - } + public Message verifyAndAdd(String principalSid, boolean publicElement) { + var transformError = transformEntity(); + if (transformError != null) { + return transformError; + } - public Message verifyAndAdd(String principalSid, boolean publicElement) throws SepaParseException { - if (shouldTransform) { + StorageState state = store(); + if (state == StorageState.STORED) { + createAndStorePermission(principalSid, publicElement); try { - this.elementDescription = transform(); - } catch (IOException e) { - return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification()); - } - } - verify(); - if (isVerifiedSuccessfully()) { - StorageState state = store(); - if (state == StorageState.STORED) { - createAndStorePermission(principalSid, publicElement); - try { - storeAssets(); - } catch (IOException | NoServiceEndpointsAvailableException e) { - e.printStackTrace(); - } - return successMessage(); - } else if (state == StorageState.ALREADY_STORED) { - return addedToUserSuccessMessage(); - } else { - return skippedSuccessMessage(); + storeAssets(); + } catch (IOException | NoServiceEndpointsAvailableException e) { + LOG.error("Could not store assets for app id '{}'", elementDescription.getAppId(), e); } + return successMessage(); } else { - return errorMessage(); + return addedToUserSuccessMessage(); } } public Message verifyAndUpdate() { - try { - this.elementDescription = transform(); - } catch (JsonProcessingException e) { - return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification()); + var transformError = transformEntity(); + if (transformError != null) { + return transformError; } - verify(); - if (isVerifiedSuccessfully()) { - update(); - try { - updateAssets(); - } catch (IOException | NoServiceEndpointsAvailableException e) { - e.printStackTrace(); - } - return successMessage(); - } else { - return errorMessage(); + + update(); + try { + updateAssets(); + } catch (IOException | NoServiceEndpointsAvailableException e) { + LOG.error("Could not update assets for app id '{}'", elementDescription.getAppId(), e); } + return successMessage(); } @@ -148,37 +121,29 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> { } } - private Message errorMessage() { - return new ErrorMessage(elementDescription.getName(), collectNotifications()); - } - private Message successMessage() { - List<Notification> notifications = collectNotifications(); + List<Notification> notifications = new ArrayList<>(); notifications.add(NotificationType.STORAGE_SUCCESS.uiNotification()); return new SuccessMessage(elementDescription.getName(), notifications); } - private Message skippedSuccessMessage() { - List<Notification> notifications = collectNotifications(); - notifications.add(new Notification("Already exists", "This element is already in your list of elements, skipped.")); - return new SuccessMessage(elementDescription.getName(), notifications); - } - private Message addedToUserSuccessMessage() { - List<Notification> notifications = collectNotifications(); + List<Notification> notifications = new ArrayList<>(); notifications.add(new Notification("Already stored", "Element description already stored, added element to user")); return new SuccessMessage(elementDescription.getName(), notifications); } - private List<Notification> collectNotifications() { - List<Notification> notifications = new ArrayList<>(); - validationResults.forEach(vr -> notifications.add(vr.getNotification())); - return notifications; - } + private Message transformEntity() { + if (!shouldTransform) { + return null; + } - private boolean isVerifiedSuccessfully() { - return validationResults.stream() - .noneMatch(validator -> (validator instanceof VerificationError)); + try { + this.elementDescription = transform(); + return null; + } catch (IOException e) { + return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification()); + } } protected T transform() throws JsonProcessingException { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/TypedElementVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/TypedElementVerifier.java new file mode 100644 index 0000000000..ca200410e1 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/TypedElementVerifier.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.verification; + +import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.manager.assets.AssetManager; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + +import java.io.IOException; +import java.util.function.Consumer; +import java.util.function.Predicate; + +public class TypedElementVerifier<T extends NamedStreamPipesEntity> extends ElementVerifier<T> { + + private final Predicate<T> existsChecker; + private final Consumer<T> storeOperation; + private final Consumer<T> updateOperation; + private final SpServiceUrlProvider serviceUrlProvider; + + public TypedElementVerifier( + String graphData, + Class<T> elementClass, + IPipelineElementDescriptionStorage storageApi, + Predicate<T> existsChecker, + Consumer<T> storeOperation, + Consumer<T> updateOperation, + SpServiceUrlProvider serviceUrlProvider + ) { + super(graphData, elementClass, storageApi); + this.existsChecker = existsChecker; + this.storeOperation = storeOperation; + this.updateOperation = updateOperation; + this.serviceUrlProvider = serviceUrlProvider; + } + + public TypedElementVerifier( + T elementDescription, + IPipelineElementDescriptionStorage storageApi, + Predicate<T> existsChecker, + Consumer<T> storeOperation, + Consumer<T> updateOperation, + SpServiceUrlProvider serviceUrlProvider + ) { + super(elementDescription, storageApi); + this.existsChecker = existsChecker; + this.storeOperation = storeOperation; + this.updateOperation = updateOperation; + this.serviceUrlProvider = serviceUrlProvider; + } + + @Override + protected StorageState store() { + if (!existsChecker.test(elementDescription)) { + storeOperation.accept(elementDescription); + return StorageState.STORED; + } + return StorageState.ALREADY_STORED; + } + + @Override + protected void update() { + updateOperation.accept(elementDescription); + } + + @Override + protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { + if (elementDescription.isIncludesAssets()) { + AssetManager.storeAsset(serviceUrlProvider, elementDescription.getAppId()); + } + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java index 32f5122f45..d860eed539 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java @@ -19,79 +19,124 @@ package org.apache.streampipes.manager.verification.extractor; import org.apache.streampipes.commons.exceptions.SepaParseException; -import org.apache.streampipes.manager.verification.AdapterVerifier; -import org.apache.streampipes.manager.verification.DataProcessorVerifier; -import org.apache.streampipes.manager.verification.DataSinkVerifier; -import org.apache.streampipes.manager.verification.DataStreamVerifier; import org.apache.streampipes.manager.verification.ElementVerifier; +import org.apache.streampipes.manager.verification.TypedElementVerifier; import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.graph.DataSinkDescription; import org.apache.streampipes.serializers.json.JacksonSerializer; +import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage; +import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.logging.Logger; +import java.util.function.Consumer; +import java.util.function.Predicate; public class TypeExtractor { - private static final Logger logger = Logger.getAnonymousLogger(); + private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); + private static final String CLASS_FIELD = "@class"; private final String extensionElementDescription; + private final IPipelineElementDescriptionStorage storageApi; public TypeExtractor(String extensionElementDescription) { - this.extensionElementDescription = extensionElementDescription; + this(extensionElementDescription, defaultStorageApi()); + } + public TypeExtractor( + String extensionElementDescription, + IPipelineElementDescriptionStorage storageApi + ) { + this.extensionElementDescription = extensionElementDescription; + this.storageApi = storageApi; } public ElementVerifier<?> getTypeVerifier() throws SepaParseException { + var jsonClassName = getClassName(); + LOG.info("Detected type {}", jsonClassName); + return getTypeDef(jsonClassName); + } + + private String getClassName() throws SepaParseException { try { ObjectNode jsonNode = - JacksonSerializer.getObjectMapper().readValue(this.extensionElementDescription, ObjectNode.class); - String jsonClassName = jsonNode.get("@class").asText(); - return getTypeDef(jsonClassName); + JacksonSerializer.getObjectMapper().readValue(extensionElementDescription, ObjectNode.class); + JsonNode classNode = jsonNode.get(CLASS_FIELD); + if (classNode == null || classNode.isNull()) { + throw new SepaParseException(); + } + return classNode.asText(); } catch (JsonProcessingException e) { throw new SepaParseException(); } } private ElementVerifier<?> getTypeDef(String jsonClassName) throws SepaParseException { - if (jsonClassName == null) { - throw new SepaParseException(); - } else { - if (jsonClassName.equals(ep())) { - logger.info("Detected type data stream"); - return new DataStreamVerifier(extensionElementDescription); - } else if (jsonClassName.equals(epa())) { - logger.info("Detected type data processor"); - return new DataProcessorVerifier(extensionElementDescription); - } else if (jsonClassName.equals(ec())) { - logger.info("Detected type data sink"); - return new DataSinkVerifier(extensionElementDescription); - } else if (jsonClassName.equals(adapter())) { - return new AdapterVerifier(extensionElementDescription); - } else { - throw new SepaParseException(); - } - } - } - - private static String ep() { - return SpDataStream.class.getCanonicalName(); - } - - private static String epa() { - return DataProcessorDescription.class.getCanonicalName(); + return switch (jsonClassName) { + case "org.apache.streampipes.model.SpDataStream" -> createVerifier( + SpDataStream.class, + storageApi::exists, + storageApi::storeDataStream, + storageApi::update, + SpServiceUrlProvider.DATA_STREAM + ); + case "org.apache.streampipes.model.graph.DataProcessorDescription" -> createVerifier( + DataProcessorDescription.class, + storageApi::exists, + storageApi::storeDataProcessor, + storageApi::update, + SpServiceUrlProvider.DATA_PROCESSOR + ); + case "org.apache.streampipes.model.graph.DataSinkDescription" -> createVerifier( + DataSinkDescription.class, + storageApi::exists, + storageApi::storeDataSink, + storageApi::update, + SpServiceUrlProvider.DATA_SINK + ); + case "org.apache.streampipes.model.connect.adapter.AdapterDescription" -> createVerifier( + AdapterDescription.class, + storageApi::exists, + storageApi::storeAdapterDescription, + storageApi::update, + SpServiceUrlProvider.ADAPTER + ); + default -> throw new SepaParseException(); + }; } - private static String ec() { - return DataSinkDescription.class.getCanonicalName(); + private <T extends NamedStreamPipesEntity> ElementVerifier<T> createVerifier( + Class<T> elementClass, + Predicate<T> existsChecker, + Consumer<T> storeOperation, + Consumer<T> updateOperation, + SpServiceUrlProvider serviceUrlProvider + ) { + return new TypedElementVerifier<>( + extensionElementDescription, + elementClass, + storageApi, + existsChecker, + storeOperation, + updateOperation, + serviceUrlProvider + ); } - private static String adapter() { - return AdapterDescription.class.getCanonicalName(); + private static IPipelineElementDescriptionStorage defaultStorageApi() { + return StorageDispatcher + .INSTANCE + .getNoSqlStore() + .getPipelineElementDescriptionStorage(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationError.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationError.java deleted file mode 100644 index 107dc7350c..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationError.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification.messages; - -import org.apache.streampipes.model.message.NotificationType; - -public class VerificationError extends VerificationResult { - - public VerificationError(NotificationType type) { - super(type); - } - -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationResult.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationResult.java deleted file mode 100644 index 12042d553a..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationResult.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification.messages; - -import org.apache.streampipes.model.message.Notification; -import org.apache.streampipes.model.message.NotificationType; - -public abstract class VerificationResult { - - private NotificationType type; - - public VerificationResult(NotificationType type) { - this.type = type; - } - - public Notification getNotification() { - return new Notification(type.title(), type.description()); - } - -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationWarning.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationWarning.java deleted file mode 100644 index 0676e73cbb..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationWarning.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification.messages; - -import org.apache.streampipes.model.message.NotificationType; - -public class VerificationWarning extends VerificationResult { - - public VerificationWarning(NotificationType type) { - super(type); - } - -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/AbstractVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/AbstractVerifier.java deleted file mode 100644 index 41732e4bdf..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/AbstractVerifier.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification.structure; - -import org.apache.streampipes.manager.verification.messages.VerificationError; -import org.apache.streampipes.manager.verification.messages.VerificationResult; -import org.apache.streampipes.manager.verification.messages.VerificationWarning; -import org.apache.streampipes.model.message.NotificationType; - -import java.util.ArrayList; -import java.util.List; - -public abstract class AbstractVerifier implements Verifier { - - protected List<VerificationResult> validationResults; - - public AbstractVerifier() { - this.validationResults = new ArrayList<>(); - } - - protected void addWarning(NotificationType notificationType) { - validationResults.add(new VerificationWarning(notificationType)); - } - - protected void addError(NotificationType notificationType) { - validationResults.add(new VerificationError(notificationType)); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/GeneralVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/GeneralVerifier.java deleted file mode 100644 index 3aff61a253..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/GeneralVerifier.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification.structure; - -import org.apache.streampipes.manager.verification.messages.VerificationResult; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.message.NotificationType; - -import java.util.List; - -public class GeneralVerifier<T extends NamedStreamPipesEntity> extends AbstractVerifier { - - private final T description; - - public GeneralVerifier(T description) { - this.description = description; - } - - @Override - public List<VerificationResult> validate() { - if (!description.isIncludesAssets() || !description.getIncludedAssets().contains(ExtensionAssetType.ICON)) { - addWarning(NotificationType.WARNING_NO_ICON); - } - if (description.getName() == null) { - addWarning(NotificationType.WARNING_NO_NAME); - } - - return validationResults; - } - -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/Verifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/Verifier.java deleted file mode 100644 index f7b649812b..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/Verifier.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.verification.structure; - -import org.apache.streampipes.manager.verification.messages.VerificationResult; - -import java.util.List; - -public interface Verifier { - - List<VerificationResult> validate(); -} diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/verification/TestTypeExtractor.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/verification/TestTypeExtractor.java new file mode 100644 index 0000000000..722d3d38f1 --- /dev/null +++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/verification/TestTypeExtractor.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.verification; + +import org.apache.streampipes.commons.exceptions.SepaParseException; +import org.apache.streampipes.manager.verification.extractor.TypeExtractor; +import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.graph.DataProcessorDescription; +import org.apache.streampipes.model.graph.DataSinkDescription; +import org.apache.streampipes.model.message.NotificationType; +import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class TestTypeExtractor { + + private IPipelineElementDescriptionStorage storageApi; + + @BeforeEach + public void setUp() { + storageApi = mock(IPipelineElementDescriptionStorage.class); + } + + @Test + public void verifyAndUpdateDataStream() throws SepaParseException { + var message = new TypeExtractor(payload(SpDataStream.class), storageApi) + .getTypeVerifier() + .verifyAndUpdate(); + + assertTrue(message.isSuccess()); + verify(storageApi).update(any(SpDataStream.class)); + } + + @Test + public void verifyAndUpdateDataProcessor() throws SepaParseException { + var message = new TypeExtractor(payload(DataProcessorDescription.class), storageApi) + .getTypeVerifier() + .verifyAndUpdate(); + + assertTrue(message.isSuccess()); + verify(storageApi).update(any(DataProcessorDescription.class)); + } + + @Test + public void verifyAndUpdateDataSink() throws SepaParseException { + var message = new TypeExtractor(payload(DataSinkDescription.class), storageApi) + .getTypeVerifier() + .verifyAndUpdate(); + + assertTrue(message.isSuccess()); + verify(storageApi).update(any(DataSinkDescription.class)); + } + + @Test + public void verifyAndUpdateAdapter() throws SepaParseException { + var message = new TypeExtractor(payload(AdapterDescription.class), storageApi) + .getTypeVerifier() + .verifyAndUpdate(); + + assertTrue(message.isSuccess()); + verify(storageApi).update(any(AdapterDescription.class)); + } + + @Test + public void verifyAndUpdateWithoutNameOrIcon_onlyContainsStorageSuccessNotification() throws SepaParseException { + var message = new TypeExtractor(payload(DataProcessorDescription.class), storageApi) + .getTypeVerifier() + .verifyAndUpdate(); + + assertEquals(1, message.getNotifications().size()); + assertEquals(NotificationType.STORAGE_SUCCESS.title(), message.getNotifications().get(0).getTitle()); + } + + @Test + public void missingClassProperty_throwsSepaParseException() { + assertThrows( + SepaParseException.class, + () -> new TypeExtractor("{\"name\":\"test\"}", storageApi).getTypeVerifier() + ); + } + + private String payload(Class<?> clazz) { + return "{" + + "\"@class\":\"" + clazz.getCanonicalName() + "\"," + + "\"appId\":\"test-app\"," + + "\"elementId\":\"test-element\"" + + "}"; + } +}
