This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 
3388-adapter-deletion-does-not-clean-up-asset-links
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3388-adapter-deletion-does-not-clean-up-asset-links by this push:
     new 179ed94965 fix(#3388): Add new module for asset management
179ed94965 is described below

commit 179ed9496578832dd61460399385dd10e45a1ef1
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Dec 30 10:15:18 2024 +0100

    fix(#3388): Add new module for asset management
---
 asset-model-management/pom.xml                     |  87 +++++++++
 .../management/AssetModelManagement.java           | 137 ++++++++++++++
 .../management/AssetModelManagementTest.java       | 208 +++++++++++++++++++++
 pom.xml                                            |   1 +
 .../management/AdapterMasterManagement.java        |  66 ++++---
 .../management/AdapterUpdateManagement.java        |   2 +-
 .../streampipes/model/assets/SpAssetModel.java     |  13 ++
 streampipes-rest/pom.xml                           |   6 +
 .../apache/streampipes/rest/ResetManagement.java   |   7 +-
 .../rest/impl/AssetManagementResource.java         |  54 +++---
 .../rest/impl/connect/AdapterResource.java         |  11 +-
 .../deleteAdapterWithAssetLink.spec.ts             |  43 +++++
 .../delete-adapter-dialog.component.ts             |  12 +-
 13 files changed, 580 insertions(+), 67 deletions(-)

diff --git a/asset-model-management/pom.xml b/asset-model-management/pom.xml
new file mode 100644
index 0000000000..44a9f431c7
--- /dev/null
+++ b/asset-model-management/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.streampipes</groupId>
+    <artifactId>streampipes-parent</artifactId>
+    <version>0.97.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>asset-model-management</artifactId>
+
+  <properties>
+    <maven.compiler.source>17</maven.compiler.source>
+    <maven.compiler.target>17</maven.compiler.target>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+
+  <dependencies>
+
+    <!-- StreamPipes dependencies -->
+    <dependency>
+      <groupId>org.apache.streampipes</groupId>
+      <artifactId>streampipes-storage-api</artifactId>
+      <version>0.97.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.streampipes</groupId>
+      <artifactId>streampipes-storage-management</artifactId>
+      <version>0.97.0-SNAPSHOT</version>
+    </dependency>
+
+    <!-- External dependencies -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <propertyExpansion>
+            checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+          </propertyExpansion>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
\ No newline at end of file
diff --git 
a/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
 
b/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
new file mode 100644
index 0000000000..5a0b0e72df
--- /dev/null
+++ 
b/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.assetmodel.management;
+
+import org.apache.streampipes.model.assets.SpAssetModel;
+import org.apache.streampipes.storage.api.IGenericStorage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * This class provides convenience methods to work with asset models
+ */
+public class AssetModelManagement {
+
+  private final IGenericStorage genericStorage;
+  private final ObjectMapper objectMapper;
+
+  public AssetModelManagement(IGenericStorage genericStorage) {
+    this.genericStorage = genericStorage;
+    this.objectMapper = new ObjectMapper();
+  }
+
+  /**
+   * Retrieves all asset models from generic storage and converts them to a 
list of asset models.
+   *
+   * @return a list of asset models
+   */
+  public List<SpAssetModel> findAll() throws IOException {
+
+    try {
+      return genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)
+                           .stream()
+                           .map(this::convertMapToAssetModel)
+                           .toList();
+    } catch (IOException e) {
+      throw new IOException("Error while fetching all asset models from 
generic storage.", e);
+    }
+  }
+
+
+  /**
+   * Retrieves a single asset model by its ID.
+   *
+   * @param assetId the ID of the asset model to retrieve
+   * @return the asset model
+   * @throws IOException if an I/O error occurs
+   */
+  public SpAssetModel findOne(String assetId) throws NoSuchElementException, 
IOException {
+    var assetModelData = genericStorage.findOne(assetId);
+
+    if (assetModelData == null) {
+      throw new NoSuchElementException("Asset model with ID " + assetId + " 
not found.");
+    }
+
+    return this.convertMapToAssetModel(assetModelData);
+  }
+
+  /**
+   * Creates a new asset model.
+   *
+   * @param asset the asset model to create
+   * @return the created asset model
+   * @throws IOException if an I/O error occurs
+   */
+  public SpAssetModel create(String asset) throws IOException {
+    var assetModelInstanceInDatabase = genericStorage.create(asset);
+
+    return this.convertMapToAssetModel(assetModelInstanceInDatabase);
+  }
+
+  /**
+   * Updates an existing asset model; the given model is serialized to JSON before it is stored.
+   * @param assetId    the ID of the asset model to update
+   * @param assetModel the updated asset model
+   * @return the updated asset model
+   * @throws IOException if an I/O error occurs
+   */
+  public SpAssetModel update(String assetId, SpAssetModel assetModel) throws 
IOException {
+    var assetModelAsJson =  this.convertAssetModelToJson(assetModel);
+    return update(assetId, assetModelAsJson);
+  }
+
+  /**
+   * Updates an existing asset model.
+   *
+   * @param assetId    the ID of the asset model to update
+   * @param assetModelJson the updated asset model as a JSON string
+   * @return the updated asset model
+   * @throws IOException if an I/O error occurs
+   */
+  public SpAssetModel update(String assetId, String assetModelJson) throws 
IOException {
+    var updatedAssetModelAsMap = genericStorage.update(assetId, 
assetModelJson);
+    return this.convertMapToAssetModel(updatedAssetModelAsMap);
+  }
+
+  /**
+   * Deletes an asset model by its ID and revision.
+   *
+   * @param assetId the ID of the asset model to delete
+   * @param rev     the revision of the asset model to delete
+   * @throws IOException if an I/O error occurs
+   */
+  public void delete(String assetId, String rev) throws IOException {
+    genericStorage.delete(assetId, rev);
+  }
+
+  private SpAssetModel convertMapToAssetModel(Map<String, Object> 
assetModelMap) {
+    return objectMapper.convertValue(assetModelMap, SpAssetModel.class);
+  }
+
+  private String convertAssetModelToJson(SpAssetModel assetModel) throws 
JsonProcessingException {
+    return objectMapper.writeValueAsString(assetModel);
+  }
+
+}
diff --git 
a/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
 
b/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
new file mode 100644
index 0000000000..de52be285b
--- /dev/null
+++ 
b/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.assetmodel.management;
+
+import org.apache.streampipes.model.assets.SpAssetModel;
+import org.apache.streampipes.storage.api.IGenericStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class AssetModelManagementTest {
+
+  private static final String SAMPLE_ASSET_MODEL_ID = "1";
+  private static final String SAMPLE_ASSET_MODEL_NAME = "Asset1";
+  private static final Map<String, Object> SAMPLE_ASSET_MODEL_AS_MAP = Map.of(
+      "_id",
+      SAMPLE_ASSET_MODEL_ID,
+      "assetName",
+      SAMPLE_ASSET_MODEL_NAME
+  );
+  private static final String SAMPLE_ASSET_MODEL_AS_JSON = """
+                                                           {
+                                                           "_id": 
"SAMPLE_ASSET_MODEL_ID",
+                                                           "assetName": 
"SAMPLE_ASSET_MODEL_NAME"
+                                                           }
+                                                           """;
+
+  private static final String REV = "1";
+
+  private IGenericStorage genericStorage;
+  private AssetModelManagement assetModelManagement;
+
+  @BeforeEach
+  void setUp() {
+    genericStorage = Mockito.mock(IGenericStorage.class);
+    assetModelManagement = new AssetModelManagement(genericStorage);
+  }
+
+  @Test
+  void findAll_ReturnsListOfAssetModels() throws IOException {
+    
when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenReturn(List.of(SAMPLE_ASSET_MODEL_AS_MAP));
+
+    var result = assetModelManagement.findAll();
+
+    assertEquals(1, result.size());
+    assertEquals(
+        SAMPLE_ASSET_MODEL_ID,
+        result.get(0)
+              .getId()
+    );
+    assertEquals(
+        SAMPLE_ASSET_MODEL_NAME,
+        result.get(0)
+              .getAssetName()
+    );
+  }
+
+  @Test
+  void findAll_ReturnsEmptyListWhenNoData() throws IOException {
+    
when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenReturn(List.of());
+
+    var result = assetModelManagement.findAll();
+
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  void findAll_ThrowsIOException() throws IOException {
+    when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenThrow(new 
IOException());
+
+    assertThrows(IOException.class, () -> assetModelManagement.findAll());
+  }
+
+
+  @Test
+  void findOne_ReturnsAssetModel() throws IOException {
+    
when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+    var result = assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID);
+
+    assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+    assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+  }
+
+  @Test
+  void findOne_ThrowsIOException() throws IOException {
+    when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenThrow(new 
IOException());
+
+    assertThrows(IOException.class, () -> 
assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID));
+  }
+
+  @Test
+  void findOne_ReturnsNoSuchElementExceptionWhenNotFound() throws IOException {
+    when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenReturn(null);
+
+    assertThrows(NoSuchElementException.class, () -> 
assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID));
+  }
+
+  @Test
+  void create_ReturnsCreatedAssetModel() throws IOException {
+    
when(genericStorage.create(SAMPLE_ASSET_MODEL_NAME)).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+    var result = assetModelManagement.create(SAMPLE_ASSET_MODEL_NAME);
+
+    assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+    assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+  }
+
+  @Test
+  void create_ThrowsIOException() throws IOException {
+    when(genericStorage.create(SAMPLE_ASSET_MODEL_AS_JSON)).thenThrow(new 
IOException());
+
+    assertThrows(IOException.class, () -> 
assetModelManagement.create(SAMPLE_ASSET_MODEL_AS_JSON));
+  }
+
+
+  @Test
+  void update_ReturnsUpdatedAssetModel() throws IOException {
+    var assetModelToUpdate = new SpAssetModel();
+    assetModelToUpdate.setId(SAMPLE_ASSET_MODEL_ID);
+    assetModelToUpdate.setAssetName(SAMPLE_ASSET_MODEL_NAME);
+
+    when(genericStorage.update(any(), 
any())).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+    var result = assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, 
assetModelToUpdate);
+
+    assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+    assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+  }
+
+  @Test
+  void update_GenericStorageThrowsIOException() throws IOException {
+    when(genericStorage.update(any(), any())).thenThrow(new IOException());
+
+    assertThrows(IOException.class, () -> 
assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, new SpAssetModel()));
+  }
+
+  @Test
+  void update_ReturnsUpdatedAssetModelFromJson() throws IOException {
+    when(genericStorage.update(any(), 
any())).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+    var result = assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, 
SAMPLE_ASSET_MODEL_AS_JSON);
+
+    assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+    assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+  }
+
+  @Test
+  void update_ThrowsIOExceptionWhenUpdatingFromJson() throws IOException {
+    when(genericStorage.update(any(), any())).thenThrow(new IOException());
+
+    assertThrows(
+        IOException.class,
+        () -> assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, 
SAMPLE_ASSET_MODEL_AS_JSON)
+    );
+  }
+
+  @Test
+  void delete_RemovesAssetModel() throws IOException {
+    doNothing().when(genericStorage)
+               .delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+    assetModelManagement.delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+    verify(genericStorage, times(1)).delete(SAMPLE_ASSET_MODEL_ID, REV);
+  }
+
+  @Test
+  void delete_ThrowsIOException() throws IOException {
+    doThrow(new IOException()).when(genericStorage)
+                              .delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+    assertThrows(IOException.class, () -> 
assetModelManagement.delete(SAMPLE_ASSET_MODEL_ID, REV));
+  }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 455675231a..b32678f846 100644
--- a/pom.xml
+++ b/pom.xml
@@ -872,6 +872,7 @@
         <module>streampipes-wrapper-kafka-streams</module>
         <module>streampipes-wrapper-siddhi</module>
         <module>streampipes-wrapper-standalone</module>
