[ https://issues.apache.org/jira/browse/NIFI-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121762#comment-16121762 ]
ASF GitHub Bot commented on NIFI-4224: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2051#discussion_r132477480 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java --- @@ -325,6 +441,859 @@ public Response updateProcessGroup( ); } + + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{groupId}/variable-registry/update-requests/{updateId}") + @ApiOperation(value = "Gets a process group's variable registry", response = VariableRegistryUpdateRequestEntity.class, authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response getVariableRegistryUpdateRequest( + @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId, + @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) { + + if (groupId == null || updateId == null) { + throw new IllegalArgumentException("Group ID and Update ID must both be specified."); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); + } + + if (!groupId.equals(request.getProcessGroupId())) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); + } + + final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); + entity.setId(request.getRequestId()); + entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + return generateOkResponse(entity).build(); + } + + + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{groupId}/variable-registry/update-requests/{updateId}") + @ApiOperation(value = "Deletes an update request for a process group's variable registry. If the request is not yet complete, it will automatically be cancelled.", + response = VariableRegistryUpdateRequestEntity.class, authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response deleteVariableRegistryUpdateRequest( + @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId, + @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) { + + if (groupId == null || updateId == null) { + throw new IllegalArgumentException("Group ID and Update ID must both be specified."); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.DELETE); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); + } + + if (!groupId.equals(request.getProcessGroupId())) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); + } + + request.cancel(); + + final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); + entity.setId(request.getRequestId()); + entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + return generateOkResponse(entity).build(); + } + + + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/variable-registry") + @ApiOperation(value = "Updates the contents of a Process Group's variable Registry", response = VariableRegistryEntity.class, authorizations = { + @Authorization(value = "Write - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response updateVariableRegistry( + @Context final HttpServletRequest httpServletRequest, + @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, + @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + + if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + throw new IllegalArgumentException("Variable Registry details must be specified."); + } + + if (requestEntity.getRevision() == null) { + throw new IllegalArgumentException("Revision must be specified."); + } + + // ensure the same id is being used + final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry(); + if (!groupId.equals(registryDto.getProcessGroupId())) { + throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " + + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId)); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.PUT, requestEntity); + } + + // handle expects request (usually from the cluster manager) + final Revision requestRevision = getRevision(requestEntity, groupId); + return withWriteLock( + serviceFacade, + requestEntity, + requestRevision, + lookup -> { + Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + (revision, processGroupEntity) -> { + // update the process group + final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto); + populateRemainingVariableRegistryEntityContent(entity); + + return generateOkResponse(entity).build(); + }); + } + + + /** + * Updates the variable registry for the specified process group. + * + * @param httpServletRequest request + * @param groupId The id of the process group. + * @param requestEntity the Variable Registry Entity + * @return A Variable Registry Entry. + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/variable-registry/update-requests") + @ApiOperation(value = "Submits a request to update a process group's variable registry", response = VariableRegistryUpdateRequestEntity.class, authorizations = { + @Authorization(value = "Write - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response submitUpdateVariableRegistryRequest( + @Context final HttpServletRequest httpServletRequest, + @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, + @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + + if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + throw new IllegalArgumentException("Variable Registry details must be specified."); + } + + if (requestEntity.getRevision() == null) { + throw new IllegalArgumentException("Revision must be specified."); + } + + // In order to update variables in a variable registry, we have to perform the following steps: + // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service). + // 1a. Determine ID's of components + // 1b. Determine Revision's of associated components + // 2. Stop All Affected Processors + // 3. Disable All Affected Controller Services + // 4. Update the Variables + // 5. Re-Enable all Affected Controller Services (services only, not dependent components) + // 6. Re-Enable all Processors that Depended on the Controller Services + + // Determine the affected components (and their associated revisions) + final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry()); + final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry(); + if (computedRegistryDto == null) { + throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); + } + + final Set<AffectedComponentDTO> affectedComponents = serviceFacade.identifyComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); + + final Map<String, List<AffectedComponentDTO>> affectedComponentsByType = affectedComponents.stream() + .collect(Collectors.groupingBy(comp -> comp.getComponentType())); + + final List<AffectedComponentDTO> affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); + final List<AffectedComponentDTO> affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + + + if (isReplicateRequest()) { + // update the variable registry + final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId); + updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); + final URI originalUri = getAbsolutePath(); + + // Submit the task to be run in the background + final Runnable taskWrapper = () -> { + try { + updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity); + } catch (final Exception e) { + logger.error("Failed to update variable registry", e); + updateRequest.setFailureReason("An unexpected error has occurred: " + e); + } + }; + + variableRegistryThreadPool.submit(taskWrapper); + + final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); + responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); + responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); + + final URI location = URI.create(responseEntity.getRequestDto().getUri()); + return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); + } + + + final Revision requestRevision = getRevision(requestEntity.getRevision(), requestEntity.getId()); + return withWriteLock( + serviceFacade, + requestEntity, + requestRevision, + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable(); + groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user); + + // For every component that is affected, the user must have READ permissions and WRITE permissions + // (because this action requires stopping the component). + if (affectedProcessors != null) { + for (final AffectedComponentDTO affectedComponent : affectedProcessors) { + final Authorizable authorizable = lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable(); + + if (!authorizable.isAuthorized(authorizer, RequestAction.READ, user)) { --- End diff -- I used isAuthorized() here instead of authorize() because if the user is not authorized to modify a component, the call to authorize() will simply indicate that they are not authorized. However, the user will have no idea which component they are not authorized to modify. So by checking isAuthorized() and then throwing AccessDeniedException, we are able to include the ID of the component that the user can't access. It should have the same affect as calling authorize() otherwise, though. Do you think it's a bad idea to go this route? > Add Variable Registry at Process Group level > -------------------------------------------- > > Key: NIFI-4224 > URL: https://issues.apache.org/jira/browse/NIFI-4224 > Project: Apache NiFi > Issue Type: New Feature > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > > Currently, NiFi exposes a variable registry that is configurable by adding > the name of a properties file to nifi.properties and then treating the > referenced properties file as key/value pairs for the variable registry. > This, however, is very limiting, as it provides a global scope for all > variables, and it requires a restart of NiFi in order to pick up any updates > to the file. We should expose a Process Group-level Variable Registry. -- This message was sent by Atlassian JIRA (v6.4.14#64029)