Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/461#discussion_r64256472
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
---
@@ -321,6 +320,157 @@ public Response getControllerServices(
return clusterContext(generateOkResponse(entity)).build();
}
+ /**
+ * Updates the specified process group.
+ *
+ * @param httpServletRequest request
+ * @param id The id of the process group.
+ * @param scheduleComponentsEntity A scheduleComponentsEntity.
+ * @return A processGroupEntity.
+ */
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("process-groups/{id}")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Updates a process group",
+ response = ScheduleComponentsEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to
complete the request because it was invalid. The request should not be retried
without modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized
to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource
could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but
NiFi was not in the appropriate state to process it. Retrying the same request
later may be successful.")
+ }
+ )
+ public Response scheduleComponents(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The process group id.",
+ required = true
+ )
+ @PathParam("id") String id,
+ ScheduleComponentsEntity scheduleComponentsEntity) {
+
+ authorizeFlow();
+
+ // ensure the same id is being used
+ if (!id.equals(scheduleComponentsEntity.getId())) {
+ throw new IllegalArgumentException(String.format("The process
group id (%s) in the request body does "
+ + "not equal the process group id of the requested
resource (%s).", scheduleComponentsEntity.getId(), id));
+ }
+
+ final ScheduledState state;
+ if (scheduleComponentsEntity.getState() == null) {
+ throw new IllegalArgumentException("The scheduled state must
be specified.");
+ } else {
+ try {
+ state =
ScheduledState.valueOf(scheduleComponentsEntity.getState());
+ } catch (final IllegalArgumentException iae) {
+ throw new IllegalArgumentException(String.format("The
scheduled must be one of [].",
StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ",
")));
+ }
+ }
+
+ // ensure its a supported scheduled state
+ if (ScheduledState.DISABLED.equals(state) ||
ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state))
{
+ throw new IllegalArgumentException(String.format("The
scheduled must be one of [].",
StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ",
")));
+ }
+
+ // if the components are not specified, gather all components and
their current revision
+ if (scheduleComponentsEntity.getComponents() == null) {
+ // TODO - this will break while clustered until nodes are able
to process/replicate requests
+ // get the current revisions for the components being updated
+ final Set<Revision> revisions =
serviceFacade.getRevisionsFromGroup(id, group -> {
+ final Set<String> componentIds = new HashSet<>();
+
+ // ensure authorized for each processor we will attempt to
schedule
+
group.findAllProcessors().stream().filter(ScheduledState.RUNNING.equals(state)
? ProcessGroup.SCHEDULABLE_PROCESSORS :
ProcessGroup.UNSCHEDULABLE_PROCESSORS).forEach(processor -> {
+ if (processor.isAuthorized(authorizer,
RequestAction.WRITE)) {
+ componentIds.add(processor.getIdentifier());
+ }
+ });
+
+ // ensure authorized for each input port we will attempt
to schedule
+
group.findAllInputPorts().stream().filter(ScheduledState.RUNNING.equals(state)
? ProcessGroup.SCHEDULABLE_PORTS :
ProcessGroup.UNSCHEDULABLE_PORTS).forEach(inputPort -> {
+ if (inputPort.isAuthorized(authorizer,
RequestAction.WRITE)) {
+ componentIds.add(inputPort.getIdentifier());
+ }
+ });
+
+ // ensure authorized for each output port we will attempt
to schedule
+
group.findAllOutputPorts().stream().filter(ScheduledState.RUNNING.equals(state)
? ProcessGroup.SCHEDULABLE_PORTS :
ProcessGroup.UNSCHEDULABLE_PORTS).forEach(outputPort -> {
+ if (outputPort.isAuthorized(authorizer,
RequestAction.WRITE)) {
+ componentIds.add(outputPort.getIdentifier());
+ }
+ });
+
+ return componentIds;
+ });
+
+ // build the component mapping
+ final Map<String, RevisionDTO> componentsToSchedule = new
HashMap<>();
+ revisions.forEach(revision -> {
+ final RevisionDTO dto = new RevisionDTO();
+ dto.setClientId(revision.getClientId());
+ dto.setVersion(revision.getVersion());
+ componentsToSchedule.put(revision.getComponentId(), dto);
+ });
+
+ // set the components and their current revision
+ scheduleComponentsEntity.setComponents(componentsToSchedule);
+ }
+
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.PUT,
getAbsolutePath(), scheduleComponentsEntity, getHeaders()).getResponse();
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final Map<String, RevisionDTO> componentsToSchedule =
scheduleComponentsEntity.getComponents();
+ final Map<String, Revision> componentRevisions =
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
e -> getRevision(e.getValue(), e.getKey())));
+ final Set<Revision> revisions = new
HashSet<>(componentRevisions.values());
+
+ // handle expects request (usually from the cluster manager)
+ final boolean validationPhase =
isValidationPhase(httpServletRequest);
+ if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
+
+ // ensure access to everything component being scheduled
--- End diff --
Should this comment say "every component" instead of "everything component"?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---