Github user mcgilman commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2990#discussion_r217135560
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
 ---
    @@ -434,6 +436,197 @@ public Response updateRemoteProcessGroupOutputPort(
             );
         }
     
    +    /**
    +     * Updates the specified remote process group input port run status.
    +     *
    +     * @param httpServletRequest           request
    +     * @param id                           The id of the remote process 
group to update.
    +     * @param portId                       The id of the input port to 
update.
    +     * @param requestRemotePortRunStatusEntity The 
remoteProcessGroupPortRunStatusEntity
    +     * @return A remoteProcessGroupPortEntity
    +     */
    +    @PUT
    +    @Consumes(MediaType.APPLICATION_JSON)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{id}/input-ports/{port-id}/run-status")
    +    @ApiOperation(
    +            value = "Updates run status of a remote port",
    +            notes = NON_GUARANTEED_ENDPOINT,
    +            response = RemoteProcessGroupPortEntity.class,
    +            authorizations = {
    +                    @Authorization(value = "Write - 
/remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}")
    +            }
    +    )
    +    @ApiResponses(
    +            value = {
    +                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
    +                    @ApiResponse(code = 401, message = "Client could not 
be authenticated."),
    +                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
    +                    @ApiResponse(code = 404, message = "The specified 
resource could not be found."),
    +                    @ApiResponse(code = 409, message = "The request was 
valid but NiFi was not in the appropriate state to process it. Retrying the 
same request later may be successful.")
    +            }
    +    )
    +    public Response updateRemoteProcessGroupInputPortRunStatus(
    +            @Context final HttpServletRequest httpServletRequest,
    +            @ApiParam(
    +                    value = "The remote process group id.",
    +                    required = true
    +            )
    +            @PathParam("id") final String id,
    +            @ApiParam(
    +                    value = "The remote process group port id.",
    +                    required = true
    +            )
    +            @PathParam("port-id") final String portId,
    +            @ApiParam(
    +                    value = "The remote process group port.",
    +                    required = true
    +            ) final RemotePortRunStatusEntity 
requestRemotePortRunStatusEntity) {
    +
    +        if (requestRemotePortRunStatusEntity == null) {
    +            throw new IllegalArgumentException("Remote process group port 
run status must be specified.");
    +        }
    +
    +        if (requestRemotePortRunStatusEntity.getRevision() == null) {
    +            throw new IllegalArgumentException("Revision must be 
specified.");
    +        }
    +
    +        requestRemotePortRunStatusEntity.validateState();
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.PUT, 
requestRemotePortRunStatusEntity);
    +        } else if (isDisconnectedFromCluster()) {
    +            
verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged());
    +        }
    +
    +        final Revision requestRevision = 
getRevision(requestRemotePortRunStatusEntity.getRevision(), id);
    +        final RemoteProcessGroupPortDTO remoteProcessGroupPort = new 
RemoteProcessGroupPortDTO();
    +        remoteProcessGroupPort.setId(portId);
    +        remoteProcessGroupPort.setGroupId(id);
    +        
remoteProcessGroupPort.setTransmitting(shouldTransmit(requestRemotePortRunStatusEntity));
    +
    +        return withWriteLock(
    +                serviceFacade,
    +                requestRemotePortRunStatusEntity,
    +                requestRevision,
    +                lookup -> {
    +                    final Authorizable remoteProcessGroup = 
lookup.getRemoteProcessGroup(id);
    +                    OperationAuthorizable.isAuthorized(remoteProcessGroup, 
authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
    +                },
    +                () -> 
serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, 
remoteProcessGroupPort),
    +                (revision, remoteProcessGroupPortEntity) -> {
    +                    // update the specified remote process group
    +                    final RemoteProcessGroupPortEntity controllerResponse 
= serviceFacade.updateRemoteProcessGroupInputPort(revision, id, 
remoteProcessGroupPort);
    +
    +                    // get the updated revision
    +                    final RevisionDTO updatedRevision = 
controllerResponse.getRevision();
    +
    +                    // build the response entity
    +                    final RemoteProcessGroupPortEntity entity = new 
RemoteProcessGroupPortEntity();
    +                    entity.setRevision(updatedRevision);
    +                    
entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort());
    +
    +                    return generateOkResponse(entity).build();
    +                }
    +        );
    +    }
    +
    +    /**
    +     * Updates the specified remote process group output port run status.
    +     *
    +     * @param httpServletRequest           request
    +     * @param id                           The id of the remote process 
group to update.
    +     * @param portId                       The id of the output port to 
update.
    +     * @param requestRemotePortRunStatusEntity The 
remoteProcessGroupPortEntity
    +     * @return A remoteProcessGroupPortEntity
    +     */
    +    @PUT
    +    @Consumes(MediaType.APPLICATION_JSON)
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{id}/output-ports/{port-id}/run-status")
    +    @ApiOperation(
    +            value = "Updates run status of a remote port",
    +            notes = NON_GUARANTEED_ENDPOINT,
    +            response = RemoteProcessGroupPortEntity.class,
    +            authorizations = {
    +                    @Authorization(value = "Write - 
/remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}")
    +            }
    +    )
    +    @ApiResponses(
    +            value = {
    +                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
    +                    @ApiResponse(code = 401, message = "Client could not 
be authenticated."),
    +                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
    +                    @ApiResponse(code = 404, message = "The specified 
resource could not be found."),
    +                    @ApiResponse(code = 409, message = "The request was 
valid but NiFi was not in the appropriate state to process it. Retrying the 
same request later may be successful.")
    +            }
    +    )
    +    public Response updateRemoteProcessGroupOutputPortRunStatus(
    +            @Context HttpServletRequest httpServletRequest,
    +            @ApiParam(
    +                    value = "The remote process group id.",
    +                    required = true
    +            )
    +            @PathParam("id") String id,
    +            @ApiParam(
    +                    value = "The remote process group port id.",
    +                    required = true
    +            )
    +            @PathParam("port-id") String portId,
    +            @ApiParam(
    +                    value = "The remote process group port.",
    +                    required = true
    +            ) RemotePortRunStatusEntity requestRemotePortRunStatusEntity) {
    +
    +        if (requestRemotePortRunStatusEntity == null) {
    +            throw new IllegalArgumentException("Remote process group port 
run status must be specified.");
    +        }
    +
    +        if (requestRemotePortRunStatusEntity.getRevision() == null) {
    +            throw new IllegalArgumentException("Revision must be 
specified.");
    +        }
    +
    +        requestRemotePortRunStatusEntity.validateState();
    +
    +        if (isReplicateRequest()) {
    +            return replicate(HttpMethod.PUT, 
requestRemotePortRunStatusEntity);
    +        } else if (isDisconnectedFromCluster()) {
    +            
verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged());
    +        }
    +
    +        // handle expects request (usually from the cluster manager)
    +        final Revision requestRevision = 
getRevision(requestRemotePortRunStatusEntity.getRevision(), id);
    +        final RemoteProcessGroupPortDTO remoteProcessGroupPort = new 
RemoteProcessGroupPortDTO();
    +        remoteProcessGroupPort.setId(portId);
    +        remoteProcessGroupPort.setGroupId(id);
    +        
remoteProcessGroupPort.setTransmitting(shouldTransmit(requestRemotePortRunStatusEntity));
    +
    +        return withWriteLock(
    +                serviceFacade,
    +                requestRemotePortRunStatusEntity,
    +                requestRevision,
    +                lookup -> {
    +                    final Authorizable remoteProcessGroup = 
lookup.getRemoteProcessGroup(id);
    +                    OperationAuthorizable.isAuthorized(remoteProcessGroup, 
authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
    +                },
    +                () -> 
serviceFacade.verifyUpdateRemoteProcessGroupOutputPort(id, 
remoteProcessGroupPort),
    +                (revision, remoteProcessGroupPortEntity) -> {
    +                    // update the specified remote process group
    +                    final RemoteProcessGroupPortEntity controllerResponse 
= serviceFacade.updateRemoteProcessGroupOutputPort(revision, id, 
remoteProcessGroupPort);
    --- End diff --
    
    We need to recreate this `remoteProcessGroupPort` using the 
`remoteProcessGroupPortEntity` due to how we authorize/cache requests during 
our two phase commit.


---

Reply via email to