[ 
https://issues.apache.org/jira/browse/NIFI-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118903#comment-16118903
 ] 

ASF GitHub Bot commented on NIFI-4224:
--------------------------------------

Github user mcgilman commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2051#discussion_r132005935
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 ---
    @@ -325,6 +441,859 @@ public Response updateProcessGroup(
             );
         }
     
    +
    +    @GET
    +    @Consumes(MediaType.WILDCARD)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{groupId}/variable-registry/update-requests/{updateId}")
    +    @ApiOperation(value = "Gets a process group's variable registry", 
response = VariableRegistryUpdateRequestEntity.class, authorizations = {
    +        @Authorization(value = "Read - /process-groups/{uuid}", type = "")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response getVariableRegistryUpdateRequest(
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("groupId") final String groupId,
    +        @ApiParam(value = "The ID of the Variable Registry Update 
Request", required = true) @PathParam("updateId") final String updateId) {
    +
    +        if (groupId == null || updateId == null) {
    +            throw new IllegalArgumentException("Group ID and Update ID 
must both be specified.");
    +        }
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.GET);
    +        }
    +
    +        // authorize access
    +        serviceFacade.authorizeAccess(lookup -> {
    +            final Authorizable processGroup = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +            processGroup.authorize(authorizer, RequestAction.READ, 
NiFiUserUtils.getNiFiUser());
    +        });
    +
    +        final VariableRegistryUpdateRequest request = 
varRegistryUpdateRequests.get(updateId);
    +        if (request == null) {
    +            throw new ResourceNotFoundException("Could not find a Variable 
Registry Update Request with identifier " + updateId);
    +        }
    +
    +        if (!groupId.equals(request.getProcessGroupId())) {
    +            throw new ResourceNotFoundException("Could not find a Variable 
Registry Update Request with identifier " + updateId + " for Process Group with 
identifier " + groupId);
    +        }
    +
    +        final VariableRegistryUpdateRequestEntity entity = new 
VariableRegistryUpdateRequestEntity();
    +        entity.setId(request.getRequestId());
    +        
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
    +        entity.setUri(generateResourceUri("process-groups", groupId, 
"variable-registry", updateId));
    +        return generateOkResponse(entity).build();
    +    }
    +
    +
    +    @DELETE
    +    @Consumes(MediaType.WILDCARD)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{groupId}/variable-registry/update-requests/{updateId}")
    +    @ApiOperation(value = "Deletes an update request for a process group's 
variable registry. If the request is not yet complete, it will automatically be 
cancelled.",
    +        response = VariableRegistryUpdateRequestEntity.class, 
authorizations = {
    +            @Authorization(value = "Read - /process-groups/{uuid}", type = 
"")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response deleteVariableRegistryUpdateRequest(
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("groupId") final String groupId,
    +        @ApiParam(value = "The ID of the Variable Registry Update 
Request", required = true) @PathParam("updateId") final String updateId) {
    +
    +        if (groupId == null || updateId == null) {
    +            throw new IllegalArgumentException("Group ID and Update ID 
must both be specified.");
    +        }
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.DELETE);
    +        }
    +
    +        // authorize access
    +        serviceFacade.authorizeAccess(lookup -> {
    +            final Authorizable processGroup = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +            processGroup.authorize(authorizer, RequestAction.READ, 
NiFiUserUtils.getNiFiUser());
    +        });
    +
    +        final VariableRegistryUpdateRequest request = 
varRegistryUpdateRequests.remove(updateId);
    +        if (request == null) {
    +            throw new ResourceNotFoundException("Could not find a Variable 
Registry Update Request with identifier " + updateId);
    +        }
    +
    +        if (!groupId.equals(request.getProcessGroupId())) {
    +            throw new ResourceNotFoundException("Could not find a Variable 
Registry Update Request with identifier " + updateId + " for Process Group with 
identifier " + groupId);
    +        }
    +
    +        request.cancel();
    +
    +        final VariableRegistryUpdateRequestEntity entity = new 
VariableRegistryUpdateRequestEntity();
    +        entity.setId(request.getRequestId());
    +        
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
    +        entity.setUri(generateResourceUri("process-groups", groupId, 
"variable-registry", updateId));
    +        return generateOkResponse(entity).build();
    +    }
    +
    +
    +    @PUT
    +    @Consumes(MediaType.APPLICATION_JSON)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{id}/variable-registry")
    +    @ApiOperation(value = "Updates the contents of a Process Group's 
variable Registry", response = VariableRegistryEntity.class, authorizations = {
    +        @Authorization(value = "Write - /process-groups/{uuid}", type = "")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response updateVariableRegistry(
    +        @Context final HttpServletRequest httpServletRequest,
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") final String groupId,
    +        @ApiParam(value = "The process group configuration details.", 
required = true) final VariableRegistryEntity requestEntity) {
    +
    +        if (requestEntity == null || requestEntity.getVariableRegistry() 
== null) {
    +            throw new IllegalArgumentException("Variable Registry details 
must be specified.");
    +        }
    +
    +        if (requestEntity.getRevision() == null) {
    +            throw new IllegalArgumentException("Revision must be 
specified.");
    +        }
    +
    +        // ensure the same id is being used
    +        final VariableRegistryDTO registryDto = 
requestEntity.getVariableRegistry();
    +        if (!groupId.equals(registryDto.getProcessGroupId())) {
    +            throw new IllegalArgumentException(String.format("The process 
group id (%s) in the request body does "
    +                + "not equal the process group id of the requested 
resource (%s).", registryDto.getProcessGroupId(), groupId));
    +        }
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.PUT, requestEntity);
    +        }
    +
    +        // handle expects request (usually from the cluster manager)
    +        final Revision requestRevision = getRevision(requestEntity, 
groupId);
    +        return withWriteLock(
    +            serviceFacade,
    +            requestEntity,
    +            requestRevision,
    +            lookup -> {
    +                Authorizable authorizable = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +                authorizable.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
    +            },
    +            null,
    +            (revision, processGroupEntity) -> {
    +                // update the process group
    +                final VariableRegistryEntity entity = 
serviceFacade.updateVariableRegistry(revision, registryDto);
    +                populateRemainingVariableRegistryEntityContent(entity);
    +
    +                return generateOkResponse(entity).build();
    +            });
    +    }
    +
    +
    +    /**
    +     * Updates the variable registry for the specified process group.
    +     *
    +     * @param httpServletRequest request
    +     * @param groupId The id of the process group.
    +     * @param requestEntity the Variable Registry Entity
    +     * @return A Variable Registry Entry.
    +     */
    +    @POST
    +    @Consumes(MediaType.APPLICATION_JSON)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{id}/variable-registry/update-requests")
    +    @ApiOperation(value = "Submits a request to update a process group's 
variable registry", response = VariableRegistryUpdateRequestEntity.class, 
authorizations = {
    +        @Authorization(value = "Write - /process-groups/{uuid}", type = "")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response submitUpdateVariableRegistryRequest(
    +        @Context final HttpServletRequest httpServletRequest,
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") final String groupId,
    +        @ApiParam(value = "The process group configuration details.", 
required = true) final VariableRegistryEntity requestEntity) {
    +
    +        if (requestEntity == null || requestEntity.getVariableRegistry() 
== null) {
    +            throw new IllegalArgumentException("Variable Registry details 
must be specified.");
    +        }
    +
    +        if (requestEntity.getRevision() == null) {
    +            throw new IllegalArgumentException("Revision must be 
specified.");
    +        }
    +
    +        // In order to update variables in a variable registry, we have to 
perform the following steps:
    +        // 1. Determine Affected Components (this includes any Processors 
and Controller Services and any components that reference an affected 
Controller Service).
    +        //    1a. Determine ID's of components
    +        //    1b. Determine Revision's of associated components
    +        // 2. Stop All Affected Processors
    +        // 3. Disable All Affected Controller Services
    +        // 4. Update the Variables
    +        // 5. Re-Enable all Affected Controller Services (services only, 
not dependent components)
    +        // 6. Re-Enable all Processors that Depended on the Controller 
Services
    +
    +        // Determine the affected components (and their associated 
revisions)
    +        final VariableRegistryEntity computedEntity = 
serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
    +        final VariableRegistryDTO computedRegistryDto = 
computedEntity.getVariableRegistry();
    +        if (computedRegistryDto == null) {
    +            throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
    +        }
    +
    +        final Set<AffectedComponentDTO> affectedComponents = 
serviceFacade.identifyComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
    +
    +        final Map<String, List<AffectedComponentDTO>> 
affectedComponentsByType = affectedComponents.stream()
    +            .collect(Collectors.groupingBy(comp -> 
comp.getComponentType()));
    +
    +        final List<AffectedComponentDTO> affectedProcessors = 
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
    +        final List<AffectedComponentDTO> affectedServices = 
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
    +
    +
    +        if (isReplicateRequest()) {
    +            // update the variable registry
    +            final VariableRegistryUpdateRequest updateRequest = 
createVariableRegistryUpdateRequest(groupId);
    +            
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
    +            final URI originalUri = getAbsolutePath();
    +
    +            // Submit the task to be run in the background
    +            final Runnable taskWrapper = () -> {
    +                try {
    +                    updateVariableRegistryReplicated(groupId, originalUri, 
affectedProcessors, affectedServices, updateRequest, requestEntity);
    +                } catch (final Exception e) {
    +                    logger.error("Failed to update variable registry", e);
    +                    updateRequest.setFailureReason("An unexpected error 
has occurred: " + e);
    +                }
    +            };
    +
    +            variableRegistryThreadPool.submit(taskWrapper);
    +
    +            final VariableRegistryUpdateRequestEntity responseEntity = new 
VariableRegistryUpdateRequestEntity();
    +            
responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
    +            
responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", 
groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
    +
    +            final URI location = 
URI.create(responseEntity.getRequestDto().getUri());
    +            return 
Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
    +        }
    +
    +
    +        final Revision requestRevision = 
getRevision(requestEntity.getRevision(), requestEntity.getId());
    +        return withWriteLock(
    +            serviceFacade,
    +            requestEntity,
    +            requestRevision,
    +            lookup -> {
    +                final NiFiUser user = NiFiUserUtils.getNiFiUser();
    +
    +                final Authorizable groupAuthorizable = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +                groupAuthorizable.authorize(authorizer, 
RequestAction.WRITE, user);
    +
    +                // For every component that is affected, the user must 
have READ permissions and WRITE permissions
    +                // (because this action requires stopping the component).
    +                if (affectedProcessors != null) {
    +                    for (final AffectedComponentDTO affectedComponent : 
affectedProcessors) {
    +                        final Authorizable authorizable = 
lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
    +
    +                        if (!authorizable.isAuthorized(authorizer, 
RequestAction.READ, user)) {
    --- End diff --
    
    Should use `authorize(...)` here and below. `authorize(...)` will indicate 
that the user is attempting to access/modify a component. `isAuthorized(...)` is 
meant to simply check whether the user is allowed, but does not indicate an 
access attempt. In this context, the user is actually modifying, albeit 
indirectly, the component.


> Add Variable Registry at Process Group level
> --------------------------------------------
>
>                 Key: NIFI-4224
>                 URL: https://issues.apache.org/jira/browse/NIFI-4224
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>
> Currently, NiFi exposes a variable registry that is configurable by adding 
> the name of a properties file to nifi.properties and then treating the 
> referenced properties file as key/value pairs for the variable registry. 
> This, however, is very limiting, as it provides only a single global scope 
> for all variables, and it requires a restart of NiFi in order to pick up any 
> updates to the file. We should expose a Process Group-level Variable Registry.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to