This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 30272f2819 3862 deleting resource delete asset link (#3874)
30272f2819 is described below
commit 30272f2819460c8183ae32e12e1e08f43f8bc57b
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Mon Oct 27 17:21:29 2025 +0100
3862 deleting resource delete asset link (#3874)
---
.../manager/pipeline/PipelineManager.java | 29 +++---
.../DataExplorerWidgetResourceManager.java | 2 +-
.../rest/impl/AssetManagementResource.java | 14 +--
.../rest/impl/connect/AdapterResource.java | 107 ++++++++++-----------
.../streampipes/storage/api/CRUDStorage.java | 1 +
.../streampipes/storage/api/IGenericStorage.java | 2 +
.../storage/couchdb/impl/DefaultCrudStorage.java | 13 ++-
.../storage/couchdb/impl/GenericStorageImpl.java | 39 ++++++++
8 files changed, 122 insertions(+), 85 deletions(-)
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
index ca477d73db..e4964621e0 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
@@ -67,7 +67,7 @@ public class PipelineManager {
* @return pipelineId of the stored pipeline
*/
public static String addPipeline(String principalSid,
- Pipeline pipeline) {
+ Pipeline pipeline) {
String pipelineId = Objects.isNull(pipeline.getPipelineId())
? UUIDGenerator.generateUuid()
@@ -81,7 +81,6 @@ public class PipelineManager {
return pipelineId;
}
-
/**
* Starts all processing elements of the pipeline with the pipelineId
*
@@ -94,15 +93,16 @@ public class PipelineManager {
}
/**
- * Stops all processing elements of the pipeline
+ * Stops all processing elements of the pipeline
*
* @param pipelineId of pipeline to be stopped
- * @param forceStop when it is true, the pipeline is stopped, even if not
all processing element
+ * @param forceStop when it is true, the pipeline is stopped, even if not
all
+ * processing element
* containers could be reached
* @return pipeline status of the stop operation
*/
public static PipelineOperationStatus stopPipeline(String pipelineId,
- boolean forceStop) {
+ boolean forceStop) {
Pipeline pipeline = getPipeline(pipelineId);
return new PipelineExecutor(pipeline).stopPipeline(forceStop);
@@ -123,8 +123,7 @@ public class PipelineManager {
public static List<PipelineOperationStatus> stopAllPipelines(boolean
forceStop) {
List<PipelineOperationStatus> status = new ArrayList<>();
- List<Pipeline> pipelines =
-
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
+ List<Pipeline> pipelines =
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
pipelines.forEach(p -> {
if (p.isRunning()) {
@@ -134,7 +133,6 @@ public class PipelineManager {
return status;
}
-
/**
* Checks for the pipelines that contain the processing element
*
@@ -143,9 +141,8 @@ public class PipelineManager {
*/
public static List<Pipeline> getPipelinesContainingElements(String
elementId) {
return PipelineManager.getAllPipelines().stream()
- .filter(pipeline ->
- mergePipelineElement(pipeline)
- .anyMatch(el -> el.getElementId().equals(elementId)))
+ .filter(pipeline -> mergePipelineElement(pipeline)
+ .anyMatch(el -> el.getElementId().equals(elementId)))
.collect(Collectors.toList());
}
@@ -153,15 +150,13 @@ public class PipelineManager {
return Stream.concat(
Stream.concat(
pipeline.getStreams().stream(),
- pipeline.getSepas().stream()
- ),
- pipeline.getActions().stream()
- );
+ pipeline.getSepas().stream()),
+ pipeline.getActions().stream());
}
private static void preparePipelineBasics(String username,
- Pipeline pipeline,
- String pipelineId) {
+ Pipeline pipeline,
+ String pipelineId) {
pipeline.setPipelineId(pipelineId);
pipeline.setRunning(false);
pipeline.setCreatedByUser(username);
diff --git
a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/DataExplorerWidgetResourceManager.java
b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/DataExplorerWidgetResourceManager.java
index 9ec75d026e..645391a11f 100644
---
a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/DataExplorerWidgetResourceManager.java
+++
b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/DataExplorerWidgetResourceManager.java
@@ -26,7 +26,7 @@ public class DataExplorerWidgetResourceManager extends
AbstractCRUDResourceManag
private final DataExplorerResourceManager dashboardManager;
public DataExplorerWidgetResourceManager(DataExplorerResourceManager
dashboardManager,
-
CRUDStorage<DataExplorerWidgetModel> db) {
+ CRUDStorage<DataExplorerWidgetModel> db) {
super(db, DataExplorerWidgetModel.class);
this.dashboardManager = dashboardManager;
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
index f15531ec27..e4599211d3 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
@@ -57,10 +57,7 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
return getGenericStorage().findAll(APP_DOC_TYPE);
}
- @PostMapping(
- produces = MediaType.APPLICATION_JSON_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE
- )
+ @PostMapping(produces = MediaType.APPLICATION_JSON_VALUE, consumes =
MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
public ResponseEntity<?> create(@RequestBody String asset) {
try {
@@ -84,13 +81,10 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
}
}
- @PutMapping(
- path = "/{id}",
- produces = MediaType.APPLICATION_JSON_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE)
+ @PutMapping(path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
public ResponseEntity<Map<String, Object>> update(@PathVariable("id") String
assetId,
- @RequestBody String asset)
{
+ @RequestBody String asset) {
try {
Map<String, Object> obj = getGenericStorage().update(assetId, asset);
return ok(obj);
@@ -103,7 +97,7 @@ public class AssetManagementResource extends
AbstractAuthGuardedRestResource {
@DeleteMapping(path = "/{id}/{rev}", produces =
MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_WRITE_ASSETS_PRIVILEGE)
public ResponseEntity<Void> delete(@PathVariable("id") String assetId,
- @PathVariable("rev") String rev) {
+ @PathVariable("rev") String rev) {
try {
getGenericStorage().delete(assetId, rev);
return ok();
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index cf046735b1..6765b9ce5a 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -72,11 +72,10 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
public AdapterResource() {
super(() -> new AdapterMasterManagement(
StorageDispatcher.INSTANCE.getNoSqlStore()
- .getAdapterInstanceStorage(),
+ .getAdapterInstanceStorage(),
new SpResourceManager().manageAdapters(),
new SpResourceManager().manageDataStreams(),
- AdapterMetricsManager.INSTANCE.getAdapterMetrics()
- ));
+ AdapterMetricsManager.INSTANCE.getAdapterMetrics()));
}
@PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE)
@@ -99,7 +98,7 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
return ok(Notifications.success(adapterId));
}
- @PostMapping(path = "compact", consumes =
{MediaType.APPLICATION_JSON_VALUE}, produces = {
+ @PostMapping(path = "compact", consumes = { MediaType.APPLICATION_JSON_VALUE
}, produces = {
MediaType.APPLICATION_JSON_VALUE,
SpMediaType.YAML,
SpMediaType.YML
@@ -124,36 +123,32 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
return ok(Notifications.success(adapterDescription.getElementId()));
}
- @PutMapping(path = "pipeline-migration-preflight", consumes =
MediaType.APPLICATION_JSON_VALUE,
- produces = MediaType.APPLICATION_JSON_VALUE)
+ @PutMapping(path = "pipeline-migration-preflight", consumes =
MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_WRITE_ADAPTER_PRIVILEGE)
public ResponseEntity<List<PipelineUpdateInfo>>
performPipelineMigrationPreflight(
- @RequestBody AdapterDescription adapterDescription
- ) {
+ @RequestBody AdapterDescription adapterDescription) {
var updateManager = new AdapterUpdateManagement(managementService);
var migrations = updateManager.checkPipelineMigrations(adapterDescription);
return ok(migrations);
}
- @GetMapping(path = "/{id}", produces = {MediaType.APPLICATION_JSON_VALUE,
SpMediaType.YAML, SpMediaType.YML})
+ @GetMapping(path = "/{id}", produces = { MediaType.APPLICATION_JSON_VALUE,
SpMediaType.YAML, SpMediaType.YML })
@PreAuthorize("this.hasReadAuthority()")
public ResponseEntity<?> getAdapter(
@PathVariable("id") String elementId,
- @RequestParam(value = "output",
- defaultValue = "full",
- required = false) String outputMode
- ) {
+ @RequestParam(value = "output", defaultValue = "full", required = false)
String outputMode) {
try {
var adapterDescription = getAdapterDescription(elementId);
- // This check is done here because the adapter permission is checked
based on the corresponding data stream
+ // This check is done here because the adapter permission is checked
based on
+ // the corresponding data stream
// and not based on the element id
if (!checkAdapterReadPermission(adapterDescription)) {
LOG.error("User is not allowed to read adapter {}", elementId);
return ResponseEntity.status(HttpStatus.SC_UNAUTHORIZED)
- .build();
+ .build();
}
if (outputMode.equalsIgnoreCase("compact")) {
@@ -176,18 +171,17 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
private boolean checkAdapterReadPermission(AdapterDescription
adapterDescription) {
var spPermissionEvaluator = new SpPermissionEvaluator();
var authentication = SecurityContextHolder.getContext()
- .getAuthentication();
+ .getAuthentication();
return spPermissionEvaluator.hasPermission(
authentication,
adapterDescription.getCorrespondingDataStreamElementId(),
- "READ"
- );
+ "READ");
}
@PostMapping(path = "/{id}/stop", produces =
MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId',
'WRITE')")
public ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId,
- @RequestParam(value = "forceStop",
defaultValue = "false") boolean forceStop) {
+ @RequestParam(value = "forceStop", defaultValue = "false") boolean
forceStop) {
try {
managementService.stopStreamAdapter(elementId, forceStop);
return ok(Notifications.success("Adapter stopped"));
@@ -213,17 +207,16 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
@PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId',
'WRITE')")
public ResponseEntity<?> deleteAdapter(
@PathVariable("id") String elementId,
- @RequestParam(value = "deleteAssociatedPipelines", defaultValue =
"false")
- boolean deleteAssociatedPipelines
- ) {
+ @RequestParam(value = "deleteAssociatedPipelines", defaultValue =
"false") boolean deleteAssociatedPipelines) {
List<String> pipelinesUsingAdapter = getPipelinesUsingAdapter(elementId);
IPipelineStorage pipelineStorageAPI =
StorageDispatcher.INSTANCE.getNoSqlStore()
-
.getPipelineStorageAPI();
+ .getPipelineStorageAPI();
if (pipelinesUsingAdapter.isEmpty()) {
try {
managementService.deleteAdapter(elementId);
- return ok(Notifications.success("Adapter with id: " + elementId + " is
deleted."));
+
+ return ok(Notifications.success("Adapter with id: " + elementId + " is
deleted."));
} catch (AdapterException e) {
LOG.error("Error while deleting adapter with id {}", elementId, e);
return ok(Notifications.error(e.getMessage()));
@@ -232,40 +225,40 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
List<String> namesOfPipelinesUsingAdapter = pipelinesUsingAdapter
.stream()
.map(pipelineId -> pipelineStorageAPI.getElementById(
- pipelineId)
- .getName())
+ pipelineId)
+ .getName())
.collect(Collectors.toList());
return ResponseEntity.status(HttpStatus.SC_CONFLICT)
- .body(String.join(", ",
namesOfPipelinesUsingAdapter));
+ .body(String.join(", ", namesOfPipelinesUsingAdapter));
} else {
PermissionResourceManager permissionResourceManager = new
PermissionResourceManager();
- // find out the names of pipelines that have an owner and the owner is
not the current user
+ // find out the names of pipelines that have an owner and the owner is
not the
+ // current user
List<String> namesOfPipelinesNotOwnedByUser = pipelinesUsingAdapter
.stream()
- .filter(pipelineId ->
- !permissionResourceManager.findForObjectId(
- pipelineId)
- .stream()
- .findFirst()
- .map(
- Permission::getOwnerSid)
- // if a pipeline has no owner,
pretend the owner
- // is the user so the user can
delete it
- .orElse(
-
this.getAuthenticatedUserSid())
- .equals(
-
this.getAuthenticatedUserSid()))
+ .filter(pipelineId -> !permissionResourceManager.findForObjectId(
+ pipelineId)
+ .stream()
+ .findFirst()
+ .map(
+ Permission::getOwnerSid)
+ // if a pipeline has no owner, pretend the owner
+ // is the user so the user can delete it
+ .orElse(
+ this.getAuthenticatedUserSid())
+ .equals(
+ this.getAuthenticatedUserSid()))
.map(pipelineId -> pipelineStorageAPI.getElementById(
- pipelineId)
- .getName())
+ pipelineId)
+ .getName())
.collect(Collectors.toList());
boolean isAdmin = SecurityContextHolder.getContext()
- .getAuthentication()
- .getAuthorities()
- .stream()
- .anyMatch(r -> r.getAuthority()
- .equals(
-
DefaultRole.ROLE_ADMIN.name()));
+ .getAuthentication()
+ .getAuthorities()
+ .stream()
+ .anyMatch(r -> r.getAuthority()
+ .equals(
+ DefaultRole.ROLE_ADMIN.name()));
// if the user is admin or owns all pipelines using this adapter,
// the user can delete all associated pipelines and this adapter
if (isAdmin || namesOfPipelinesNotOwnedByUser.isEmpty()) {
@@ -275,19 +268,21 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
PipelineManager.deletePipeline(pipelineId);
}
managementService.deleteAdapter(elementId);
+
return ok(Notifications.success("Adapter with id: " + elementId
- + " and all pipelines using the
adapter are deleted."));
+ + " and all pipelines using the adapter are deleted."));
} catch (Exception e) {
LOG.error(
"Error while deleting adapter with id "
- + elementId + " and all pipelines using the adapter", e
- );
+ + elementId + " and all pipelines using the adapter",
+ e);
return ok(Notifications.error(e.getMessage()));
}
} else {
- // otherwise, hint the user the names of pipelines using the adapter
but not owned by the user
+ // otherwise, hint the user the names of pipelines using the adapter
but not
+ // owned by the user
return ResponseEntity.status(HttpStatus.SC_CONFLICT)
- .body(String.join(", ",
namesOfPipelinesNotOwnedByUser));
+ .body(String.join(", ", namesOfPipelinesNotOwnedByUser));
}
}
}
@@ -309,8 +304,8 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
private List<String> getPipelinesUsingAdapter(String adapterId) {
return StorageDispatcher.INSTANCE.getNoSqlStore()
- .getPipelineStorageAPI()
- .getPipelinesUsingAdapter(adapterId);
+ .getPipelineStorageAPI()
+ .getPipelinesUsingAdapter(adapterId);
}
}
diff --git
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/CRUDStorage.java
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/CRUDStorage.java
index ff16df6dc8..96c941fb36 100644
---
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/CRUDStorage.java
+++
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/CRUDStorage.java
@@ -34,6 +34,7 @@ public interface CRUDStorage<T> {
void deleteElement(T element);
default void deleteElementById(String id) {
+
var element = getElementById(id);
if (element != null) {
deleteElement(element);
diff --git
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IGenericStorage.java
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IGenericStorage.java
index d15028669f..494f09d02d 100644
---
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IGenericStorage.java
+++
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IGenericStorage.java
@@ -46,4 +46,6 @@ public interface IGenericStorage {
throws IOException;
GenericStorageAttachment findAttachment(String docId, String attachmentName)
throws IOException;
+
+ void deleteAssetLinkToResource(String id) throws IOException;
}
diff --git
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DefaultCrudStorage.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DefaultCrudStorage.java
index 84c6b80dc8..bb4d06352b 100644
---
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DefaultCrudStorage.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DefaultCrudStorage.java
@@ -20,16 +20,22 @@ package org.apache.streampipes.storage.couchdb.impl;
import org.apache.streampipes.model.shared.api.Storable;
import org.apache.streampipes.storage.api.CRUDStorage;
+import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
import org.lightcouch.CouchDbClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.function.Supplier;
public class DefaultCrudStorage<T extends Storable> extends AbstractDao<T>
implements CRUDStorage<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultCrudStorage.class);
+
public DefaultCrudStorage(Supplier<CouchDbClient> couchDbClientSupplier,
- Class<T> clazz) {
+ Class<T> clazz) {
super(couchDbClientSupplier, clazz);
}
@@ -46,6 +52,11 @@ public class DefaultCrudStorage<T extends Storable> extends
AbstractDao<T> imple
@Override
public void deleteElement(T element) {
+    // Best effort: remove asset links pointing to this element before deleting it.
+    // A failure here is logged (including its cause) and does not prevent the
+    // element itself from being deleted below.
+    try {
+      CouchDbStorageManager.INSTANCE.getGenericStorage().deleteAssetLinkToResource(element.getElementId());
+    } catch (IOException e) {
+      // Pass the exception as last argument so SLF4J records the stack trace.
+      LOG.error("Asset link for {} could not be deleted.", element.getElementId(), e);
+    }
delete(element.getElementId());
}
}
diff --git
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/GenericStorageImpl.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/GenericStorageImpl.java
index 2390f4fa8d..bc22e227de 100644
---
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/GenericStorageImpl.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/GenericStorageImpl.java
@@ -157,6 +157,8 @@ public class GenericStorageImpl implements IGenericStorage {
return new GenericStorageAttachment(content.getType().getMimeType(),
content.asBytes());
}
+
+
private Map<String, Object> queryDocuments(String route) throws IOException {
Request req = Utils.getRequest(route);
Content content = executeAndReturnContent(req);
@@ -175,5 +177,42 @@ public class GenericStorageImpl implements IGenericStorage
{
private String getDatabaseRoute() {
return Utils.getDatabaseRoute(GenericCouchDbConstants.DB_NAME);
+ }
+
+  /**
+   * Removes all asset links that reference the given resource from every document
+   * returned by {@code findAll("asset-management")}, including links held by
+   * nested assets.
+   *
+   * <p>Only documents in which at least one link was actually removed are written
+   * back, so unrelated asset documents are not rewritten on every resource deletion.
+   *
+   * @param id the id of the resource whose asset links should be removed
+   * @throws IOException if one of the underlying storage or serialization calls fails
+   */
+  @Override
+  public void deleteAssetLinkToResource(String id) throws IOException {
+    for (Map<String, Object> asset : this.findAll("asset-management")) {
+      if (deleteAssetLinks(asset, id)) {
+        String docId = (String) asset.get("_id");
+        this.update(docId, mapper.writeValueAsString(asset));
+      }
+    }
+  }
+
+  /**
+   * Recursively removes every entry of {@code assetLinks} whose {@code resourceId}
+   * equals the given id, both from the asset itself and from the entries of its
+   * {@code assets} list. The map is modified in place.
+   *
+   * @param asset              the asset (or nested asset) to clean up
+   * @param resourceIdToDelete the resource id whose links are removed
+   * @return {@code true} if at least one link was removed anywhere in the asset tree
+   */
+  private boolean deleteAssetLinks(Map<String, Object> asset, String resourceIdToDelete) {
+    boolean changed = false;
+
+    // instanceof also rejects a missing key or an explicit null value
+    Object assetLinks = asset.get("assetLinks");
+    if (assetLinks instanceof List) {
+      changed = ((List<?>) assetLinks).removeIf(link -> isLinkToResource(link, resourceIdToDelete));
+    }
+
+    Object nestedAssets = asset.get("assets");
+    if (nestedAssets instanceof List) {
+      for (Object nestedAsset : (List<?>) nestedAssets) {
+        if (nestedAsset instanceof Map) {
+          @SuppressWarnings("unchecked") // nested assets are maps with string keys, like the parent
+          Map<String, Object> nested = (Map<String, Object>) nestedAsset;
+          // non-short-circuit OR: every nested asset must be visited
+          changed |= deleteAssetLinks(nested, resourceIdToDelete);
+        }
+      }
+    }
+    return changed;
+  }
+
+  /**
+   * Checks whether the given link entry is a map whose {@code resourceId} equals
+   * the given id. Entries without a (non-null) {@code resourceId} never match.
+   */
+  private static boolean isLinkToResource(Object link, String resourceId) {
+    if (!(link instanceof Map)) {
+      return false;
+    }
+    Object linkedId = ((Map<?, ?>) link).get("resourceId");
+    return linkedId != null && linkedId.equals(resourceId);
}
}