This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch simplify-element-verification
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 257cb6b0ba2a56e012e231460e8c48ead56d9dc4
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Mar 5 08:25:17 2026 +0100

    chore: Simplify code to verify pipeline elements
---
 .../management/AdapterMasterManagement.java        |  20 +--
 .../manager/verification/AdapterVerifier.java      |  65 ----------
 .../verification/DataProcessorVerifier.java        |  66 ----------
 .../manager/verification/DataSinkVerifier.java     |  66 ----------
 .../manager/verification/DataStreamVerifier.java   |  66 ----------
 .../manager/verification/ElementVerifier.java      | 139 ++++++++-------------
 .../manager/verification/TypedElementVerifier.java |  89 +++++++++++++
 .../verification/extractor/TypeExtractor.java      | 125 ++++++++++++------
 .../verification/messages/VerificationError.java   |  29 -----
 .../verification/messages/VerificationResult.java  |  36 ------
 .../verification/messages/VerificationWarning.java |  29 -----
 .../verification/structure/AbstractVerifier.java   |  44 -------
 .../verification/structure/GeneralVerifier.java    |  48 -------
 .../manager/verification/structure/Verifier.java   |  28 -----
 .../manager/verification/TestTypeExtractor.java    | 114 +++++++++++++++++
 15 files changed, 352 insertions(+), 612 deletions(-)

diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index 4aa3625f0d..925f2447ef 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -18,20 +18,20 @@
 package org.apache.streampipes.connect.management.management;
 
 import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
 import org.apache.streampipes.connect.management.util.GroundingUtils;
 import org.apache.streampipes.loadbalance.LoadManager;
 import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
 import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
-import org.apache.streampipes.manager.verification.DataStreamVerifier;
+import org.apache.streampipes.manager.verification.TypedElementVerifier;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.util.ElementIdGenerator;
 import org.apache.streampipes.resource.management.AdapterResourceManager;
 import org.apache.streampipes.resource.management.DataStreamResourceManager;
 import org.apache.streampipes.storage.api.connect.IAdapterStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import org.slf4j.Logger;
