[ https://issues.apache.org/jira/browse/NIFI-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118897#comment-16118897 ]

ASF GitHub Bot commented on NIFI-4224:
--------------------------------------

Github user mcgilman commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2051#discussion_r131998273
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---
    @@ -325,6 +441,859 @@ public Response updateProcessGroup(
             );
         }
     
    +
    +    @GET
    +    @Consumes(MediaType.WILDCARD)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{groupId}/variable-registry/update-requests/{updateId}")
    +    @ApiOperation(value = "Gets a process group's variable registry update request", response = VariableRegistryUpdateRequestEntity.class, authorizations = {
    +        @Authorization(value = "Read - /process-groups/{uuid}", type = "")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response getVariableRegistryUpdateRequest(
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("groupId") final String groupId,
    +        @ApiParam(value = "The ID of the Variable Registry Update 
Request", required = true) @PathParam("updateId") final String updateId) {
    +
    +        if (groupId == null || updateId == null) {
    +            throw new IllegalArgumentException("Group ID and Update ID 
must both be specified.");
    +        }
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.GET);
    +        }
    +
    +        // authorize access
    +        serviceFacade.authorizeAccess(lookup -> {
    +            final Authorizable processGroup = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +            processGroup.authorize(authorizer, RequestAction.READ, 
NiFiUserUtils.getNiFiUser());
    +        });
    +
    +        final VariableRegistryUpdateRequest request = 
varRegistryUpdateRequests.get(updateId);
    +        if (request == null) {
    +            throw new ResourceNotFoundException("Could not find a Variable 
Registry Update Request with identifier " + updateId);
    +        }
    +
    +        if (!groupId.equals(request.getProcessGroupId())) {
    +            throw new ResourceNotFoundException("Could not find a Variable 
Registry Update Request with identifier " + updateId + " for Process Group with 
identifier " + groupId);
    +        }
    +
    +        final VariableRegistryUpdateRequestEntity entity = new 
VariableRegistryUpdateRequestEntity();
    +        entity.setId(request.getRequestId());
    +        
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
    +        entity.setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateId));
    +        return generateOkResponse(entity).build();
    +    }
    +
    +
    +    @DELETE
    +    @Consumes(MediaType.WILDCARD)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{groupId}/variable-registry/update-requests/{updateId}")
    +    @ApiOperation(value = "Deletes an update request for a process group's 
variable registry. If the request is not yet complete, it will automatically be 
cancelled.",
    +        response = VariableRegistryUpdateRequestEntity.class, 
authorizations = {
    +            @Authorization(value = "Read - /process-groups/{uuid}", type = 
"")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response deleteVariableRegistryUpdateRequest(
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("groupId") final String groupId,
    +        @ApiParam(value = "The ID of the Variable Registry Update 
Request", required = true) @PathParam("updateId") final String updateId) {
    +
    +        if (groupId == null || updateId == null) {
    +            throw new IllegalArgumentException("Group ID and Update ID 
must both be specified.");
    +        }
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.DELETE);
    +        }
    +
    +        // authorize access
    +        serviceFacade.authorizeAccess(lookup -> {
    +            final Authorizable processGroup = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +            processGroup.authorize(authorizer, RequestAction.READ, 
NiFiUserUtils.getNiFiUser());
    +        });
    +
    +        final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId);
    +        if (request == null) {
    +            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
    +        }
    +
    +        if (!groupId.equals(request.getProcessGroupId())) {
    +            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
    +        }
    +
    +        // remove the request only after confirming that it belongs to this process group
    +        varRegistryUpdateRequests.remove(updateId);
    +
    +        request.cancel();
    +
    +        final VariableRegistryUpdateRequestEntity entity = new 
VariableRegistryUpdateRequestEntity();
    +        entity.setId(request.getRequestId());
    +        
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
    +        entity.setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateId));
    +        return generateOkResponse(entity).build();
    +    }
    +
    +
    +    @PUT
    +    @Consumes(MediaType.APPLICATION_JSON)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{id}/variable-registry")
    +    @ApiOperation(value = "Updates the contents of a Process Group's 
variable Registry", response = VariableRegistryEntity.class, authorizations = {
    +        @Authorization(value = "Write - /process-groups/{uuid}", type = "")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response updateVariableRegistry(
    +        @Context final HttpServletRequest httpServletRequest,
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") final String groupId,
    +        @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestEntity) {
    +
    +        if (requestEntity == null || requestEntity.getVariableRegistry() 
== null) {
    +            throw new IllegalArgumentException("Variable Registry details 
must be specified.");
    +        }
    +
    +        if (requestEntity.getRevision() == null) {
    +            throw new IllegalArgumentException("Revision must be 
specified.");
    +        }
    +
    +        // ensure the same id is being used
    +        final VariableRegistryDTO registryDto = 
requestEntity.getVariableRegistry();
    +        if (!groupId.equals(registryDto.getProcessGroupId())) {
    +            throw new IllegalArgumentException(String.format("The process 
group id (%s) in the request body does "
    +                + "not equal the process group id of the requested 
resource (%s).", registryDto.getProcessGroupId(), groupId));
    +        }
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.PUT, requestEntity);
    +        }
    +
    +        // handle expects request (usually from the cluster manager)
    +        final Revision requestRevision = getRevision(requestEntity, 
groupId);
    +        return withWriteLock(
    +            serviceFacade,
    +            requestEntity,
    +            requestRevision,
    +            lookup -> {
    +                Authorizable authorizable = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +                authorizable.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
    +            },
    +            null,
    +            (revision, processGroupEntity) -> {
    +                // update the variable registry
    +                final VariableRegistryEntity entity = 
serviceFacade.updateVariableRegistry(revision, registryDto);
    +                populateRemainingVariableRegistryEntityContent(entity);
    +
    +                return generateOkResponse(entity).build();
    +            });
    +    }
    +
    +
    +    /**
    +     * Submits a request to update the variable registry for the specified process group.
    +     *
    +     * @param httpServletRequest request
    +     * @param groupId The id of the process group.
    +     * @param requestEntity the Variable Registry Entity
    +     * @return A Variable Registry Entry.
    +     */
    +    @POST
    +    @Consumes(MediaType.APPLICATION_JSON)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{id}/variable-registry/update-requests")
    +    @ApiOperation(value = "Submits a request to update a process group's 
variable registry", response = VariableRegistryUpdateRequestEntity.class, 
authorizations = {
    +        @Authorization(value = "Write - /process-groups/{uuid}", type = "")
    +    })
    +    @ApiResponses(value = {
    +        @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
    +        @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +        @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
    +        @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
    +        @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
    +    })
    +    public Response submitUpdateVariableRegistryRequest(
    +        @Context final HttpServletRequest httpServletRequest,
    +        @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") final String groupId,
    +        @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestEntity) {
    +
    +        if (requestEntity == null || requestEntity.getVariableRegistry() 
== null) {
    +            throw new IllegalArgumentException("Variable Registry details 
must be specified.");
    +        }
    +
    +        if (requestEntity.getRevision() == null) {
    +            throw new IllegalArgumentException("Revision must be 
specified.");
    +        }
    +
    +        // In order to update variables in a variable registry, we have to perform the following steps:
    +        // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service).
    +        //    1a. Determine IDs of components
    +        //    1b. Determine Revisions of associated components
    +        // 2. Stop All Affected Processors
    +        // 3. Disable All Affected Controller Services
    +        // 4. Update the Variables
    +        // 5. Re-Enable all Affected Controller Services (services only, not dependent components)
    +        // 6. Restart all Affected Processors
    +
    +        // Determine the affected components (and their associated 
revisions)
    +        final VariableRegistryEntity computedEntity = 
serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
    +        final VariableRegistryDTO computedRegistryDto = 
computedEntity.getVariableRegistry();
    +        if (computedRegistryDto == null) {
    +            throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
    +        }
    +
    +        final Set<AffectedComponentDTO> affectedComponents = 
serviceFacade.identifyComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
    +
    +        final Map<String, List<AffectedComponentDTO>> 
affectedComponentsByType = affectedComponents.stream()
    +            .collect(Collectors.groupingBy(comp -> 
comp.getComponentType()));
    +
    +        final List<AffectedComponentDTO> affectedProcessors = 
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
    +        final List<AffectedComponentDTO> affectedServices = 
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
    +
    +
    +        if (isReplicateRequest()) {
    +            // update the variable registry
    +            final VariableRegistryUpdateRequest updateRequest = 
createVariableRegistryUpdateRequest(groupId);
    +            
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
    +            final URI originalUri = getAbsolutePath();
    +
    +            // Submit the task to be run in the background
    +            final Runnable taskWrapper = () -> {
    +                try {
    +                    updateVariableRegistryReplicated(groupId, originalUri, 
affectedProcessors, affectedServices, updateRequest, requestEntity);
    +                } catch (final Exception e) {
    +                    logger.error("Failed to update variable registry", e);
    +                    updateRequest.setFailureReason("An unexpected error 
has occurred: " + e);
    +                }
    +            };
    +
    +            variableRegistryThreadPool.submit(taskWrapper);
    +
    +            final VariableRegistryUpdateRequestEntity responseEntity = new 
VariableRegistryUpdateRequestEntity();
    +            
responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
    +            
responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", 
groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
    +
    +            final URI location = 
URI.create(responseEntity.getRequestDto().getUri());
    +            return 
Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
    +        }
    +
    +
    +        final Revision requestRevision = 
getRevision(requestEntity.getRevision(), requestEntity.getId());
    +        return withWriteLock(
    +            serviceFacade,
    +            requestEntity,
    +            requestRevision,
    +            lookup -> {
    +                final NiFiUser user = NiFiUserUtils.getNiFiUser();
    +
    +                final Authorizable groupAuthorizable = 
lookup.getProcessGroup(groupId).getAuthorizable();
    +                groupAuthorizable.authorize(authorizer, 
RequestAction.WRITE, user);
    +
    +                // For every component that is affected, the user must 
have READ permissions and WRITE permissions
    +                // (because this action requires stopping the component).
    +                if (affectedProcessors != null) {
    +                    for (final AffectedComponentDTO affectedComponent : 
affectedProcessors) {
    +                        final Authorizable authorizable = 
lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
    +
    +                        if (!authorizable.isAuthorized(authorizer, 
RequestAction.READ, user)) {
    +                            throw new AccessDeniedException("User does not 
have Read permissions to Processor with ID " + 
affectedComponent.getComponentId());
    +                        }
    +                        if (!authorizable.isAuthorized(authorizer, 
RequestAction.WRITE, user)) {
    +                            throw new AccessDeniedException("User does not 
have Write permissions to Processor with ID " + 
affectedComponent.getComponentId());
    +                        }
    +                    }
    +                }
    +
    +                if (affectedServices != null) {
    +                    for (final AffectedComponentDTO affectedComponent : 
affectedServices) {
    +                        final Authorizable authorizable = 
lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable();
    +
    +                        if (!authorizable.isAuthorized(authorizer, 
RequestAction.READ, user)) {
    +                            throw new AccessDeniedException("User does not 
have Read permissions to Controller Service with ID " + 
affectedComponent.getComponentId());
    +                        }
    +                        if (!authorizable.isAuthorized(authorizer, 
RequestAction.WRITE, user)) {
    +                            throw new AccessDeniedException("User does not 
have Write permissions to Controller Service with ID " + 
affectedComponent.getComponentId());
    +                        }
    +                    }
    +                }
    +            },
    +            null,
    +            (revision, varRegistryEntity) -> {
    +                return updateVariableRegistryLocal(groupId, 
affectedProcessors, affectedServices, requestEntity);
    +            });
    +    }
    +
    +    private Pause createPause(final VariableRegistryUpdateRequest 
updateRequest) {
    +        return new Pause() {
    +            @Override
    +            public boolean pause() {
    +                if (updateRequest.isComplete()) {
    +                    return false;
    +                }
    +
    +                try {
    +                    Thread.sleep(500);
    +                } catch (final InterruptedException ie) {
    +                    Thread.currentThread().interrupt();
    +                    return false;
    +                }
    +
    +                return !updateRequest.isComplete();
    +            }
    +        };
    +    }
    +
    +    private void updateVariableRegistryReplicated(final String groupId, 
final URI originalUri, final Collection<AffectedComponentDTO> 
affectedProcessors,
    +        final Collection<AffectedComponentDTO> affectedServices,
    +        final VariableRegistryUpdateRequest updateRequest, final 
VariableRegistryEntity requestEntity) {
    +
    +        final NiFiProperties properties = getProperties();
    +        final Client jerseyClient = WebUtils.createClient(new 
DefaultClientConfig(), SslContextFactory.createSslContext(properties));
    +        final int connectionTimeout = (int) 
FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), 
TimeUnit.MILLISECONDS);
    +        final int readTimeout = (int) 
FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), 
TimeUnit.MILLISECONDS);
    +        
jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, 
connectionTimeout);
    +        
jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, 
readTimeout);
    +        
jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, 
Boolean.TRUE);
    +
    +        final Pause pause = createPause(updateRequest);
    +
    +        // stop processors
    +        if (affectedProcessors != null) {
    +            logger.info("In order to update Variable Registry for Process 
Group with ID {}, "
    +                + "replicating request to stop {} affected processors", 
groupId, affectedProcessors.size());
    +
    +            scheduleProcessors(groupId, originalUri, jerseyClient, 
updateRequest, pause,
    +                affectedProcessors, ScheduledState.STOPPED, 
updateRequest.getStopProcessorsStep());
    +        }
    +
    +        // disable controller services
    +        if (affectedServices != null) {
    +            logger.info("In order to update Variable Registry for Process Group with ID {}, "
    +                + "replicating request to disable {} affected Controller Services", groupId, affectedServices.size());
    +
    +            activateControllerServices(groupId, originalUri, jerseyClient, 
updateRequest, pause,
    +                affectedServices, ControllerServiceState.DISABLED, 
updateRequest.getDisableServicesStep());
    +        }
    +
    +        // apply updates
    +        logger.info("In order to update Variable Registry for Process 
Group with ID {}, "
    +            + "replicating request to apply updates to variable registry", 
groupId);
    +        applyVariableRegistryUpdate(groupId, originalUri, jerseyClient, 
updateRequest, requestEntity);
    +
    +        // re-enable controller services
    +        if (affectedServices != null) {
    +            logger.info("In order to update Variable Registry for Process 
Group with ID {}, "
    +                + "replicating request to re-enable {} affected services", 
groupId, affectedServices.size());
    +
    +            activateControllerServices(groupId, originalUri, jerseyClient, 
updateRequest, pause,
    +                affectedServices, ControllerServiceState.ENABLED, 
updateRequest.getEnableServicesStep());
    +        }
    +
    +        // restart processors
    +        if (affectedProcessors != null) {
    +            logger.info("In order to update Variable Registry for Process 
Group with ID {}, "
    +                + "replicating request to restart {} affected processors", 
groupId, affectedProcessors.size());
    +
    +            scheduleProcessors(groupId, originalUri, jerseyClient, 
updateRequest, pause,
    +                affectedProcessors, ScheduledState.RUNNING, 
updateRequest.getStartProcessorsStep());
    +        }
    +
    +        updateRequest.setComplete(true);
    +    }
    +
    +    /**
    +     * Periodically polls the process group with the given ID, waiting for all processors whose IDs are given to have the given Scheduled State.
    +     *
    +     * @param client the Jersey Client to use for making the request
    +     * @param originalUri the URI of the original request, used to build the URI of the status request
    +     * @param groupId the ID of the Process Group to poll
    +     * @param processorIds the IDs of all Processors whose state should be equal to the given desired state
    +     * @param desiredState the desired state for all processors with the IDs given
    +     * @param pause the Pause that can be used to wait between polling
    +     * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
    +     */
    +    private boolean waitForProcessorStatus(final Client client, final URI 
originalUri, final String groupId, final Set<String> processorIds, final 
ScheduledState desiredState, final Pause pause) {
    +        URI groupUri;
    +        try {
    +            groupUri = new URI(originalUri.getScheme(), 
originalUri.getUserInfo(), originalUri.getHost(),
    +                originalUri.getPort(), "/nifi-api/flow/process-groups/" + 
groupId + "/status", "recursive=true", originalUri.getFragment());
    +        } catch (URISyntaxException e) {
    +            throw new RuntimeException(e);
    +        }
    +
    +        boolean continuePolling = true;
    +        while (continuePolling) {
    +            final ClientResponse response = 
client.resource(groupUri).header("Content-Type", 
"application/json").get(ClientResponse.class);
    +            if (response.getStatus() != Status.OK.getStatusCode()) {
    +                return false;
    +            }
    +
    +            final ProcessGroupStatusEntity statusEntity = 
response.getEntity(ProcessGroupStatusEntity.class);
    +            final ProcessGroupStatusDTO statusDto = 
statusEntity.getProcessGroupStatus();
    +            final ProcessGroupStatusSnapshotDTO statusSnapshotDto = 
statusDto.getAggregateSnapshot();
    +
    +            if (isProcessorStatusEqual(statusSnapshotDto, processorIds, 
desiredState)) {
    +                logger.debug("All {} processors of interest now have the 
desired state of {}", processorIds.size(), desiredState);
    +                return true;
    +            }
    +
    +            // Not all of the processors are in the desired state. Pause 
for a bit and poll again.
    +            continuePolling = pause.pause();
    +        }
    +
    +        return false;
    +    }
    +
    +    /**
    +     * Periodically polls the local process group with the given ID, waiting for all processors whose IDs are given to have the given Scheduled State.
    +     *
    +     * @param groupId the ID of the Process Group to poll
    +     * @param processorIds the IDs of all Processors whose state should be equal to the given desired state
    +     * @param desiredState the desired state for all processors with the IDs given
    +     * @param pause the Pause that can be used to wait between polling
    +     * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
    +     */
    +    private boolean waitForLocalProcessorStatus(final String groupId, 
final Set<String> processorIds, final ScheduledState desiredState, final Pause 
pause) {
    +        boolean continuePolling = true;
    +        while (continuePolling) {
    +            final ProcessGroupStatusEntity statusEntity = 
serviceFacade.getProcessGroupStatus(groupId, true);
    +            final ProcessGroupStatusDTO statusDto = 
statusEntity.getProcessGroupStatus();
    +            final ProcessGroupStatusSnapshotDTO statusSnapshotDto = 
statusDto.getAggregateSnapshot();
    +
    +            if (isProcessorStatusEqual(statusSnapshotDto, processorIds, 
desiredState)) {
    +                logger.debug("All {} processors of interest now have the 
desired state of {}", processorIds.size(), desiredState);
    +                return true;
    +            }
    +
    +            // Not all of the processors are in the desired state. Pause 
for a bit and poll again.
    +            continuePolling = pause.pause();
    +        }
    +
    +        return false;
    +    }
    +
    +    private boolean isProcessorStatusEqual(final 
ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds, 
final ScheduledState desiredState) {
    +        final String desiredStateName = desiredState.name();
    +
    +        final boolean allProcessorsMatch = 
statusSnapshot.getProcessorStatusSnapshots().stream()
    +            .map(entity -> entity.getProcessorStatusSnapshot())
    +            .filter(status -> processorIds.contains(status.getId()))
    +            .allMatch(status -> {
    +                final String runStatus = status.getRunStatus();
    +                final boolean stateMatches = 
desiredStateName.equalsIgnoreCase(runStatus);
    +                if (!stateMatches) {
    +                    return false;
    +                }
    +
    +                if (desiredState == ScheduledState.STOPPED && 
status.getActiveThreadCount() != 0) {
    +                    return false;
    +                }
    +
    +                return true;
    +            });
    +
    +        if (!allProcessorsMatch) {
    +            return false;
    +        }
    +
    +        for (final ProcessGroupStatusSnapshotEntity childGroupEntity : 
statusSnapshot.getProcessGroupStatusSnapshots()) {
    +            final ProcessGroupStatusSnapshotDTO childGroupStatus = 
childGroupEntity.getProcessGroupStatusSnapshot();
    +            final boolean allMatchChildLevel = 
isProcessorStatusEqual(childGroupStatus, processorIds, desiredState);
    +            if (!allMatchChildLevel) {
    +                return false;
    +            }
    +        }
    +
    +        return true;
    +    }
    +
    +
    +
    +    /**
    +     * Periodically polls the process group with the given ID, waiting for all controller services whose IDs are given to have the given Controller Service State.
    +     *
    +     * @param client the Jersey Client to use for making the HTTP Request
    +     * @param originalUri the URI of the original request, used to build the URI of the controller services request
    +     * @param groupId the ID of the Process Group to poll
    +     * @param serviceIds the IDs of all Controller Services whose state should be equal to the given desired state
    +     * @param desiredState the desired state for all services with the IDs given
    +     * @param pause the Pause that can be used to wait between polling
    +     * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
    +     */
    +    private boolean waitForControllerServiceStatus(final Client client, 
final URI originalUri, final String groupId, final Set<String> serviceIds, 
final ControllerServiceState desiredState,
    +        final Pause pause) {
    +        URI groupUri;
    +        try {
    +            // query parameters must be separated by '&', not ','
    +            groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
    +                originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services",
    +                "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
    +        } catch (URISyntaxException e) {
    +            throw new RuntimeException(e);
    +        }
    +
    +        boolean continuePolling = true;
    +        while (continuePolling) {
    +            final ClientResponse response = 
client.resource(groupUri).header("Content-Type", 
"application/json").get(ClientResponse.class);
    +            if (response.getStatus() != Status.OK.getStatusCode()) {
    +                return false;
    +            }
    +
    +            final ControllerServicesEntity controllerServicesEntity = 
response.getEntity(ControllerServicesEntity.class);
    +            final Set<ControllerServiceEntity> serviceEntities = 
controllerServicesEntity.getControllerServices();
    +
    +            final String desiredStateName = desiredState.name();
    +            final boolean allServicesMatch = serviceEntities.stream()
    +                .map(entity -> entity.getComponent())
    +                .filter(service -> serviceIds.contains(service.getId()))
    +                .map(service -> service.getState())
    +                .allMatch(state -> state.equals(desiredStateName));
    +
    +            if (allServicesMatch) {
    +                logger.debug("All {} controller services of interest now 
have the desired state of {}", serviceIds.size(), desiredState);
    +                return true;
    +            }
    +
    +            // Not all of the processors are in the desired state. Pause 
for a bit and poll again.
    +            continuePolling = pause.pause();
    +        }
    +
    +        return false;
    +    }
    +
    +
    +    /**
    +     * Periodically polls the process group with the given ID, waiting for 
all controller services whose ID's are given to have the given Controller 
Service State.
    +     *
    +     * @param groupId the ID of the Process Group to poll
    +     * @param serviceIds the ID of all Controller Services whose state 
should be equal to the given desired state
    +     * @param desiredState the desired state for all services with the 
ID's given
    +     * @param pause the Pause that can be used to wait between polling
    +     * @param user the user that is retrieving the controller services
    +     * @return <code>true</code> if successful, <code>false</code> if 
unable to wait for services to reach the desired state
    +     */
    +    private boolean waitForLocalControllerServiceStatus(final String 
groupId, final Set<String> serviceIds, final ControllerServiceState 
desiredState, final Pause pause, final NiFiUser user) {
    +        boolean continuePolling = true;
    +        while (continuePolling) {
    +            final Set<ControllerServiceEntity> serviceEntities = 
serviceFacade.getControllerServices(groupId, false, true, user);
    +
    +            final String desiredStateName = desiredState.name();
    +            final boolean allServicesMatch = serviceEntities.stream()
    +                .map(entity -> entity.getComponent())
    +                .filter(service -> serviceIds.contains(service.getId()))
    +                .map(service -> service.getState())
    +                .allMatch(state -> desiredStateName.equals(state));
    +
    +            if (allServicesMatch) {
    +                logger.debug("All {} controller services of interest now 
have the desired state of {}", serviceIds.size(), desiredState);
    +                return true;
    +            }
    +
    +            // Not all of the processors are in the desired state. Pause 
for a bit and poll again.
    +            continuePolling = pause.pause();
    +        }
    +
    +        return false;
    +    }
    +
    +    private VariableRegistryUpdateRequest 
createVariableRegistryUpdateRequest(final String groupId) {
    +        final VariableRegistryUpdateRequest updateRequest = new 
VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId);
    +
    +        // before adding to the request map, purge any old requests. Must 
do this by creating a List of ID's
    +        // and then removing those ID's one-at-a-time in order to avoid 
ConcurrentModificationException.
    +        final Date oneMinuteAgo = new Date(System.currentTimeMillis() - 
VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION);
    +        final List<String> completedRequestIds = 
varRegistryUpdateRequests.entrySet().stream()
    +            .filter(entry -> entry.getValue().isComplete())
    +            .filter(entry -> 
entry.getValue().getLastUpdated().before(oneMinuteAgo))
    +            .map(Map.Entry::getKey)
    +            .collect(Collectors.toList());
    +
    +        completedRequestIds.stream().forEach(id -> 
varRegistryUpdateRequests.remove(id));
    +
    +        final int requestCount = varRegistryUpdateRequests.size();
    +        if (requestCount > MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS) {
    +            throw new IllegalStateException("There are already " + 
requestCount + " update requests for variable registries. "
    +                + "Cannot issue any more requests until the older ones are 
deleted or expire");
    +        }
    +
    +        this.varRegistryUpdateRequests.put(updateRequest.getRequestId(), 
updateRequest);
    +        return updateRequest;
    +    }
    +
    +    private Response updateVariableRegistryLocal(final String groupId, 
final List<AffectedComponentDTO> affectedProcessors, final 
List<AffectedComponentDTO> affectedServices,
    +        final VariableRegistryEntity requestEntity) {
    +
    +        final Set<String> affectedProcessorIds = affectedProcessors == 
null ? Collections.emptySet() : affectedProcessors.stream()
    +            .map(component -> component.getComponentId())
    +            .collect(Collectors.toSet());
    +        Map<String, Revision> processorRevisionMap = getRevisions(groupId, 
affectedProcessorIds);
    +
    +        final Set<String> affectedServiceIds = affectedServices == null ? 
Collections.emptySet() : affectedServices.stream()
    +            .map(component -> component.getComponentId())
    +            .collect(Collectors.toSet());
    +        Map<String, Revision> serviceRevisionMap = getRevisions(groupId, 
affectedServiceIds);
    +
    +        // update the variable registry
    +        final VariableRegistryUpdateRequest updateRequest = 
createVariableRegistryUpdateRequest(groupId);
    +        
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
    +        final Pause pause = createPause(updateRequest);
    +
    +        final Revision requestRevision = 
getRevision(requestEntity.getRevision(), requestEntity.getId());
    +
    +        final NiFiUser user = NiFiUserUtils.getNiFiUser();
    +        final Runnable updateTask = new Runnable() {
    +            @Override
    +            public void run() {
    +                try {
    +                    // Stop processors
    +                    performUpdateVariableRegistryStep(groupId, 
updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors",
    +                        () -> stopProcessors(user, updateRequest, groupId, 
processorRevisionMap, pause));
    +
    +                    // Update revision map because this will have modified 
the revisions of our components.
    +                    final Map<String, Revision> 
updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds);
    +
    +                    // Disable controller services
    +                    performUpdateVariableRegistryStep(groupId, 
updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller 
Services",
    +                        () -> disableControllerServices(user, 
updateRequest, groupId, serviceRevisionMap, pause));
    +
    +                    // Update revision map because this will have modified 
the revisions of our components.
    +                    final Map<String, Revision> updatedServiceRevisionMap 
= getRevisions(groupId, affectedServiceIds);
    +
    +                    // Apply the updates
    +                    performUpdateVariableRegistryStep(groupId, 
updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to 
Variable Registry",
    +                        () -> serviceFacade.updateVariableRegistry(user, 
requestRevision, requestEntity.getVariableRegistry()));
    +
    +                    // Re-enable the controller services
    +                    performUpdateVariableRegistryStep(groupId, 
updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller 
Services",
    +                        () -> enableControllerServices(user, groupId, 
updatedServiceRevisionMap, pause));
    +
    +                    // Restart processors
    +                    performUpdateVariableRegistryStep(groupId, 
updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
    +                        () -> startProcessors(user, groupId, 
updatedProcessorRevisionMap, pause));
    +
    +                    // Set complete
    +                    updateRequest.setComplete(true);
    +                    updateRequest.setLastUpdated(new Date());
    +                } catch (final Exception e) {
    +                    logger.error("Failed to update Variable Registry for 
Proces Group with ID " + groupId, e);
    +                    updateRequest.setFailureReason("An unexpected error 
has occurred: " + e);
    +                }
    +            }
    +        };
    +
    +        // Submit the task to be run in the background
    +        variableRegistryThreadPool.submit(updateTask);
    +
    +        final VariableRegistryUpdateRequestEntity responseEntity = new 
VariableRegistryUpdateRequestEntity();
    --- End diff --
    
    Can we set the permissions on this entity for consistency with all of the 
other endpoints?


> Add Variable Registry at Process Group level
> --------------------------------------------
>
>                 Key: NIFI-4224
>                 URL: https://issues.apache.org/jira/browse/NIFI-4224
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>
> Currently, NiFi exposes a variable registry that is configurable by adding 
> the name of a properties file to nifi.properties and then treating the 
> referenced properties file as key/value pairs for the variable registry. 
> This, however, is very limiting, as it provides a global scope for all 
> variables, and it requires a restart of NiFi in order to pick up any updates 
> to the file. We should expose a Process Group-level Variable Registry.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
