GitHub user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/461#discussion_r64256472
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
 ---
    @@ -321,6 +320,157 @@ public Response getControllerServices(
             return clusterContext(generateOkResponse(entity)).build();
         }
     
    +    /**
    +     * Updates the specified process group.
    +     *
    +     * @param httpServletRequest request
    +     * @param id The id of the process group.
    +     * @param scheduleComponentsEntity A scheduleComponentsEntity.
    +     * @return A processGroupEntity.
    +     */
    +    @PUT
    +    @Consumes(MediaType.APPLICATION_JSON)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("process-groups/{id}")
    +    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
    +    @ApiOperation(
    +        value = "Updates a process group",
    +        response = ScheduleComponentsEntity.class,
    +        authorizations = {
    +            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
    +        }
    +    )
    +    @ApiResponses(
    +        value = {
    +            @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
    +            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
    +            @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
    +            @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
    +            @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
    +        }
    +    )
    +    public Response scheduleComponents(
    +        @Context HttpServletRequest httpServletRequest,
    +        @ApiParam(
    +            value = "The process group id.",
    +            required = true
    +        )
    +        @PathParam("id") String id,
    +        ScheduleComponentsEntity scheduleComponentsEntity) {
    +
    +        authorizeFlow();
    +
    +        // ensure the same id is being used
    +        if (!id.equals(scheduleComponentsEntity.getId())) {
    +            throw new IllegalArgumentException(String.format("The process 
group id (%s) in the request body does "
    +                + "not equal the process group id of the requested 
resource (%s).", scheduleComponentsEntity.getId(), id));
    +        }
    +
    +        final ScheduledState state;
    +        if (scheduleComponentsEntity.getState() == null) {
    +            throw new IllegalArgumentException("The scheduled state must 
be specified.");
    +        } else {
    +            try {
    +                state = 
ScheduledState.valueOf(scheduleComponentsEntity.getState());
    +            } catch (final IllegalArgumentException iae) {
    +                throw new IllegalArgumentException(String.format("The 
scheduled must be one of [].", 
StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", 
")));
    +            }
    +        }
    +
    +        // ensure its a supported scheduled state
    +        if (ScheduledState.DISABLED.equals(state) || 
ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) 
{
    +            throw new IllegalArgumentException(String.format("The 
scheduled must be one of [].", 
StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", 
")));
    +        }
    +
    +        // if the components are not specified, gather all components and 
their current revision
    +        if (scheduleComponentsEntity.getComponents() == null) {
    +            // TODO - this will break while clustered until nodes are able 
to process/replicate requests
    +            // get the current revisions for the components being updated
    +            final Set<Revision> revisions = 
serviceFacade.getRevisionsFromGroup(id, group -> {
    +                final Set<String> componentIds = new HashSet<>();
    +
    +                // ensure authorized for each processor we will attempt to 
schedule
    +                
group.findAllProcessors().stream().filter(ScheduledState.RUNNING.equals(state) 
? ProcessGroup.SCHEDULABLE_PROCESSORS : 
ProcessGroup.UNSCHEDULABLE_PROCESSORS).forEach(processor -> {
    +                    if (processor.isAuthorized(authorizer, 
RequestAction.WRITE)) {
    +                        componentIds.add(processor.getIdentifier());
    +                    }
    +                });
    +
    +                // ensure authorized for each input port we will attempt 
to schedule
    +                
group.findAllInputPorts().stream().filter(ScheduledState.RUNNING.equals(state) 
? ProcessGroup.SCHEDULABLE_PORTS : 
ProcessGroup.UNSCHEDULABLE_PORTS).forEach(inputPort -> {
    +                    if (inputPort.isAuthorized(authorizer, 
RequestAction.WRITE)) {
    +                        componentIds.add(inputPort.getIdentifier());
    +                    }
    +                });
    +
    +                // ensure authorized for each output port we will attempt 
to schedule
    +                
group.findAllOutputPorts().stream().filter(ScheduledState.RUNNING.equals(state) 
? ProcessGroup.SCHEDULABLE_PORTS : 
ProcessGroup.UNSCHEDULABLE_PORTS).forEach(outputPort -> {
    +                    if (outputPort.isAuthorized(authorizer, 
RequestAction.WRITE)) {
    +                        componentIds.add(outputPort.getIdentifier());
    +                    }
    +                });
    +
    +                return componentIds;
    +            });
    +
    +            // build the component mapping
    +            final Map<String, RevisionDTO> componentsToSchedule = new 
HashMap<>();
    +            revisions.forEach(revision -> {
    +                final RevisionDTO dto = new RevisionDTO();
    +                dto.setClientId(revision.getClientId());
    +                dto.setVersion(revision.getVersion());
    +                componentsToSchedule.put(revision.getComponentId(), dto);
    +            });
    +
    +            // set the components and their current revision
    +            scheduleComponentsEntity.setComponents(componentsToSchedule);
    +        }
    +
    +        if (properties.isClusterManager()) {
    +            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), scheduleComponentsEntity, getHeaders()).getResponse();
    +        }
    +
    +        final NiFiUser user = NiFiUserUtils.getNiFiUser();
    +
    +        final Map<String, RevisionDTO> componentsToSchedule = 
scheduleComponentsEntity.getComponents();
    +        final Map<String, Revision> componentRevisions = 
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 e -> getRevision(e.getValue(), e.getKey())));
    +        final Set<Revision> revisions = new 
HashSet<>(componentRevisions.values());
    +
    +        // handle expects request (usually from the cluster manager)
    +        final boolean validationPhase = 
isValidationPhase(httpServletRequest);
    +        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
    +
    +            // ensure access to everything component being scheduled
    --- End diff --
    
    Should this comment say "every component" rather than "everything component"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to