This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch fix-connect-access-rights
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 4a38ab47cc716190a01f4bd6e7effe86168753ac Author: Dominik Riemer <[email protected]> AuthorDate: Mon Nov 3 13:09:41 2025 +0100 fix: Access rights for adapter and pipeline endpoints --- .../rest/impl/AdapterMonitoringResource.java | 62 ++++++- .../streampipes/rest/impl/PipelineMonitoring.java | 4 +- .../streampipes/rest/impl/PipelineResource.java | 10 +- .../rest/impl/connect/AdapterResource.java | 192 ++++++++++++--------- .../rest/impl/connect/CompactAdapterResource.java | 2 +- 5 files changed, 168 insertions(+), 102 deletions(-) diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AdapterMonitoringResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AdapterMonitoringResource.java index 4424ffe7af..dfd3edddf0 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AdapterMonitoringResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AdapterMonitoringResource.java @@ -21,13 +21,18 @@ package org.apache.streampipes.rest.impl; import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider; import org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.client.user.DefaultPrivilege; -import org.apache.streampipes.model.monitoring.SpLogEntry; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.monitoring.SpMetricsEntry; +import org.apache.streampipes.rest.security.SpPermissionEvaluator; +import org.apache.streampipes.storage.api.IAdapterStorage; +import org.apache.streampipes.storage.management.StorageDispatcher; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.core.context.SecurityContextHolder; import 
org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; @@ -41,38 +46,59 @@ import java.util.Map; @RequestMapping("/api/v2/adapter-monitoring") public class AdapterMonitoringResource extends AbstractMonitoringResource { + private final IAdapterStorage adapterStorage; + + public AdapterMonitoringResource() { + this.adapterStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(); + } + @GetMapping( path = "adapter/{elementId}/logs", produces = MediaType.APPLICATION_JSON_VALUE ) - @PreAuthorize("this.hasReadAuthority() and hasPermission('#elementId', 'READ')") - public ResponseEntity<List<SpLogEntry>> getLogInfoForAdapter( + @PreAuthorize("this.hasReadAuthority()") + public ResponseEntity<?> getLogInfoForAdapter( @PathVariable("elementId") String elementId ) { - return ok(ExtensionsLogProvider.INSTANCE.getLogInfosForResource(elementId)); + var adapterDescription = getAdapter(elementId); + if (checkAdapterPermission(adapterDescription, "READ")) { + return ok(ExtensionsLogProvider.INSTANCE.getLogInfosForResource(elementId)); + } else { + return unauthorized(); + } } @GetMapping( path = "adapter/{elementId}/metrics", produces = MediaType.APPLICATION_JSON_VALUE ) - @PreAuthorize("this.hasReadAuthority() and hasPermission('#elementId', 'READ')") - public ResponseEntity<SpMetricsEntry> getMetricsInfoForAdapter( + @PreAuthorize("this.hasReadAuthority()") + public ResponseEntity<?> getMetricsInfoForAdapter( @PathVariable("elementId") String elementId ) { - return ok(ExtensionsLogProvider.INSTANCE.getMetricInfosForResource(elementId)); + var adapterDescription = getAdapter(elementId); + if (checkAdapterPermission(adapterDescription, "READ")) { + return ok(ExtensionsLogProvider.INSTANCE.getMetricInfosForResource(elementId)); + } else { + return unauthorized(); + } } @GetMapping( path = "metrics", produces = 
MediaType.APPLICATION_JSON_VALUE ) - @PreAuthorize("this.hasReadAuthority() and hasPermission('#elementId', 'READ')") + @PreAuthorize("this.hasReadAuthority()") public ResponseEntity<Map<String, SpMetricsEntry>> getMetricsInfos( @RequestParam(value = "filter") List<String> elementIds ) { new ExtensionsServiceLogExecutor().triggerUpdate(); - return ok(ExtensionsLogProvider.INSTANCE.getMetricsInfoForResources(elementIds)); + var filteredElementIds = elementIds.stream() + .map(adapterStorage::getElementById) + .filter(a -> checkAdapterPermission(a, "READ")) + .map(NamedStreamPipesEntity::getElementId) + .toList(); + return ok(ExtensionsLogProvider.INSTANCE.getMetricsInfoForResources(filteredElementIds)); } /** @@ -88,4 +114,22 @@ public class AdapterMonitoringResource extends AbstractMonitoringResource { public boolean hasWriteAuthority() { return isAdminOrHasAnyAuthority(DefaultPrivilege.Constants.PRIVILEGE_WRITE_ADAPTER_VALUE); } + + public AdapterDescription getAdapter(String elementId) { + return adapterStorage.getElementById(elementId); + } + + /** + * Checks if the current user has the permission to read the adapter + */ + private boolean checkAdapterPermission(AdapterDescription adapterDescription, + String permission) { + var spPermissionEvaluator = new SpPermissionEvaluator(); + var authentication = SecurityContextHolder.getContext() + .getAuthentication(); + return spPermissionEvaluator.hasPermission( + authentication, + adapterDescription.getCorrespondingDataStreamElementId(), + permission); + } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java index e17eedb05f..c0d1b986a2 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java @@ -43,7 +43,7 @@ public class PipelineMonitoring 
extends AbstractMonitoringResource { value = "/pipeline/{pipelineId}/logs", produces = MediaType.APPLICATION_JSON_VALUE ) - @PreAuthorize("this.hasReadAuthority() and hasPermission('#pipelineId', 'READ')") + @PreAuthorize("this.hasReadAuthority() and hasPermission(#pipelineId, 'READ')") public ResponseEntity<Map<String, List<SpLogEntry>>> getLogInfoForPipeline( @PathVariable("pipelineId") String pipelineId ) { @@ -54,7 +54,7 @@ public class PipelineMonitoring extends AbstractMonitoringResource { value = "/pipeline/{pipelineId}/metrics", produces = MediaType.APPLICATION_JSON_VALUE ) - @PreAuthorize("this.hasReadAuthority() and hasPermission('#pipelineId', 'READ')") + @PreAuthorize("this.hasReadAuthority() and hasPermission(#pipelineId, 'READ')") public ResponseEntity<Map<String, SpMetricsEntry>> getMetricsInfoForPipeline( @PathVariable("pipelineId") String pipelineId, @RequestParam(value = "forceUpdate", required = false, defaultValue = "false") boolean forceUpdate diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java index 6d13dd6b25..5b19d23166 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java @@ -114,7 +114,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource { path = "/{pipelineId}", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Delete a pipeline with a given id", tags = {"Pipeline"}) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#pipelineId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority() and hasPermission(#pipelineId, 'WRITE')") public Message delete(@PathVariable("pipelineId") String pipelineId) { PipelineManager.deletePipeline(pipelineId); return Notifications.success("Pipeline deleted"); @@ -122,7 +122,7 @@ public class 
PipelineResource extends AbstractAuthGuardedRestResource { @GetMapping(path = "/{pipelineId}", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Get a specific pipeline with the given id", tags = {"Pipeline"}) - @PreAuthorize("this.hasReadAuthority() and hasPermission('#pipelineId', 'READ')") + @PreAuthorize("this.hasReadAuthority() and hasPermission(#pipelineId, 'READ')") public ResponseEntity<Pipeline> getElement(@PathVariable("pipelineId") String pipelineId) { Pipeline foundPipeline = PipelineManager.getPipeline(pipelineId); @@ -135,7 +135,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource { @GetMapping(path = "/{pipelineId}/start", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Start the pipeline with the given id", tags = {"Pipeline"}) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#pipelineId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority() and hasPermission(#pipelineId, 'WRITE')") public ResponseEntity<?> start(@PathVariable("pipelineId") String pipelineId) { try { PipelineOperationStatus status = PipelineManager.startPipeline(pipelineId); @@ -149,7 +149,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource { @GetMapping(path = "/{pipelineId}/stop", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Stop the pipeline with the given id", tags = {"Pipeline"}) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#pipelineId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority() and hasPermission(#pipelineId, 'WRITE')") public ResponseEntity<?> stop(@PathVariable("pipelineId") String pipelineId, @RequestParam(value = "forceStop", defaultValue = "false") boolean forceStop) { try { @@ -238,7 +238,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource { produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Update an existing pipeline", tags = {"Pipeline"}) - 
@PreAuthorize("this.hasWriteAuthority() and hasPermission('#pipelineId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority() and hasPermission(#pipelineId, 'WRITE')") public ResponseEntity<SuccessMessage> updatePipeline(@PathVariable("pipelineId") String pipelineId, @RequestBody Pipeline pipeline) { Pipeline storedPipeline = getPipelineStorage().getElementById(pipelineId); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java index 6765b9ce5a..c85bd2cd5e 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java @@ -45,6 +45,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PostFilter; @@ -98,7 +99,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage return ok(Notifications.success(adapterId)); } - @PostMapping(path = "compact", consumes = { MediaType.APPLICATION_JSON_VALUE }, produces = { + @PostMapping(path = "compact", consumes = {MediaType.APPLICATION_JSON_VALUE}, produces = { MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML @@ -110,7 +111,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage } @PutMapping(produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#adapterDescription.elementId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority() and hasPermission(#adapterDescription.correspondingDataStreamElementId, 'WRITE')") public ResponseEntity<? 
extends Message> updateAdapter(@RequestBody AdapterDescription adapterDescription) { var updateManager = new AdapterUpdateManagement(managementService); try { @@ -133,7 +134,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage return ok(migrations); } - @GetMapping(path = "/{id}", produces = { MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML }) + @GetMapping(path = "/{id}", produces = {MediaType.APPLICATION_JSON_VALUE, SpMediaType.YAML, SpMediaType.YML}) @PreAuthorize("this.hasReadAuthority()") public ResponseEntity<?> getAdapter( @PathVariable("id") String elementId, @@ -145,7 +146,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage // This check is done here because the adapter permission is checked based on // the corresponding data stream // and not based on the element id - if (!checkAdapterReadPermission(adapterDescription)) { + if (!checkAdapterPermission(adapterDescription, "READ")) { LOG.error("User is not allowed to read adapter {}", elementId); return ResponseEntity.status(HttpStatus.SC_UNAUTHORIZED) .build(); @@ -168,23 +169,29 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage /** * Checks if the current user has the permission to read the adapter */ - private boolean checkAdapterReadPermission(AdapterDescription adapterDescription) { + private boolean checkAdapterPermission(AdapterDescription adapterDescription, + String permission) { var spPermissionEvaluator = new SpPermissionEvaluator(); var authentication = SecurityContextHolder.getContext() .getAuthentication(); return spPermissionEvaluator.hasPermission( authentication, adapterDescription.getCorrespondingDataStreamElementId(), - "READ"); + permission); } @PostMapping(path = "/{id}/stop", produces = MediaType.APPLICATION_JSON_VALUE) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority()") public 
ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId, - @RequestParam(value = "forceStop", defaultValue = "false") boolean forceStop) { + @RequestParam(value = "forceStop", defaultValue = "false") boolean forceStop) { try { - managementService.stopStreamAdapter(elementId, forceStop); - return ok(Notifications.success("Adapter stopped")); + var adapter = getAdapterDescription(elementId); + if (checkAdapterPermission(adapter, "WRITE")) { + managementService.stopStreamAdapter(elementId, forceStop); + return ok(Notifications.success("Adapter stopped")); + } else { + return unauthorized(); + } } catch (AdapterException e) { LOG.error("Could not stop adapter with id {}", elementId, e); return serverError(SpLogMessage.from(e)); @@ -192,11 +199,16 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage } @PostMapping(path = "/{id}/start", produces = MediaType.APPLICATION_JSON_VALUE) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority()") public ResponseEntity<?> startAdapter(@PathVariable("id") String elementId) { try { - managementService.startStreamAdapter(elementId); - return ok(Notifications.success("Adapter started")); + var adapterDescription = getAdapterDescription(elementId); + if (checkAdapterPermission(adapterDescription, "WRITE")) { + managementService.startStreamAdapter(elementId); + return ok(Notifications.success("Adapter started")); + } else { + return unauthorized(); + } } catch (AdapterException e) { LOG.error("Could not start adapter with id {}", elementId, e); return serverError(SpLogMessage.from(e)); @@ -204,86 +216,96 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage } @DeleteMapping(path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority()") public ResponseEntity<?> 
deleteAdapter( @PathVariable("id") String elementId, @RequestParam(value = "deleteAssociatedPipelines", defaultValue = "false") boolean deleteAssociatedPipelines) { - List<String> pipelinesUsingAdapter = getPipelinesUsingAdapter(elementId); - IPipelineStorage pipelineStorageAPI = StorageDispatcher.INSTANCE.getNoSqlStore() - .getPipelineStorageAPI(); - - if (pipelinesUsingAdapter.isEmpty()) { - try { - managementService.deleteAdapter(elementId); - - return ok(Notifications.success("Adapter with id: " + elementId + " is dexleted.")); - } catch (AdapterException e) { - LOG.error("Error while deleting adapter with id {}", elementId, e); - return ok(Notifications.error(e.getMessage())); - } - } else if (!deleteAssociatedPipelines) { - List<String> namesOfPipelinesUsingAdapter = pipelinesUsingAdapter - .stream() - .map(pipelineId -> pipelineStorageAPI.getElementById( - pipelineId) - .getName()) - .collect(Collectors.toList()); - return ResponseEntity.status(HttpStatus.SC_CONFLICT) - .body(String.join(", ", namesOfPipelinesUsingAdapter)); - } else { - PermissionResourceManager permissionResourceManager = new PermissionResourceManager(); - // find out the names of pipelines that have an owner and the owner is not the - // current user - List<String> namesOfPipelinesNotOwnedByUser = pipelinesUsingAdapter - .stream() - .filter(pipelineId -> !permissionResourceManager.findForObjectId( - pipelineId) + try { + var adapter = getAdapterDescription(elementId); + if (checkAdapterPermission(adapter, "WRITE")) { + List<String> pipelinesUsingAdapter = getPipelinesUsingAdapter(elementId); + IPipelineStorage pipelineStorageAPI = StorageDispatcher.INSTANCE.getNoSqlStore() + .getPipelineStorageAPI(); + + if (pipelinesUsingAdapter.isEmpty()) { + try { + managementService.deleteAdapter(elementId); + + return ok(Notifications.success("Adapter with id: " + elementId + " is dexleted.")); + } catch (AdapterException e) { + LOG.error("Error while deleting adapter with id {}", elementId, e); + 
return ok(Notifications.error(e.getMessage())); + } + } else if (!deleteAssociatedPipelines) { + List<String> namesOfPipelinesUsingAdapter = pipelinesUsingAdapter .stream() - .findFirst() - .map( - Permission::getOwnerSid) - // if a pipeline has no owner, pretend the owner - // is the user so the user can delete it - .orElse( - this.getAuthenticatedUserSid()) - .equals( - this.getAuthenticatedUserSid())) - .map(pipelineId -> pipelineStorageAPI.getElementById( - pipelineId) - .getName()) - .collect(Collectors.toList()); - boolean isAdmin = SecurityContextHolder.getContext() - .getAuthentication() - .getAuthorities() - .stream() - .anyMatch(r -> r.getAuthority() - .equals( - DefaultRole.ROLE_ADMIN.name())); - // if the user is admin or owns all pipelines using this adapter, - // the user can delete all associated pipelines and this adapter - if (isAdmin || namesOfPipelinesNotOwnedByUser.isEmpty()) { - try { - for (String pipelineId : pipelinesUsingAdapter) { - PipelineManager.stopPipeline(pipelineId, false); - PipelineManager.deletePipeline(pipelineId); + .map(pipelineId -> pipelineStorageAPI.getElementById( + pipelineId) + .getName()) + .collect(Collectors.toList()); + return ResponseEntity.status(HttpStatus.SC_CONFLICT) + .body(String.join(", ", namesOfPipelinesUsingAdapter)); + } else { + PermissionResourceManager permissionResourceManager = new PermissionResourceManager(); + // find out the names of pipelines that have an owner and the owner is not the + // current user + List<String> namesOfPipelinesNotOwnedByUser = pipelinesUsingAdapter + .stream() + .filter(pipelineId -> !permissionResourceManager.findForObjectId( + pipelineId) + .stream() + .findFirst() + .map( + Permission::getOwnerSid) + // if a pipeline has no owner, pretend the owner + // is the user so the user can delete it + .orElse( + this.getAuthenticatedUserSid()) + .equals( + this.getAuthenticatedUserSid())) + .map(pipelineId -> pipelineStorageAPI.getElementById( + pipelineId) + .getName()) + 
.collect(Collectors.toList()); + boolean isAdmin = SecurityContextHolder.getContext() + .getAuthentication() + .getAuthorities() + .stream() + .anyMatch(r -> r.getAuthority() + .equals( + DefaultRole.ROLE_ADMIN.name())); + // if the user is admin or owns all pipelines using this adapter, + // the user can delete all associated pipelines and this adapter + if (isAdmin || namesOfPipelinesNotOwnedByUser.isEmpty()) { + try { + for (String pipelineId : pipelinesUsingAdapter) { + PipelineManager.stopPipeline(pipelineId, false); + PipelineManager.deletePipeline(pipelineId); + } + managementService.deleteAdapter(elementId); + + return ok(Notifications.success("Adapter with id: " + elementId + + " and all pipelines using the adapter are deleted.")); + } catch (Exception e) { + LOG.error( + "Error while deleting adapter with id " + + elementId + " and all pipelines using the adapter", + e); + return ok(Notifications.error(e.getMessage())); + } + } else { + // otherwise, hint the user the names of pipelines using the adapter but not + // owned by the user + return ResponseEntity.status(HttpStatus.SC_CONFLICT) + .body(String.join(", ", namesOfPipelinesNotOwnedByUser)); } - managementService.deleteAdapter(elementId); - - return ok(Notifications.success("Adapter with id: " + elementId - + " and all pipelines using the adapter are deleted.")); - } catch (Exception e) { - LOG.error( - "Error while deleting adapter with id " - + elementId + " and all pipelines using the adapter", - e); - return ok(Notifications.error(e.getMessage())); } } else { - // otherwise, hint the user the names of pipelines using the adapter but not - // owned by the user - return ResponseEntity.status(HttpStatus.SC_CONFLICT) - .body(String.join(", ", namesOfPipelinesNotOwnedByUser)); + return unauthorized(); } + } catch (AdapterException e) { + LOG.warn("Error while deleting adapter with id {}", elementId, e); + return badRequest(); } } diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java index 89912db3d8..8ba5dd7675 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java @@ -128,7 +128,7 @@ public class CompactAdapterResource extends AbstractAdapterResource<AdapterMaste "application/yml" } ) - @PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId', 'WRITE')") + @PreAuthorize("this.hasWriteAuthority() and hasPermission(#elementId, 'WRITE')") public ResponseEntity<?> updateAdapterCompact( @PathVariable("id") String elementId, @RequestBody CompactAdapter compactAdapter
