This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3388-adapter-deletion-does-not-clean-up-asset-links
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3388-adapter-deletion-does-not-clean-up-asset-links by this push:
new 179ed94965 fix(#3388): Add new module for asset management
179ed94965 is described below
commit 179ed9496578832dd61460399385dd10e45a1ef1
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Dec 30 10:15:18 2024 +0100
fix(#3388): Add new module for asset management
---
asset-model-management/pom.xml | 87 +++++++++
.../management/AssetModelManagement.java | 137 ++++++++++++++
.../management/AssetModelManagementTest.java | 208 +++++++++++++++++++++
pom.xml | 1 +
.../management/AdapterMasterManagement.java | 66 ++++---
.../management/AdapterUpdateManagement.java | 2 +-
.../streampipes/model/assets/SpAssetModel.java | 13 ++
streampipes-rest/pom.xml | 6 +
.../apache/streampipes/rest/ResetManagement.java | 7 +-
.../rest/impl/AssetManagementResource.java | 54 +++---
.../rest/impl/connect/AdapterResource.java | 11 +-
.../deleteAdapterWithAssetLink.spec.ts | 43 +++++
.../delete-adapter-dialog.component.ts | 12 +-
13 files changed, 580 insertions(+), 67 deletions(-)
diff --git a/asset-model-management/pom.xml b/asset-model-management/pom.xml
new file mode 100644
index 0000000000..44a9f431c7
--- /dev/null
+++ b/asset-model-management/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-parent</artifactId>
+ <version>0.97.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>asset-model-management</artifactId>
+
+ <properties>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+
+ <dependencies>
+
+ <!-- StreamPipes dependencies -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-storage-api</artifactId>
+ <version>0.97.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-storage-management</artifactId>
+ <version>0.97.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- External dependencies -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <propertyExpansion>
+ checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+ </propertyExpansion>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
b/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
new file mode 100644
index 0000000000..5a0b0e72df
--- /dev/null
+++
b/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.assetmodel.management;
+
+import org.apache.streampipes.model.assets.SpAssetModel;
+import org.apache.streampipes.storage.api.IGenericStorage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * This class provides convenience methods to work with asset models
+ */
+public class AssetModelManagement {
+
+ private final IGenericStorage genericStorage;
+ private final ObjectMapper objectMapper;
+
+ public AssetModelManagement(IGenericStorage genericStorage) {
+ this.genericStorage = genericStorage;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /**
+ * Retrieves all asset models from generic storage and converts them to a
list of asset models.
+ *
+ * @return a list of asset models
+ */
+ public List<SpAssetModel> findAll() throws IOException {
+
+ try {
+ return genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)
+ .stream()
+ .map(this::convertMapToAssetModel)
+ .toList();
+ } catch (IOException e) {
+ throw new IOException("Error while fetching all asset models from
generic storage.", e);
+ }
+ }
+
+
+ /**
+ * Retrieves a single asset model by its ID.
+ *
+ * @param assetId the ID of the asset model to retrieve
+ * @return the asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel findOne(String assetId) throws NoSuchElementException,
IOException {
+ var assetModelData = genericStorage.findOne(assetId);
+
+ if (assetModelData == null) {
+ throw new NoSuchElementException("Asset model with ID " + assetId + "
not found.");
+ }
+
+ return this.convertMapToAssetModel(assetModelData);
+ }
+
+ /**
+ * Creates a new asset model.
+ *
+ * @param asset the asset model to create
+ * @return the created asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel create(String asset) throws IOException {
+ var assetModelInstanceInDatabase = genericStorage.create(asset);
+
+ return this.convertMapToAssetModel(assetModelInstanceInDatabase);
+ }
+
+ /**
+ * Updates an existing asset model by serializing it to JSON and delegating to the JSON-based update.
+ * @param assetId the ID of the asset model to update
+ * @param assetModel the updated asset model
+ * @return the updated asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel update(String assetId, SpAssetModel assetModel) throws
IOException {
+ var assetModelAsJson = this.convertAssetModelToJson(assetModel);
+ return update(assetId, assetModelAsJson);
+ }
+
+ /**
+ * Updates an existing asset model.
+ *
+ * @param assetId the ID of the asset model to update
+ * @param assetModelJson the updated asset model as a JSON string
+ * @return the updated asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel update(String assetId, String assetModelJson) throws
IOException {
+ var updatedAssetModelAsMap = genericStorage.update(assetId,
assetModelJson);
+ return this.convertMapToAssetModel(updatedAssetModelAsMap);
+ }
+
+ /**
+ * Deletes an asset model by its ID and revision.
+ *
+ * @param assetId the ID of the asset model to delete
+ * @param rev the revision of the asset model to delete
+ * @throws IOException if an I/O error occurs
+ */
+ public void delete(String assetId, String rev) throws IOException {
+ genericStorage.delete(assetId, rev);
+ }
+
+ private SpAssetModel convertMapToAssetModel(Map<String, Object>
assetModelMap) {
+ return objectMapper.convertValue(assetModelMap, SpAssetModel.class);
+ }
+
+ private String convertAssetModelToJson(SpAssetModel assetModel) throws
JsonProcessingException {
+ return objectMapper.writeValueAsString(assetModel);
+ }
+
+}
diff --git
a/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
b/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
new file mode 100644
index 0000000000..de52be285b
--- /dev/null
+++
b/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.assetmodel.management;
+
+import org.apache.streampipes.model.assets.SpAssetModel;
+import org.apache.streampipes.storage.api.IGenericStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class AssetModelManagementTest {
+
+ private static final String SAMPLE_ASSET_MODEL_ID = "1";
+ private static final String SAMPLE_ASSET_MODEL_NAME = "Asset1";
+ private static final Map<String, Object> SAMPLE_ASSET_MODEL_AS_MAP = Map.of(
+ "_id",
+ SAMPLE_ASSET_MODEL_ID,
+ "assetName",
+ SAMPLE_ASSET_MODEL_NAME
+ );
+ private static final String SAMPLE_ASSET_MODEL_AS_JSON = """
+ {
+ "_id":
"SAMPLE_ASSET_MODEL_ID",
+ "assetName":
"SAMPLE_ASSET_MODEL_NAME"
+ }
+ """;
+
+ private static final String REV = "1";
+
+ private IGenericStorage genericStorage;
+ private AssetModelManagement assetModelManagement;
+
+ @BeforeEach
+ void setUp() {
+ genericStorage = Mockito.mock(IGenericStorage.class);
+ assetModelManagement = new AssetModelManagement(genericStorage);
+ }
+
+ @Test
+ void findAll_ReturnsListOfAssetModels() throws IOException {
+
when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenReturn(List.of(SAMPLE_ASSET_MODEL_AS_MAP));
+
+ var result = assetModelManagement.findAll();
+
+ assertEquals(1, result.size());
+ assertEquals(
+ SAMPLE_ASSET_MODEL_ID,
+ result.get(0)
+ .getId()
+ );
+ assertEquals(
+ SAMPLE_ASSET_MODEL_NAME,
+ result.get(0)
+ .getAssetName()
+ );
+ }
+
+ @Test
+ void findAll_ReturnsEmptyListWhenNoData() throws IOException {
+
when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenReturn(List.of());
+
+ var result = assetModelManagement.findAll();
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void findAll_ThrowsIOException() throws IOException {
+ when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenThrow(new
IOException());
+
+ assertThrows(IOException.class, () -> assetModelManagement.findAll());
+ }
+
+
+ @Test
+ void findOne_ReturnsAssetModel() throws IOException {
+
when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void findOne_ThrowsIOException() throws IOException {
+ when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenThrow(new
IOException());
+
+ assertThrows(IOException.class, () ->
assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID));
+ }
+
+ @Test
+ void findOne_ReturnsNoSuchElementExceptionWhenNotFound() throws IOException {
+ when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenReturn(null);
+
+ assertThrows(NoSuchElementException.class, () ->
assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID));
+ }
+
+ @Test
+ void create_ReturnsCreatedAssetModel() throws IOException {
+
when(genericStorage.create(SAMPLE_ASSET_MODEL_NAME)).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.create(SAMPLE_ASSET_MODEL_NAME);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void create_ThrowsIOException() throws IOException {
+ when(genericStorage.create(SAMPLE_ASSET_MODEL_AS_JSON)).thenThrow(new
IOException());
+
+ assertThrows(IOException.class, () ->
assetModelManagement.create(SAMPLE_ASSET_MODEL_AS_JSON));
+ }
+
+
+ @Test
+ void update_ReturnsUpdatedAssetModel() throws IOException {
+ var assetModelToUpdate = new SpAssetModel();
+ assetModelToUpdate.setId(SAMPLE_ASSET_MODEL_ID);
+ assetModelToUpdate.setAssetName(SAMPLE_ASSET_MODEL_NAME);
+
+ when(genericStorage.update(any(),
any())).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.update(SAMPLE_ASSET_MODEL_ID,
assetModelToUpdate);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void update_GenericStorageThrowsIOException() throws IOException {
+ when(genericStorage.update(any(), any())).thenThrow(new IOException());
+
+ assertThrows(IOException.class, () ->
assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, new SpAssetModel()));
+ }
+
+ @Test
+ void update_ReturnsUpdatedAssetModelFromJson() throws IOException {
+ when(genericStorage.update(any(),
any())).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.update(SAMPLE_ASSET_MODEL_ID,
SAMPLE_ASSET_MODEL_AS_JSON);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void update_ThrowsIOExceptionWhenUpdatingFromJson() throws IOException {
+ when(genericStorage.update(any(), any())).thenThrow(new IOException());
+
+ assertThrows(
+ IOException.class,
+ () -> assetModelManagement.update(SAMPLE_ASSET_MODEL_ID,
SAMPLE_ASSET_MODEL_AS_JSON)
+ );
+ }
+
+ @Test
+ void delete_RemovesAssetModel() throws IOException {
+ doNothing().when(genericStorage)
+ .delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+ assetModelManagement.delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+ verify(genericStorage, times(1)).delete(SAMPLE_ASSET_MODEL_ID, REV);
+ }
+
+ @Test
+ void delete_ThrowsIOException() throws IOException {
+ doThrow(new IOException()).when(genericStorage)
+ .delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+ assertThrows(IOException.class, () ->
assetModelManagement.delete(SAMPLE_ASSET_MODEL_ID, REV));
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 455675231a..b32678f846 100644
--- a/pom.xml
+++ b/pom.xml
@@ -872,6 +872,7 @@
<module>streampipes-wrapper-kafka-streams</module>
<module>streampipes-wrapper-siddhi</module>
<module>streampipes-wrapper-standalone</module>
+ <module>asset-model-management</module>
</modules>
<profiles>
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index 79310b96b1..6b84e60847 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -116,72 +116,86 @@ public class AdapterMasterManagement {
}
/**
- * First the adapter is stopped removed, then the corresponding data source
is deleted
+ * This method deletes the adapter and the related resources including the
data stream, and the asset links in the
+ * asset model
*
* @param elementId The elementId of the adapter instance
- * @throws AdapterException when adapter can not be stopped
*/
- public void deleteAdapter(String elementId) throws AdapterException {
+ public void deleteAdapter(String elementId) {
- // Stop stream adapter
+ var adapterDescription = getAdapterDescription(elementId);
+
+ stopAdapterWithLogging(elementId);
+
+ deleteAdaterFromCouchDbAndFromLoggingService(elementId);
+
+ deleteCorrespondingDataStream(adapterDescription);
+ }
+
+ private void stopAdapterWithLogging(String elementId) {
+ LOG.info("Attempting to stop adapter: {}", elementId);
try {
- stopStreamAdapter(elementId);
+ stopAdapter(elementId);
+ LOG.info("Successfully stopped adapter with id: {}", elementId);
} catch (AdapterException e) {
- LOG.info("Could not stop adapter: " + elementId, e);
+ LOG.error("Failed to stop adapter with id: {}", elementId, e);
}
+ }
- AdapterDescription adapter =
adapterInstanceStorage.getElementById(elementId);
- // Delete adapter
+ private void deleteAdaterFromCouchDbAndFromLoggingService(String elementId) {
adapterResourceManager.delete(elementId);
ExtensionsLogProvider.INSTANCE.remove(elementId);
- LOG.info("Successfully deleted adapter: " + elementId);
+ LOG.info("Successfully deleted adapter in couchdb: {}", elementId);
+ }
- // Delete data stream
-
this.dataStreamResourceManager.delete(adapter.getCorrespondingDataStreamElementId());
- LOG.info("Successfully deleted data stream: " +
adapter.getCorrespondingDataStreamElementId());
+ private void deleteCorrespondingDataStream(AdapterDescription
adapterDescription) {
+ var correspondingDataStreamElementId =
adapterDescription.getCorrespondingDataStreamElementId();
+ dataStreamResourceManager.delete(correspondingDataStreamElementId);
+ LOG.info("Successfully deleted data stream in couchdb: {}",
correspondingDataStreamElementId);
}
public List<AdapterDescription> getAllAdapterInstances() {
return adapterInstanceStorage.findAll();
}
- public void stopStreamAdapter(String elementId) throws AdapterException {
- AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
- WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+ public void stopAdapter(String elementId) throws AdapterException {
+ var adapterDescription = getAdapterDescription(elementId);
+
+
WorkerRestClient.stopStreamAdapter(adapterDescription.getSelectedEndpointUrl(),
adapterDescription);
ExtensionsLogProvider.INSTANCE.reset(elementId);
// remove the adapter from the metrics manager so that
// no metrics for this adapter are exposed anymore
try {
- adapterMetrics.remove(ad.getElementId(), ad.getName());
+ adapterMetrics.remove(adapterDescription.getElementId(),
adapterDescription.getName());
} catch (NoSuchElementException e) {
- LOG.error("Could not remove adapter metrics for adapter {}",
ad.getName());
+ LOG.error("Could not remove adapter metrics for adapter {}",
adapterDescription.getName());
}
}
public void startStreamAdapter(String elementId) throws AdapterException {
- var ad = adapterInstanceStorage.getElementById(elementId);
+ var adapterDescription = getAdapterDescription(elementId);
try {
// Find endpoint to start adapter on
var baseUrl = new
ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
- ad.getAppId(),
+ adapterDescription.getAppId(),
SpServiceUrlProvider.ADAPTER,
- ad.getDeploymentConfiguration()
- .getDesiredServiceTags()
+ adapterDescription.getDeploymentConfiguration()
+ .getDesiredServiceTags()
);
// Update selected endpoint URL of adapter
- ad.setSelectedEndpointUrl(baseUrl);
- adapterInstanceStorage.updateElement(ad);
+ adapterDescription.setSelectedEndpointUrl(baseUrl);
+ adapterInstanceStorage.updateElement(adapterDescription);
// Invoke adapter instance
WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
// register the adapter at the metrics manager so that the
AdapterHealthCheck can send metrics
- adapterMetrics.register(ad.getElementId(), ad.getName());
+ adapterMetrics.register(adapterDescription.getElementId(),
adapterDescription.getName());
LOG.info("Started adapter " + elementId + " on: " + baseUrl);
} catch (NoServiceEndpointsAvailableException e) {
@@ -200,4 +214,8 @@ public class AdapterMasterManagement {
throw new AdapterException();
}
}
+
+ private AdapterDescription getAdapterDescription(String elementId) {
+ return adapterInstanceStorage.getElementById(elementId);
+ }
}
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index b64b661495..6d97b2dcc1 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -65,7 +65,7 @@ public class AdapterUpdateManagement {
boolean shouldRestart = ad.isRunning();
if (ad.isRunning()) {
- this.adapterMasterManagement.stopStreamAdapter(ad.getElementId());
+ this.adapterMasterManagement.stopAdapter(ad.getElementId());
}
// update data source in database
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
index 0140eafdf5..8f493efff8 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
@@ -20,9 +20,11 @@ package org.apache.streampipes.model.assets;
import org.apache.streampipes.commons.constants.GenericDocTypes;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.annotations.SerializedName;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class SpAssetModel extends SpAsset {
public static final String APP_DOC_TYPE =
GenericDocTypes.DOC_ASSET_MANGEMENT;
@@ -30,6 +32,9 @@ public class SpAssetModel extends SpAsset {
@JsonProperty("_id")
private @SerializedName("_id") String id;
+ @JsonProperty("_rev")
+ private @SerializedName("_rev") String rev;
+
private boolean removable;
public SpAssetModel() {
@@ -48,6 +53,14 @@ public class SpAssetModel extends SpAsset {
return removable;
}
+ public String getRev() {
+ return rev;
+ }
+
+ public void setRev(String rev) {
+ this.rev = rev;
+ }
+
public void setRemovable(boolean removable) {
this.removable = removable;
}
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index 43f2d74594..2cacb72d0b 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -29,6 +29,11 @@
<dependencies>
<!-- StreamPipes dependencies -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>asset-model-management</artifactId>
+ <version>0.97.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-commons</artifactId>
@@ -117,6 +122,7 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index 0c88e461b6..44fc94d6ed 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.rest;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
import
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
@@ -116,11 +115,7 @@ public class ResetManagement {
List<AdapterDescription> allAdapters =
adapterMasterManagement.getAllAdapterInstances();
allAdapters.forEach(adapterDescription -> {
- try {
-
adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
- } catch (AdapterException e) {
- logger.error("Failed to delete adapter with id: " +
adapterDescription.getElementId(), e);
- }
+ adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
});
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
index f15531ec27..de1247e2fd 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
@@ -18,10 +18,11 @@
package org.apache.streampipes.rest.impl;
+import org.apache.streampipes.assetmodel.management.AssetModelManagement;
+import org.apache.streampipes.model.assets.SpAssetModel;
import
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.security.AuthConstants;
import org.apache.streampipes.rest.shared.exception.SpMessageException;
-import org.apache.streampipes.storage.api.IGenericStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
@@ -41,7 +42,7 @@ import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
+import java.util.NoSuchElementException;
@RestController
@RequestMapping("/api/v2/assets")
@@ -49,12 +50,18 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
private static final Logger LOG =
LoggerFactory.getLogger(AssetManagementResource.class);
- private static final String APP_DOC_TYPE = "asset-management";
+ private final AssetModelManagement assetModelManagement;
+
+ public AssetManagementResource() {
+ var genericStorage = StorageDispatcher.INSTANCE.getNoSqlStore()
+ .getGenericStorage();
+ assetModelManagement = new AssetModelManagement(genericStorage);
+ }
@GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_READ_ASSETS_PRIVILEGE)
- public List<Map<String, Object>> getAll() throws IOException {
- return getGenericStorage().findAll(APP_DOC_TYPE);
+ public List<SpAssetModel> getAll() throws IOException {
+ return assetModelManagement.findAll();
}
@PostMapping(
@@ -62,10 +69,10 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
consumes = MediaType.APPLICATION_JSON_VALUE
)
@PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
- public ResponseEntity<?> create(@RequestBody String asset) {
+ public ResponseEntity<?> create(@RequestBody String assetModel) {
try {
- Map<String, Object> obj = getGenericStorage().create(asset);
- return ok(obj);
+ var updatedAssetModel = assetModelManagement.create(assetModel);
+ return ok(updatedAssetModel);
} catch (IOException e) {
LOG.error("Could not connect to storage", e);
return fail();
@@ -74,10 +81,13 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
@GetMapping(path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_READ_ASSETS_PRIVILEGE)
- public ResponseEntity<Map<String, Object>> getCategory(@PathVariable("id")
String assetId) {
+ public ResponseEntity<SpAssetModel> getCategory(@PathVariable("id") String
assetId) {
try {
- Map<String, Object> obj = getGenericStorage().findOne(assetId);
- return ok(obj);
+ var assetModel = assetModelManagement.findOne(assetId);
+ return ok(assetModel);
+ } catch (NoSuchElementException e) {
+ LOG.error("Asset model not found", e);
+ throw new SpMessageException(HttpStatus.NOT_FOUND, e);
} catch (IOException e) {
LOG.error("Could not connect to storage", e);
throw new SpMessageException(HttpStatus.INTERNAL_SERVER_ERROR, e);
@@ -89,11 +99,13 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
- public ResponseEntity<Map<String, Object>> update(@PathVariable("id") String
assetId,
- @RequestBody String asset)
{
+ public ResponseEntity<SpAssetModel> update(
+ @PathVariable("id") String assetId,
+ @RequestBody String assetModel
+ ) {
try {
- Map<String, Object> obj = getGenericStorage().update(assetId, asset);
- return ok(obj);
+ var updatedAssetModel = assetModelManagement.update(assetId, assetModel);
+ return ok(updatedAssetModel);
} catch (IOException e) {
LOG.error("Could not connect to storage", e);
throw new SpMessageException(HttpStatus.INTERNAL_SERVER_ERROR, e);
@@ -102,10 +114,12 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
@DeleteMapping(path = "/{id}/{rev}", produces =
MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
- public ResponseEntity<Void> delete(@PathVariable("id") String assetId,
- @PathVariable("rev") String rev) {
+ public ResponseEntity<Void> delete(
+ @PathVariable("id") String assetId,
+ @PathVariable("rev") String rev
+ ) {
try {
- getGenericStorage().delete(assetId, rev);
+ assetModelManagement.delete(assetId, rev);
return ok();
} catch (IOException e) {
LOG.error("Could not connect to storage", e);
@@ -113,8 +127,4 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
}
}
- private IGenericStorage getGenericStorage() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getGenericStorage();
- }
-
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index b0a7b2c8e6..d02b722446 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -188,7 +188,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
@PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId', 'WRITE')")
public ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId) {
try {
- managementService.stopStreamAdapter(elementId);
+ managementService.stopAdapter(elementId);
return ok(Notifications.success("Adapter started"));
} catch (AdapterException e) {
LOG.error("Could not stop adapter with id {}", elementId, e);
@@ -220,13 +220,8 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
.getPipelineStorageAPI();
if (pipelinesUsingAdapter.isEmpty()) {
- try {
- managementService.deleteAdapter(elementId);
- return ok(Notifications.success("Adapter with id: " + elementId + " is
deleted."));
- } catch (AdapterException e) {
- LOG.error("Error while deleting adapter with id {}", elementId, e);
- return ok(Notifications.error(e.getMessage()));
- }
+ managementService.deleteAdapter(elementId);
+ return ok(Notifications.success("Adapter with id: " + elementId + " is deleted."));
} else if (!deleteAssociatedPipelines) {
List<String> namesOfPipelinesUsingAdapter = pipelinesUsingAdapter
.stream()
diff --git a/ui/cypress/tests/assetManagement/deleteAssetLinks/deleteAdapterWithAssetLink.spec.ts b/ui/cypress/tests/assetManagement/deleteAssetLinks/deleteAdapterWithAssetLink.spec.ts
new file mode 100644
index 0000000000..0cf0f2f4f7
--- /dev/null
+++ b/ui/cypress/tests/assetManagement/deleteAssetLinks/deleteAdapterWithAssetLink.spec.ts
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { AssetBtns } from '../../../support/utils/asset/AssetBtns';
+import { AssetUtils } from '../../../support/utils/asset/AssetUtils';
+import { ConnectUtils } from '../../../support/utils/connect/ConnectUtils';
+
+describe('Delete adapter and auto remove asset links of adapter and data stream', () => {
+ beforeEach('Setup Test', () => {
+ cy.initStreamPipesTest();
+ });
+
+ it('Perform Test', () => {
+ const assetName = 'TestAsset';
+
+ AssetUtils.addAssetWithOneAdapter(assetName);
+
+ ConnectUtils.deleteAdapter();
+
+ // Check that asset is still there and asset links of adapter and
+ // data stream are removed
+ AssetUtils.goToAssets();
+ AssetUtils.checkAmountOfAssets(1);
+ AssetUtils.editAsset(assetName);
+ AssetBtns.assetLinksTab().click();
+ AssetUtils.checkAmountOfLinkedResources(0);
+ });
+});
diff --git a/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts b/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts
index 5ed5b4ed62..b9b8d7eab5 100644
--- a/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts
+++ b/ui/src/app/connect/dialog/delete-adapter-dialog/delete-adapter-dialog.component.ts
@@ -41,7 +41,7 @@ export class DeleteAdapterDialogComponent {
constructor(
private dialogRef: DialogRef<DeleteAdapterDialogComponent>,
- private dataMarketplaceService: AdapterService,
+ private adapterService: AdapterService,
) {}
close(refreshAdapters: boolean) {
@@ -53,13 +53,13 @@ export class DeleteAdapterDialogComponent {
this.currentStatus = 'Deleting adapter...';
this.deleteAssociatedPipelines = deleteAssociatedPipelines;
- this.dataMarketplaceService
+ this.adapterService
.deleteAdapter(this.adapter, deleteAssociatedPipelines)
- .subscribe(
- data => {
+ .subscribe({
+ next: () => {
this.close(true);
},
- error => {
+ error: error => {
if (error.status === 409) {
if (deleteAssociatedPipelines) {
this.namesOfPipelinesNotOwnedByUser = error.error;
@@ -70,6 +70,6 @@ export class DeleteAdapterDialogComponent {
this.isInProgress = false;
}
},
- );
+ });
}
}