[ https://issues.apache.org/jira/browse/NIFI-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121861#comment-16121861 ]
ASF GitHub Bot commented on NIFI-4224: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2051#discussion_r132502384 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java --- @@ -325,6 +441,859 @@ public Response updateProcessGroup( ); } + + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{groupId}/variable-registry/update-requests/{updateId}") + @ApiOperation(value = "Gets a process group's variable registry", response = VariableRegistryUpdateRequestEntity.class, authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response getVariableRegistryUpdateRequest( + @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId, + @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) { + + if (groupId == null || updateId == null) { + throw new IllegalArgumentException("Group ID and Update ID must both be specified."); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); + } + + if (!groupId.equals(request.getProcessGroupId())) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); + } + + final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); + entity.setId(request.getRequestId()); + entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + return generateOkResponse(entity).build(); + } + + + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{groupId}/variable-registry/update-requests/{updateId}") + @ApiOperation(value = "Deletes an update request for a process group's variable registry. If the request is not yet complete, it will automatically be cancelled.", + response = VariableRegistryUpdateRequestEntity.class, authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response deleteVariableRegistryUpdateRequest( + @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId, + @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) { + + if (groupId == null || updateId == null) { + throw new IllegalArgumentException("Group ID and Update ID must both be specified."); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.DELETE); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); + } + + if (!groupId.equals(request.getProcessGroupId())) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); + } + + request.cancel(); + + final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); + entity.setId(request.getRequestId()); + entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + return generateOkResponse(entity).build(); + } + + + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/variable-registry") + @ApiOperation(value = "Updates the contents of a Process Group's variable Registry", response = VariableRegistryEntity.class, authorizations = { + @Authorization(value = "Write - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response updateVariableRegistry( + @Context final HttpServletRequest httpServletRequest, + @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, + @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + + if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + throw new IllegalArgumentException("Variable Registry details must be specified."); + } + + if (requestEntity.getRevision() == null) { + throw new IllegalArgumentException("Revision must be specified."); + } + + // ensure the same id is being used + final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry(); + if (!groupId.equals(registryDto.getProcessGroupId())) { + throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " + + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId)); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.PUT, requestEntity); + } + + // handle expects request (usually from the cluster manager) + final Revision requestRevision = getRevision(requestEntity, groupId); + return withWriteLock( + serviceFacade, + requestEntity, + requestRevision, + lookup -> { + Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + (revision, processGroupEntity) -> { + // update the process group + final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto); + populateRemainingVariableRegistryEntityContent(entity); + + return generateOkResponse(entity).build(); + }); + } + + + /** + * Updates the variable registry for the specified process group. + * + * @param httpServletRequest request + * @param groupId The id of the process group. + * @param requestEntity the Variable Registry Entity + * @return A Variable Registry Entry. + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/variable-registry/update-requests") + @ApiOperation(value = "Submits a request to update a process group's variable registry", response = VariableRegistryUpdateRequestEntity.class, authorizations = { + @Authorization(value = "Write - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response submitUpdateVariableRegistryRequest( + @Context final HttpServletRequest httpServletRequest, + @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, + @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + + if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + throw new IllegalArgumentException("Variable Registry details must be specified."); + } + + if (requestEntity.getRevision() == null) { + throw new IllegalArgumentException("Revision must be specified."); + } + + // In order to update variables in a variable registry, we have to perform the following steps: + // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service). + // 1a. Determine ID's of components + // 1b. Determine Revision's of associated components + // 2. Stop All Affected Processors + // 3. Disable All Affected Controller Services + // 4. Update the Variables + // 5. Re-Enable all Affected Controller Services (services only, not dependent components) + // 6. Re-Enable all Processors that Depended on the Controller Services + + // Determine the affected components (and their associated revisions) + final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry()); + final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry(); + if (computedRegistryDto == null) { + throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); + } + + final Set<AffectedComponentDTO> affectedComponents = serviceFacade.identifyComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); + + final Map<String, List<AffectedComponentDTO>> affectedComponentsByType = affectedComponents.stream() + .collect(Collectors.groupingBy(comp -> comp.getComponentType())); + + final List<AffectedComponentDTO> affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); + final List<AffectedComponentDTO> affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + + + if (isReplicateRequest()) { + // update the variable registry + final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId); + updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); + final URI originalUri = getAbsolutePath(); + + // Submit the task to be run in the background + final Runnable taskWrapper = () -> { + try { + updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity); + } catch (final Exception e) { + logger.error("Failed to update variable registry", e); + updateRequest.setFailureReason("An unexpected error has occurred: " + e); + } + }; + + variableRegistryThreadPool.submit(taskWrapper); + + final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); --- End diff -- Looking at this more, I think the entity class should really just extend Entity, not ComponentEntity. In such a case I dont think we need the permissions DTO on the entity. > Add Variable Registry at Process Group level > -------------------------------------------- > > Key: NIFI-4224 > URL: https://issues.apache.org/jira/browse/NIFI-4224 > Project: Apache NiFi > Issue Type: New Feature > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > > Currently, NiFi exposes a variable registry that is configurable by adding > the name of a properties file to nifi.properties and then treating the > referenced properties file as key/value pairs for the variable registry. > This, however, is very limiting, as it provides a global scope for all > variables, and it requires a restart of NiFi in order to pick up any updates > to the file. We should expose a Process Group-level Variable Registry. -- This message was sent by Atlassian JIRA (v6.4.14#64029)