Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/461#discussion_r64256311
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
---
@@ -321,6 +320,157 @@ public Response getControllerServices(
return clusterContext(generateOkResponse(entity)).build();
}
+ /**
+ * Updates the specified process group.
+ *
+ * @param httpServletRequest request
+ * @param id The id of the process group.
+ * @param scheduleComponentsEntity A scheduleComponentsEntity.
+ * @return A processGroupEntity.
+ */
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("process-groups/{id}")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Updates a process group",
+ response = ScheduleComponentsEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to
complete the request because it was invalid. The request should not be retried
without modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized
to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource
could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but
NiFi was not in the appropriate state to process it. Retrying the same request
later may be successful.")
+ }
+ )
+ public Response scheduleComponents(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The process group id.",
+ required = true
+ )
+ @PathParam("id") String id,
+ ScheduleComponentsEntity scheduleComponentsEntity) {
+
+ authorizeFlow();
+
+ // ensure the same id is being used
+ if (!id.equals(scheduleComponentsEntity.getId())) {
+ throw new IllegalArgumentException(String.format("The process
group id (%s) in the request body does "
+ + "not equal the process group id of the requested
resource (%s).", scheduleComponentsEntity.getId(), id));
+ }
+
+ final ScheduledState state;
+ if (scheduleComponentsEntity.getState() == null) {
+ throw new IllegalArgumentException("The scheduled state must
be specified.");
+ } else {
+ try {
+ state =
ScheduledState.valueOf(scheduleComponentsEntity.getState());
+ } catch (final IllegalArgumentException iae) {
+ throw new IllegalArgumentException(String.format("The
scheduled must be one of [].",
StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ",
")));
+ }
+ }
+
+ // ensure its a supported scheduled state
+ if (ScheduledState.DISABLED.equals(state) ||
ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state))
{
+ throw new IllegalArgumentException(String.format("The
scheduled must be one of [].",
StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ",
")));
+ }
+
+ // if the components are not specified, gather all components and
their current revision
+ if (scheduleComponentsEntity.getComponents() == null) {
+ // TODO - this will break while clustered until nodes are able
to process/replicate requests
+ // get the current revisions for the components being updated
+ final Set<Revision> revisions =
serviceFacade.getRevisionsFromGroup(id, group -> {
+ final Set<String> componentIds = new HashSet<>();
+
+ // ensure authorized for each processor we will attempt to
schedule
+
group.findAllProcessors().stream().filter(ScheduledState.RUNNING.equals(state)
? ProcessGroup.SCHEDULABLE_PROCESSORS :
ProcessGroup.UNSCHEDULABLE_PROCESSORS).forEach(processor -> {
+ if (processor.isAuthorized(authorizer,
RequestAction.WRITE)) {
--- End diff --
It seems cleaner to me to use a filter here to check if the processor is
authorized, rather than mixing stream filters with a forEach and then
performing additional if-checks. I.e., instead of "if
(processor.isAuthorized...) {...}" we could just do ".filter(processor ->
processor.isAuthorized(...).forEach(...)" for processors, input ports & output
ports
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---