@@ -199,11 +199,15 @@ public class AdapterMasterManagement {
   }
 
   private void installDataSource(SpDataStream stream, String principalSid) throws AdapterException {
-    try {
-      new DataStreamVerifier(stream).verifyAndAdd(principalSid, false);
-    } catch (SepaParseException e) {
-      LOG.error("Error while installing data source: {}", stream.getElementId(), e);
-      throw new AdapterException();
-    }
+    var storageApi = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineElementDescriptionStorage();
+    var verifier = new TypedElementVerifier<>(
+        stream,
+        storageApi,
+        storageApi::exists,
+        storageApi::storeDataStream,
+        storageApi::update,
+        SpServiceUrlProvider.DATA_STREAM
+    );
+    verifier.verifyAndAdd(principalSid, false);
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/AdapterVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/AdapterVerifier.java
deleted file mode 100644
index e7545675d7..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/AdapterVerifier.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification;
-
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.assets.AssetManager;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import java.io.IOException;
-
-public class AdapterVerifier extends ElementVerifier<AdapterDescription> {
-
-  public AdapterVerifier(String graphData) throws SepaParseException {
-    super(graphData, AdapterDescription.class);
-  }
-
-  @Override
-  protected StorageState store() {
-    var storageState = StorageState.STORED;
-    if (!storageApi.exists(elementDescription)) {
-      storageApi.storeAdapterDescription(elementDescription);
-    } else {
-      storageState = StorageState.ALREADY_STORED;
-    }
-    return storageState;
-  }
-
-  @Override
-  protected void update() {
-    storageApi.update(elementDescription);
-  }
-
-  @Override
-  protected void storeAssets() throws IOException, 
NoServiceEndpointsAvailableException {
-    if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(SpServiceUrlProvider.ADAPTER, 
elementDescription.getAppId());
-    }
-  }
-
-  @Override
-  protected void updateAssets() throws IOException, 
NoServiceEndpointsAvailableException {
-    if (elementDescription.isIncludesAssets()) {
-      AssetManager.deleteAsset(elementDescription.getAppId());
-      storeAssets();
-    }
-  }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
deleted file mode 100644
index c5a86d8ed7..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification;
-
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.assets.AssetManager;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import java.io.IOException;
-
-public class DataProcessorVerifier extends 
ElementVerifier<DataProcessorDescription> {
-
-  public DataProcessorVerifier(String graphData)
-      throws SepaParseException {
-    super(graphData, DataProcessorDescription.class);
-    // TODO Auto-generated constructor stub
-  }
-
-  @Override
-  protected void collectValidators() {
-    super.collectValidators();
-  }
-
-  @Override
-  protected StorageState store() {
-    StorageState storageState = StorageState.STORED;
-
-    if (!storageApi.exists(elementDescription)) {
-      storageApi.storeDataProcessor(elementDescription);
-    } else {
-      storageState = StorageState.ALREADY_STORED;
-    }
-    return storageState;
-  }
-
-  @Override
-  protected void update() {
-    storageApi.update(elementDescription);
-  }
-
-  @Override
-  protected void storeAssets() throws IOException, 
NoServiceEndpointsAvailableException {
-    if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(SpServiceUrlProvider.DATA_PROCESSOR, 
elementDescription.getAppId());
-    }
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
deleted file mode 100644
index 6bbd41e29e..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification;
-
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.assets.AssetManager;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import java.io.IOException;
-
-public class DataSinkVerifier extends ElementVerifier<DataSinkDescription> {
-
-
-  public DataSinkVerifier(String graphData)
-      throws SepaParseException {
-    super(graphData, DataSinkDescription.class);
-  }
-
-
-  @Override
-  protected StorageState store() {
-    StorageState storageState = StorageState.STORED;
-    if (!storageApi.exists(elementDescription)) {
-      storageApi.storeDataSink(elementDescription);
-    } else {
-      storageState = StorageState.ALREADY_STORED;
-    }
-    return storageState;
-  }
-
-  @Override
-  protected void collectValidators() {
-    super.collectValidators();
-  }
-
-
-  @Override
-  protected void update() {
-    storageApi.update(elementDescription);
-  }
-
-  @Override
-  protected void storeAssets() throws IOException, 
NoServiceEndpointsAvailableException {
-    if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(SpServiceUrlProvider.DATA_SINK, 
elementDescription.getAppId());
-    }
-  }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
deleted file mode 100644
index f29c999c14..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification;
-
-import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.manager.assets.AssetManager;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-
-import java.io.IOException;
-
-public class DataStreamVerifier extends ElementVerifier<SpDataStream> {
-
-  public DataStreamVerifier(String graphData) {
-    super(graphData, SpDataStream.class);
-  }
-
-  public DataStreamVerifier(SpDataStream stream) {
-    super(stream);
-  }
-
-  @Override
-  protected void collectValidators() {
-    super.collectValidators();
-  }
-
-  @Override
-  protected StorageState store() {
-    StorageState storageState = StorageState.STORED;
-
-    if (!storageApi.exists(elementDescription)) {
-      storageApi.storeDataStream(elementDescription);
-    } else {
-      storageState = StorageState.ALREADY_STORED;
-    }
-    return storageState;
-  }
-
-  @Override
-  protected void update() {
-    storageApi.update(elementDescription);
-  }
-
-  @Override
-  protected void storeAssets() throws IOException, 
NoServiceEndpointsAvailableException {
-    if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(SpServiceUrlProvider.DATA_STREAM, 
elementDescription.getAppId());
-    }
-  }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
index a7215b938f..4b080f8b67 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
@@ -19,12 +19,7 @@
 package org.apache.streampipes.manager.verification;
 
 import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.manager.assets.AssetManager;
-import org.apache.streampipes.manager.verification.messages.VerificationError;
-import org.apache.streampipes.manager.verification.messages.VerificationResult;
-import org.apache.streampipes.manager.verification.structure.GeneralVerifier;
-import org.apache.streampipes.manager.verification.structure.Verifier;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.client.user.Permission;
 import org.apache.streampipes.model.client.user.PermissionBuilder;
@@ -36,9 +31,10 @@ import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.resource.management.SpResourceManager;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,96 +42,73 @@ import java.util.List;
 
 public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
 
-  private String graphData;
-  private Class<T> elementClass;
+  private static final Logger LOG = LoggerFactory.getLogger(ElementVerifier.class);
+
+  private final String graphData;
+  private final Class<T> elementClass;
   private final boolean shouldTransform;
 
   protected T elementDescription;
 
-  protected List<VerificationResult> validationResults;
-  protected List<Verifier> validators;
-
-  protected IPipelineElementDescriptionStorage storageApi = StorageDispatcher
-      .INSTANCE
-      .getNoSqlStore()
-      .getPipelineElementDescriptionStorage();
+  protected final IPipelineElementDescriptionStorage storageApi;
 
-  public ElementVerifier(String graphData, Class<T> elementClass) {
+  public ElementVerifier(
+      String graphData,
+      Class<T> elementClass,
+      IPipelineElementDescriptionStorage storageApi
+  ) {
     this.elementClass = elementClass;
     this.graphData = graphData;
+    this.storageApi = storageApi;
     this.shouldTransform = true;
-    this.validators = new ArrayList<>();
-    this.validationResults = new ArrayList<>();
   }
 
-  public ElementVerifier(T elementDescription) {
+  public ElementVerifier(T elementDescription, IPipelineElementDescriptionStorage storageApi) {
     this.elementDescription = elementDescription;
+    this.storageApi = storageApi;
+    this.graphData = null;
+    this.elementClass = null;
     this.shouldTransform = false;
-    this.validators = new ArrayList<>();
-    this.validationResults = new ArrayList<>();
-  }
-
-  protected void collectValidators() {
-    validators.add(new GeneralVerifier<>(elementDescription));
   }
 
   protected abstract StorageState store();
 
   protected abstract void update();
 
-  protected void verify() {
-    collectValidators();
-    validators.forEach(validator -> validationResults.addAll(validator.validate()));
-  }
+  public Message verifyAndAdd(String principalSid, boolean publicElement) {
+    var transformError = transformEntity();
+    if (transformError != null) {
+      return transformError;
+    }
 
-  public Message verifyAndAdd(String principalSid, boolean publicElement) throws SepaParseException {
-    if (shouldTransform) {
+    StorageState state = store();
+    if (state == StorageState.STORED) {
+      createAndStorePermission(principalSid, publicElement);
       try {
-        this.elementDescription = transform();
-      } catch (IOException e) {
-        return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification());
-      }
-    }
-    verify();
-    if (isVerifiedSuccessfully()) {
-      StorageState state = store();
-      if (state == StorageState.STORED) {
-        createAndStorePermission(principalSid, publicElement);
-        try {
-          storeAssets();
-        } catch (IOException | NoServiceEndpointsAvailableException e) {
-          e.printStackTrace();
-        }
-        return successMessage();
-      } else if (state == StorageState.ALREADY_STORED) {
-        return addedToUserSuccessMessage();
-      } else {
-        return skippedSuccessMessage();
+        storeAssets();
+      } catch (IOException | NoServiceEndpointsAvailableException e) {
+        LOG.error("Could not store assets for app id '{}'", elementDescription.getAppId(), e);
       }
+      return successMessage();
     } else {
-      return errorMessage();
+      return addedToUserSuccessMessage();
     }
 
   }
 
   public Message verifyAndUpdate() {
-    try {
-      this.elementDescription = transform();
-    } catch (JsonProcessingException e) {
-      return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification());
+    var transformError = transformEntity();
+    if (transformError != null) {
+      return transformError;
     }
-    verify();
-    if (isVerifiedSuccessfully()) {
-      update();
-      try {
-        updateAssets();
-      } catch (IOException | NoServiceEndpointsAvailableException e) {
-        e.printStackTrace();
-      }
-      return successMessage();
-    } else {
-      return errorMessage();
+
+    update();
+    try {
+      updateAssets();
+    } catch (IOException | NoServiceEndpointsAvailableException e) {
+      LOG.error("Could not update assets for app id '{}'", elementDescription.getAppId(), e);
     }
+    return successMessage();
 
   }
 
@@ -148,37 +121,29 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
     }
   }
 
-  private Message errorMessage() {
-    return new ErrorMessage(elementDescription.getName(), collectNotifications());
-  }
-
   private Message successMessage() {
-    List<Notification> notifications = collectNotifications();
+    List<Notification> notifications = new ArrayList<>();
     notifications.add(NotificationType.STORAGE_SUCCESS.uiNotification());
     return new SuccessMessage(elementDescription.getName(), notifications);
   }
 
-  private Message skippedSuccessMessage() {
-    List<Notification> notifications = collectNotifications();
-    notifications.add(new Notification("Already exists", "This element is already in your list of elements, skipped."));
-    return new SuccessMessage(elementDescription.getName(), notifications);
-  }
-
   private Message addedToUserSuccessMessage() {
-    List<Notification> notifications = collectNotifications();
+    List<Notification> notifications = new ArrayList<>();
     notifications.add(new Notification("Already stored", "Element description already stored, added element to user"));
     return new SuccessMessage(elementDescription.getName(), notifications);
   }
 
-  private List<Notification> collectNotifications() {
-    List<Notification> notifications = new ArrayList<>();
-    validationResults.forEach(vr -> notifications.add(vr.getNotification()));
-    return notifications;
-  }
+  private Message transformEntity() {
+    if (!shouldTransform) {
+      return null;
+    }
 
-  private boolean isVerifiedSuccessfully() {
-    return validationResults.stream()
-                            .noneMatch(validator -> (validator instanceof VerificationError));
+    try {
+      this.elementDescription = transform();
+      return null;
+    } catch (IOException e) {
+      return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification());
+    }
   }
 
   protected T transform() throws JsonProcessingException {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/TypedElementVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/TypedElementVerifier.java
new file mode 100644
index 0000000000..ca200410e1
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/TypedElementVerifier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.verification;
+
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.assets.AssetManager;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+public class TypedElementVerifier<T extends NamedStreamPipesEntity> extends ElementVerifier<T> {
+
+  private final Predicate<T> existsChecker;
+  private final Consumer<T> storeOperation;
+  private final Consumer<T> updateOperation;
+  private final SpServiceUrlProvider serviceUrlProvider;
+
+  public TypedElementVerifier(
+      String graphData,
+      Class<T> elementClass,
+      IPipelineElementDescriptionStorage storageApi,
+      Predicate<T> existsChecker,
+      Consumer<T> storeOperation,
+      Consumer<T> updateOperation,
+      SpServiceUrlProvider serviceUrlProvider
+  ) {
+    super(graphData, elementClass, storageApi);
+    this.existsChecker = existsChecker;
+    this.storeOperation = storeOperation;
+    this.updateOperation = updateOperation;
+    this.serviceUrlProvider = serviceUrlProvider;
+  }
+
+  public TypedElementVerifier(
+      T elementDescription,
+      IPipelineElementDescriptionStorage storageApi,
+      Predicate<T> existsChecker,
+      Consumer<T> storeOperation,
+      Consumer<T> updateOperation,
+      SpServiceUrlProvider serviceUrlProvider
+  ) {
+    super(elementDescription, storageApi);
+    this.existsChecker = existsChecker;
+    this.storeOperation = storeOperation;
+    this.updateOperation = updateOperation;
+    this.serviceUrlProvider = serviceUrlProvider;
+  }
+
+  @Override
+  protected StorageState store() {
+    if (!existsChecker.test(elementDescription)) {
+      storeOperation.accept(elementDescription);
+      return StorageState.STORED;
+    }
+    return StorageState.ALREADY_STORED;
+  }
+
+  @Override
+  protected void update() {
+    updateOperation.accept(elementDescription);
+  }
+
+  @Override
+  protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException {
+    if (elementDescription.isIncludesAssets()) {
+      AssetManager.storeAsset(serviceUrlProvider, elementDescription.getAppId());
+    }
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
index 32f5122f45..d860eed539 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
@@ -19,79 +19,124 @@
 package org.apache.streampipes.manager.verification.extractor;
 
 import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.verification.AdapterVerifier;
-import org.apache.streampipes.manager.verification.DataProcessorVerifier;
-import org.apache.streampipes.manager.verification.DataSinkVerifier;
-import org.apache.streampipes.manager.verification.DataStreamVerifier;
 import org.apache.streampipes.manager.verification.ElementVerifier;
+import org.apache.streampipes.manager.verification.TypedElementVerifier;
 import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.logging.Logger;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 public class TypeExtractor {
 
-  private static final Logger logger = Logger.getAnonymousLogger();
+  private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
+  private static final String CLASS_FIELD = "@class";
 
   private final String extensionElementDescription;
+  private final IPipelineElementDescriptionStorage storageApi;
 
   public TypeExtractor(String extensionElementDescription) {
-    this.extensionElementDescription = extensionElementDescription;
+    this(extensionElementDescription, defaultStorageApi());
+  }
 
+  public TypeExtractor(
+      String extensionElementDescription,
+      IPipelineElementDescriptionStorage storageApi
+  ) {
+    this.extensionElementDescription = extensionElementDescription;
+    this.storageApi = storageApi;
   }
 
   public ElementVerifier<?> getTypeVerifier() throws SepaParseException {
+    var jsonClassName = getClassName();
+    LOG.info("Detected type {}", jsonClassName);
+    return getTypeDef(jsonClassName);
+  }
+
+  private String getClassName() throws SepaParseException {
     try {
       ObjectNode jsonNode =
-          JacksonSerializer.getObjectMapper().readValue(this.extensionElementDescription, ObjectNode.class);
-      String jsonClassName = jsonNode.get("@class").asText();
-      return getTypeDef(jsonClassName);
+          JacksonSerializer.getObjectMapper().readValue(extensionElementDescription, ObjectNode.class);
+      JsonNode classNode = jsonNode.get(CLASS_FIELD);
+      if (classNode == null || classNode.isNull()) {
+        throw new SepaParseException();
+      }
+      return classNode.asText();
     } catch (JsonProcessingException e) {
       throw new SepaParseException();
     }
   }
 
   private ElementVerifier<?> getTypeDef(String jsonClassName) throws SepaParseException {
-    if (jsonClassName == null) {
-      throw new SepaParseException();
-    } else {
-      if (jsonClassName.equals(ep())) {
-        logger.info("Detected type data stream");
-        return new DataStreamVerifier(extensionElementDescription);
-      } else if (jsonClassName.equals(epa())) {
-        logger.info("Detected type data processor");
-        return new DataProcessorVerifier(extensionElementDescription);
-      } else if (jsonClassName.equals(ec())) {
-        logger.info("Detected type data sink");
-        return new DataSinkVerifier(extensionElementDescription);
-      } else if (jsonClassName.equals(adapter())) {
-        return new AdapterVerifier(extensionElementDescription);
-      } else {
-        throw new SepaParseException();
-      }
-    }
-  }
-
-  private static String ep() {
-    return SpDataStream.class.getCanonicalName();
-  }
-
-  private static String epa() {
-    return DataProcessorDescription.class.getCanonicalName();
+    return switch (jsonClassName) {
+      case "org.apache.streampipes.model.SpDataStream" -> createVerifier(
+          SpDataStream.class,
+          storageApi::exists,
+          storageApi::storeDataStream,
+          storageApi::update,
+          SpServiceUrlProvider.DATA_STREAM
+      );
+      case "org.apache.streampipes.model.graph.DataProcessorDescription" -> createVerifier(
+          DataProcessorDescription.class,
+          storageApi::exists,
+          storageApi::storeDataProcessor,
+          storageApi::update,
+          SpServiceUrlProvider.DATA_PROCESSOR
+      );
+      case "org.apache.streampipes.model.graph.DataSinkDescription" -> createVerifier(
+          DataSinkDescription.class,
+          storageApi::exists,
+          storageApi::storeDataSink,
+          storageApi::update,
+          SpServiceUrlProvider.DATA_SINK
+      );
+      case "org.apache.streampipes.model.connect.adapter.AdapterDescription" -> createVerifier(
+          AdapterDescription.class,
+          storageApi::exists,
+          storageApi::storeAdapterDescription,
+          storageApi::update,
+          SpServiceUrlProvider.ADAPTER
+      );
+      default -> throw new SepaParseException();
+    };
   }
 
-  private static String ec() {
-    return DataSinkDescription.class.getCanonicalName();
+  private <T extends NamedStreamPipesEntity> ElementVerifier<T> createVerifier(
+      Class<T> elementClass,
+      Predicate<T> existsChecker,
+      Consumer<T> storeOperation,
+      Consumer<T> updateOperation,
+      SpServiceUrlProvider serviceUrlProvider
+  ) {
+    return new TypedElementVerifier<>(
+        extensionElementDescription,
+        elementClass,
+        storageApi,
+        existsChecker,
+        storeOperation,
+        updateOperation,
+        serviceUrlProvider
+    );
   }
 
-  private static String adapter() {
-    return AdapterDescription.class.getCanonicalName();
+  private static IPipelineElementDescriptionStorage defaultStorageApi() {
+    return StorageDispatcher
+        .INSTANCE
+        .getNoSqlStore()
+        .getPipelineElementDescriptionStorage();
   }
 
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationError.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationError.java
deleted file mode 100644
index 107dc7350c..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationError.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification.messages;
-
-import org.apache.streampipes.model.message.NotificationType;
-
-public class VerificationError extends VerificationResult {
-
-  public VerificationError(NotificationType type) {
-    super(type);
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationResult.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationResult.java
deleted file mode 100644
index 12042d553a..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationResult.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification.messages;
-
-import org.apache.streampipes.model.message.Notification;
-import org.apache.streampipes.model.message.NotificationType;
-
-public abstract class VerificationResult {
-
-  private NotificationType type;
-
-  public VerificationResult(NotificationType type) {
-    this.type = type;
-  }
-
-  public Notification getNotification() {
-    return new Notification(type.title(), type.description());
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationWarning.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationWarning.java
deleted file mode 100644
index 0676e73cbb..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/messages/VerificationWarning.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification.messages;
-
-import org.apache.streampipes.model.message.NotificationType;
-
-public class VerificationWarning extends VerificationResult {
-
-  public VerificationWarning(NotificationType type) {
-    super(type);
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/AbstractVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/AbstractVerifier.java
deleted file mode 100644
index 41732e4bdf..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/AbstractVerifier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification.structure;
-
-import org.apache.streampipes.manager.verification.messages.VerificationError;
-import org.apache.streampipes.manager.verification.messages.VerificationResult;
-import org.apache.streampipes.manager.verification.messages.VerificationWarning;
-import org.apache.streampipes.model.message.NotificationType;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class AbstractVerifier implements Verifier {
-
-  protected List<VerificationResult> validationResults;
-
-  public AbstractVerifier() {
-    this.validationResults = new ArrayList<>();
-  }
-
-  protected void addWarning(NotificationType notificationType) {
-    validationResults.add(new VerificationWarning(notificationType));
-  }
-
-  protected void addError(NotificationType notificationType) {
-    validationResults.add(new VerificationError(notificationType));
-  }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/GeneralVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/GeneralVerifier.java
deleted file mode 100644
index 3aff61a253..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/GeneralVerifier.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification.structure;
-
-import org.apache.streampipes.manager.verification.messages.VerificationResult;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.message.NotificationType;
-
-import java.util.List;
-
-public class GeneralVerifier<T extends NamedStreamPipesEntity> extends AbstractVerifier {
-
-  private final T description;
-
-  public GeneralVerifier(T description) {
-    this.description = description;
-  }
-
-  @Override
-  public List<VerificationResult> validate() {
-    if (!description.isIncludesAssets() || !description.getIncludedAssets().contains(ExtensionAssetType.ICON)) {
-      addWarning(NotificationType.WARNING_NO_ICON);
-    }
-    if (description.getName() == null) {
-      addWarning(NotificationType.WARNING_NO_NAME);
-    }
-
-    return validationResults;
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/Verifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/Verifier.java
deleted file mode 100644
index f7b649812b..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/structure/Verifier.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.verification.structure;
-
-import org.apache.streampipes.manager.verification.messages.VerificationResult;
-
-import java.util.List;
-
-public interface Verifier {
-
-  List<VerificationResult> validate();
-}
diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/verification/TestTypeExtractor.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/verification/TestTypeExtractor.java
new file mode 100644
index 0000000000..722d3d38f1
--- /dev/null
+++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/verification/TestTypeExtractor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.verification;
+
+import org.apache.streampipes.commons.exceptions.SepaParseException;
+import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.storage.api.pipeline.IPipelineElementDescriptionStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class TestTypeExtractor {
+
+  private IPipelineElementDescriptionStorage storageApi;
+
+  @BeforeEach
+  public void setUp() {
+    storageApi = mock(IPipelineElementDescriptionStorage.class);
+  }
+
+  @Test
+  public void verifyAndUpdateDataStream() throws SepaParseException {
+    var message = new TypeExtractor(payload(SpDataStream.class), storageApi)
+        .getTypeVerifier()
+        .verifyAndUpdate();
+
+    assertTrue(message.isSuccess());
+    verify(storageApi).update(any(SpDataStream.class));
+  }
+
+  @Test
+  public void verifyAndUpdateDataProcessor() throws SepaParseException {
+    var message = new TypeExtractor(payload(DataProcessorDescription.class), storageApi)
+        .getTypeVerifier()
+        .verifyAndUpdate();
+
+    assertTrue(message.isSuccess());
+    verify(storageApi).update(any(DataProcessorDescription.class));
+  }
+
+  @Test
+  public void verifyAndUpdateDataSink() throws SepaParseException {
+    var message = new TypeExtractor(payload(DataSinkDescription.class), storageApi)
+        .getTypeVerifier()
+        .verifyAndUpdate();
+
+    assertTrue(message.isSuccess());
+    verify(storageApi).update(any(DataSinkDescription.class));
+  }
+
+  @Test
+  public void verifyAndUpdateAdapter() throws SepaParseException {
+    var message = new TypeExtractor(payload(AdapterDescription.class), storageApi)
+        .getTypeVerifier()
+        .verifyAndUpdate();
+
+    assertTrue(message.isSuccess());
+    verify(storageApi).update(any(AdapterDescription.class));
+  }
+
+  @Test
+  public void verifyAndUpdateWithoutNameOrIcon_onlyContainsStorageSuccessNotification() throws SepaParseException {
+    var message = new TypeExtractor(payload(DataProcessorDescription.class), storageApi)
+        .getTypeVerifier()
+        .verifyAndUpdate();
+
+    assertEquals(1, message.getNotifications().size());
+    assertEquals(NotificationType.STORAGE_SUCCESS.title(), message.getNotifications().get(0).getTitle());
+  }
+
+  @Test
+  public void missingClassProperty_throwsSepaParseException() {
+    assertThrows(
+        SepaParseException.class,
+        () -> new TypeExtractor("{\"name\":\"test\"}", storageApi).getTypeVerifier()
+    );
+  }
+
+  private String payload(Class<?> clazz) {
+    return "{"
+        + "\"@class\":\"" + clazz.getCanonicalName() + "\","
+        + "\"appId\":\"test-app\","
+        + "\"elementId\":\"test-element\""
+        + "}";
+  }
+}

Reply via email to