+      <module>asset-model-management</module>
     </modules>
 
     <profiles>
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index 79310b96b1..6b84e60847 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -116,72 +116,86 @@ public class AdapterMasterManagement {
   }
 
   /**
-   * First the adapter is stopped removed, then the corresponding data source 
is deleted
+   * This method deletes the adapter and the related resources including the 
data stream, and the asset links in the
+   * asset model
    *
    * @param elementId The elementId of the adapter instance
-   * @throws AdapterException when adapter can not be stopped
    */
-  public void deleteAdapter(String elementId) throws AdapterException {
+  public void deleteAdapter(String elementId) {
 
-    // Stop stream adapter
+    var adapterDescription = getAdapterDescription(elementId);
+
+    stopAdapterWithLogging(elementId);
+
+    deleteAdaterFromCouchDbAndFromLoggingService(elementId);
+
+    deleteCorrespondingDataStream(adapterDescription);
+  }
+
+  private void stopAdapterWithLogging(String elementId) {
+    LOG.info("Attempting to stop adapter: {}", elementId);
     try {
-      stopStreamAdapter(elementId);
+      stopAdapter(elementId);
+      LOG.info("Successfully stopped adapter with id: {}", elementId);
     } catch (AdapterException e) {
-      LOG.info("Could not stop adapter: " + elementId, e);
+      LOG.error("Failed to stop adapter with id: {}", elementId, e);
     }
+  }
 
-    AdapterDescription adapter = 
adapterInstanceStorage.getElementById(elementId);
-    // Delete adapter
+  private void deleteAdaterFromCouchDbAndFromLoggingService(String elementId) {
     adapterResourceManager.delete(elementId);
     ExtensionsLogProvider.INSTANCE.remove(elementId);
-    LOG.info("Successfully deleted adapter: " + elementId);
+    LOG.info("Successfully deleted adapter in couchdb: {}", elementId);
+  }
 
-    // Delete data stream
-    
this.dataStreamResourceManager.delete(adapter.getCorrespondingDataStreamElementId());
-    LOG.info("Successfully deleted data stream: " + 
adapter.getCorrespondingDataStreamElementId());
+  private void deleteCorrespondingDataStream(AdapterDescription 
adapterDescription) {
+    var correspondingDataStreamElementId = 
adapterDescription.getCorrespondingDataStreamElementId();
+    dataStreamResourceManager.delete(correspondingDataStreamElementId);
+    LOG.info("Successfully deleted data stream in couchdb: {}", 
correspondingDataStreamElementId);
   }
 
   public List<AdapterDescription> getAllAdapterInstances() {
     return adapterInstanceStorage.findAll();
   }
 
-  public void stopStreamAdapter(String elementId) throws AdapterException {
-    AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
 
-    WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+  public void stopAdapter(String elementId) throws AdapterException {
+    var adapterDescription = getAdapterDescription(elementId);
+
+    
WorkerRestClient.stopStreamAdapter(adapterDescription.getSelectedEndpointUrl(), 
adapterDescription);
     ExtensionsLogProvider.INSTANCE.reset(elementId);
 
     // remove the adapter from the metrics manager so that
     // no metrics for this adapter are exposed anymore
     try {
-      adapterMetrics.remove(ad.getElementId(), ad.getName());
+      adapterMetrics.remove(adapterDescription.getElementId(), 
adapterDescription.getName());
     } catch (NoSuchElementException e) {
-      LOG.error("Could not remove adapter metrics for adapter {}", 
ad.getName());
+      LOG.error("Could not remove adapter metrics for adapter {}", 
adapterDescription.getName());
     }
   }
 
   public void startStreamAdapter(String elementId) throws AdapterException {
 
-    var ad = adapterInstanceStorage.getElementById(elementId);
+    var adapterDescription = getAdapterDescription(elementId);
 
     try {
       // Find endpoint to start adapter on
       var baseUrl = new 
ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
-          ad.getAppId(),
+          adapterDescription.getAppId(),
           SpServiceUrlProvider.ADAPTER,
-          ad.getDeploymentConfiguration()
-            .getDesiredServiceTags()
+          adapterDescription.getDeploymentConfiguration()
+                            .getDesiredServiceTags()
       );
 
       // Update selected endpoint URL of adapter
-      ad.setSelectedEndpointUrl(baseUrl);
-      adapterInstanceStorage.updateElement(ad);
+      adapterDescription.setSelectedEndpointUrl(baseUrl);
+      adapterInstanceStorage.updateElement(adapterDescription);
 
       // Invoke adapter instance
       WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
 
       // register the adapter at the metrics manager so that the 
AdapterHealthCheck can send metrics
-      adapterMetrics.register(ad.getElementId(), ad.getName());
+      adapterMetrics.register(adapterDescription.getElementId(), 
adapterDescription.getName());
 
       LOG.info("Started adapter " + elementId + " on: " + baseUrl);
     } catch (NoServiceEndpointsAvailableException e) {
@@ -200,4 +214,8 @@ public class AdapterMasterManagement {
       throw new AdapterException();
     }
   }
+
+  private AdapterDescription getAdapterDescription(String elementId) {
+    return adapterInstanceStorage.getElementById(elementId);
+  }
 }
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index b64b661495..6d97b2dcc1 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -65,7 +65,7 @@ public class AdapterUpdateManagement {
     boolean shouldRestart = ad.isRunning();
 
     if (ad.isRunning()) {
-      this.adapterMasterManagement.stopStreamAdapter(ad.getElementId());
+      this.adapterMasterManagement.stopAdapter(ad.getElementId());
     }
 
     // update data source in database
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
index 0140eafdf5..8f493efff8 100644
--- 
a/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
@@ -20,9 +20,11 @@ package org.apache.streampipes.model.assets;
 
 import org.apache.streampipes.commons.constants.GenericDocTypes;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.gson.annotations.SerializedName;
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class SpAssetModel extends SpAsset {
 
   public static final String APP_DOC_TYPE = 
GenericDocTypes.DOC_ASSET_MANGEMENT;
@@ -30,6 +32,9 @@ public class SpAssetModel extends SpAsset {
   @JsonProperty("_id")
   private @SerializedName("_id") String id;
 
+  @JsonProperty("_rev")
+  private @SerializedName("_rev") String rev;
+
   private boolean removable;
 
   public SpAssetModel() {
@@ -48,6 +53,14 @@ public class SpAssetModel extends SpAsset {
     return removable;
   }
 
+  public String getRev() {
+    return rev;
+  }
+
+  public void setRev(String rev) {
+    this.rev = rev;
+  }
+
   public void setRemovable(boolean removable) {
     this.removable = removable;
   }
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index 43f2d74594..2cacb72d0b 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -29,6 +29,11 @@
 
     <dependencies>
         <!-- StreamPipes dependencies -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>asset-model-management</artifactId>
+            <version>0.97.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-commons</artifactId>
@@ -117,6 +122,7 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index 0c88e461b6..44fc94d6ed 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.rest;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
 import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
@@ -116,11 +115,7 @@ public class ResetManagement {
 
     List<AdapterDescription> allAdapters = 
adapterMasterManagement.getAllAdapterInstances();
     allAdapters.forEach(adapterDescription -> {
-      try {
-        
adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
-      } catch (AdapterException e) {
-        logger.error("Failed to delete adapter with id: " + 
adapterDescription.getElementId(), e);
-      }
+      adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
     });
   }
 
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
index f15531ec27..de1247e2fd 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
@@ -18,10 +18,11 @@
 
 package org.apache.streampipes.rest.impl;
 
+import org.apache.streampipes.assetmodel.management.AssetModelManagement;
+import org.apache.streampipes.model.assets.SpAssetModel;
 import 
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
 import org.apache.streampipes.rest.security.AuthConstants;
 import org.apache.streampipes.rest.shared.exception.SpMessageException;
-import org.apache.streampipes.storage.api.IGenericStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import org.slf4j.Logger;
@@ -41,7 +42,7 @@ import org.springframework.web.bind.annotation.RestController;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
+import java.util.NoSuchElementException;
 
 @RestController
 @RequestMapping("/api/v2/assets")
@@ -49,12 +50,18 @@ public class AssetManagementResource extends 
AbstractAuthGuardedRestResource {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AssetManagementResource.class);
 
-  private static final String APP_DOC_TYPE = "asset-management";
+  private final AssetModelManagement assetModelManagement;
+
+  public AssetManagementResource() {
+    var genericStorage = StorageDispatcher.INSTANCE.getNoSqlStore()
+                                                   .getGenericStorage();
+    assetModelManagement = new AssetModelManagement(genericStorage);
+  }
 
   @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
   @PreAuthorize(AuthConstants.HAS_READ_ASSETS_PRIVILEGE)
-  public List<Map<String, Object>> getAll() throws IOException {
-    return getGenericStorage().findAll(APP_DOC_TYPE);
+  public List<SpAssetModel> getAll() throws IOException {
+    return assetModelManagement.findAll();
   }
 
   @PostMapping(
@@ -62,10 +69,10 @@ public class AssetManagementResource extends 
AbstractAuthGuardedRestResource {
       consumes = MediaType.APPLICATION_JSON_VALUE
   )
   @PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
-  public ResponseEntity<?> create(@RequestBody String asset) {
+  public ResponseEntity<?> create(@RequestBody String assetModel) {
     try {
-      Map<String, Object> obj = getGenericStorage().create(asset);
-      return ok(obj);
+      var updatedAssetModel = assetModelManagement.create(assetModel);
+      return ok(updatedAssetModel);
     } catch (IOException e) {
       LOG.error("Could not connect to storage", e);
       return fail();
@@ -74,10 +81,13 @@ public class AssetManagementResource extends 
AbstractAuthGuardedRestResource {
 
   @GetMapping(path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
   @PreAuthorize(AuthConstants.HAS_READ_ASSETS_PRIVILEGE)
-  public ResponseEntity<Map<String, Object>> getCategory(@PathVariable("id") 
String assetId) {
+  public ResponseEntity<SpAssetModel> getCategory(@PathVariable("id") String 
assetId) {
     try {
-      Map<String, Object> obj = getGenericStorage().findOne(assetId);
-      return ok(obj);
+      var assetModel = assetModelManagement.findOne(assetId);
+      return ok(assetModel);
+    } catch (NoSuchElementException e) {
+      LOG.error("Asset model not found", e);
+      throw new SpMessageException(HttpStatus.NOT_FOUND, e);
     } catch (IOException e) {
       LOG.error("Could not connect to storage", e);
       throw new SpMessageException(HttpStatus.INTERNAL_SERVER_ERROR, e);
@@ -89,11 +99,13 @@ public class AssetManagementResource extends AbstractAuthGuardedRestResource {
       produces = MediaType.APPLICATION_JSON_VALUE,
       consumes = MediaType.APPLICATION_JSON_VALUE)
   @PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
-  public ResponseEntity<Map<String, Object>> update(@PathVariable("id") String assetId,
-                                                    @RequestBody String asset) {
+  public ResponseEntity<SpAssetModel> update(
+      @PathVariable("id") String assetId,
+      @RequestBody String assetModel
+  ) {
     try {
-      Map<String, Object> obj = getGenericStorage().update(assetId, asset);
-      return ok(obj);
+      var updatedAssetModel = assetModelManagement.update(assetId, assetModel);
+      return ok(updatedAssetModel);
     } catch (IOException e) {
       LOG.error("Could not connect to storage", e);
       throw new SpMessageException(HttpStatus.INTERNAL_SERVER_ERROR, e);
@@ -102,10 +114,12 @@ public class AssetManagementResource extends AbstractAuthGuardedRestResource {
 
   @DeleteMapping(path = "/{id}/{rev}", produces = MediaType.APPLICATION_JSON_VALUE)
   @PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
-  public ResponseEntity<Void> delete(@PathVariable("id") String assetId,
-                                     @PathVariable("rev") String rev) {
+  public ResponseEntity<Void> delete(
+      @PathVariable("id") String assetId,
+      @PathVariable("rev") String rev
+  ) {
     try {
-      getGenericStorage().delete(assetId, rev);
+      assetModelManagement.delete(assetId, rev);
       return ok();
     } catch (IOException e) {
       LOG.error("Could not connect to storage", e);
@@ -113,8 +127,4 @@ public class AssetManagementResource extends AbstractAuthGuardedRestResource {
     }
   }
 
-  private IGenericStorage getGenericStorage() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getGenericStorage();
-  }
-
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index b0a7b2c8e6..d02b722446 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -188,7 +188,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
   @PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId', 'WRITE')")
   public ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId) {
     try {
-      managementService.stopStreamAdapter(elementId);
+      managementService.stopAdapter(elementId);
       return ok(Notifications.success("Adapter started"));
     } catch (AdapterException e) {
       LOG.error("Could not stop adapter with id {}", elementId, e);
@@ -220,13 +220,8 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
                                                                     
.getPipelineStorageAPI();
 
     if (pipelinesUsingAdapter.isEmpty()) {
-      try {
-        managementService.deleteAdapter(elementId);
-        return ok(Notifications.success("Adapter with id: " + elementId + " is deleted."));
-      } catch (AdapterException e) {
-        LOG.error("Error while deleting adapter with id {}", elementId, e);
-        return ok(Notifications.error(e.getMessage()));
-      }
+      managementService.deleteAdapter(elementId);
+      return ok(Notifications.success("Adapter with id: " + elementId + " is deleted."));
     } else if (!deleteAssociatedPipelines) {
       List<String> namesOfPipelinesUsingAdapter = pipelinesUsingAdapter
           .stream()
diff --git a/ui/cypress/tests/assetManagement/deleteAssetLinks/deleteAdapterWithAssetLink.spec.ts b/ui/cypress/tests/assetManagement/deleteAssetLinks/deleteAdapterWithAssetLink.spec.ts
new file mode 100644
index 0000000000..0cf0f2f4f7
--- /dev/null
+++ b/ui/cypress/tests/assetManagement/deleteAssetLinks/deleteAdapterWithAssetLink.spec.ts
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+import { AssetBtns } from '../../../support/utils/asset/AssetBtns';
+import { AssetUtils } from '../../../support/utils/asset/AssetUtils';
+import { ConnectUtils } from '../../../support/utils/connect/ConnectUtils';
+
+describe('Delete adapter and auto remove asset links of adapter and data stream', () => {
+    beforeEach('Setup Test', () => {
+        cy.initStreamPipesTest();
+    });
+
+    it('Perform Test', () => {
+        const assetName = 'TestAsset';
+
+        AssetUtils.addAssetWithOneAdapter(assetName);
+
+        ConnectUtils.deleteAdapter();
+
+        // Check that asset is still there and asset links of adapter and
+        // data stream are removed
+        AssetUtils.goToAssets();
+        AssetUtils.checkAmountOfAssets(1);
+        AssetUtils.editAsset(assetName);
+        AssetBtns.assetLinksTab().click();
+        AssetUtils.checkAmountOfLinkedResources(0);
+    });
+});
diff --git a/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts b/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts
index 5ed5b4ed62..b9b8d7eab5 100644
--- a/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts
+++ b/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts
@@ -41,7 +41,7 @@ export class DeleteAdapterDialogComponent {
 
     constructor(
         private dialogRef: DialogRef<DeleteAdapterDialogComponent>,
-        private dataMarketplaceService: AdapterService,
+        private adapterService: AdapterService,
     ) {}
 
     close(refreshAdapters: boolean) {
@@ -53,13 +53,13 @@ export class DeleteAdapterDialogComponent {
         this.currentStatus = 'Deleting adapter...';
         this.deleteAssociatedPipelines = deleteAssociatedPipelines;
 
-        this.dataMarketplaceService
+        this.adapterService
             .deleteAdapter(this.adapter, deleteAssociatedPipelines)
-            .subscribe(
-                data => {
+            .subscribe({
+                next: () => {
                     this.close(true);
                 },
-                error => {
+                error: error => {
                     if (error.status === 409) {
                         if (deleteAssociatedPipelines) {
                             this.namesOfPipelinesNotOwnedByUser = error.error;
@@ -70,6 +70,6 @@ export class DeleteAdapterDialogComponent {
                         this.isInProgress = false;
                     }
                 },
-            );
+            });
     }
 }


Reply via email to