http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java index 857df56..ec4c69e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java @@ -16,24 +16,14 @@ */ package org.apache.nifi.web.api; -import java.util.ArrayList; -import java.util.List; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.HEAD; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - +import com.sun.jersey.api.core.ResourceContext; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.util.NiFiProperties; @@ -47,36 +37,29 @@ import 
org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO; -import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; -import org.apache.nifi.web.api.entity.ClusterConnectionStatusEntity; import org.apache.nifi.web.api.entity.ClusterEntity; -import org.apache.nifi.web.api.entity.ClusterPortStatusEntity; -import org.apache.nifi.web.api.entity.ClusterProcessorStatusEntity; -import org.apache.nifi.web.api.entity.ClusterRemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity; -import org.apache.nifi.web.api.entity.ClusterStatusEntity; -import org.apache.nifi.web.api.entity.ClusterStatusHistoryEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.LongParameter; - -import org.apache.commons.lang3.StringUtils; import org.springframework.security.access.prepost.PreAuthorize; -import com.sun.jersey.api.core.ResourceContext; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity; +import javax.servlet.http.HttpServletRequest; +import 
javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.FormParam; +import javax.ws.rs.GET; +import javax.ws.rs.HEAD; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.List; /** * RESTful endpoint for managing a cluster. @@ -108,62 +91,6 @@ public class ClusterResource extends ApplicationResource { } /** - * Gets the status of this NiFi cluster. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @return A clusterStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - @Path("/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets the status of the cluster", - response = ClusterStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getClusterStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) { - - if (properties.isClusterManager()) { - - ClusterStatusDTO dto = serviceFacade.getClusterStatus(); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusEntity entity = new ClusterStatusEntity(); - entity.setClusterStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - - } - - /** * Returns a 200 OK response to indicate this is a valid cluster endpoint. * * @return An OK response with an empty entity body. @@ -519,622 +446,6 @@ public class ClusterResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a node can process the request."); } - /** - * Gets the processor status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the processor - * @return A clusterProcessorStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/processors/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets the processor status across the cluster", - response = ClusterProcessorStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getProcessorStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The processor id", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterProcessorStatusDTO dto = serviceFacade.getClusterProcessorStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterProcessorStatusEntity entity = new ClusterProcessorStatusEntity(); - entity.setClusterProcessorStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the processor status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the processor - * @return A clusterProcessorStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/processors/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets processor status history across the cluster", - response = ClusterStatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getProcessorStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The processor id", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessorStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the connection status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the processor - * @return A clusterProcessorStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/connections/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets connection status across the cluster", - response = ClusterConnectionStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getConnectionStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The connection id", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterConnectionStatusDTO dto = serviceFacade.getClusterConnectionStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterConnectionStatusEntity entity = new ClusterConnectionStatusEntity(); - entity.setClusterConnectionStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the connections status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the processor - * @return A clusterProcessorStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/connections/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets connection status history across the cluster", - response = ClusterStatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getConnectionStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The connection id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterConnectionStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the process group status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the process group - * @return A clusterProcessGroupStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/process-groups/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets process group status across the cluster", - response = ClusterProcessGroupStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getProcessGroupStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity(); - entity.setClusterProcessGroupStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the process group status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the process group - * @return A clusterProcessGroupStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/process-groups/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets process group status history across the cluster", - response = ClusterStatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getProcessGroupStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessGroupStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the remote process group status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the remote process group - * @return A clusterRemoteProcessGroupStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/remote-process-groups/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets remote process group status across the cluster", - response = ClusterRemoteProcessGroupStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getRemoteProcessGroupStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The remote process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterRemoteProcessGroupStatusDTO dto = serviceFacade.getClusterRemoteProcessGroupStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterRemoteProcessGroupStatusEntity entity = new ClusterRemoteProcessGroupStatusEntity(); - entity.setClusterRemoteProcessGroupStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the input port status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the input port - * @return A clusterPortStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/input-ports/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets input port status across the cluster", - response = ClusterPortStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getInputPortStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The input port id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterPortStatusDTO dto = serviceFacade.getClusterInputPortStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterPortStatusEntity entity = new ClusterPortStatusEntity(); - entity.setClusterPortStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the output port status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the output port - * @return A clusterPortStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/output-ports/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets output port status across the cluster", - response = ClusterPortStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getOutputPortStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The output port id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterPortStatusDTO dto = serviceFacade.getClusterOutputPortStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterPortStatusEntity entity = new ClusterPortStatusEntity(); - entity.setClusterPortStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the remote process group status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the processor - * @return A clusterRemoteProcessGroupStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/remote-process-groups/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets the remote process group status history across the cluster", - response = ClusterStatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getRemoteProcessGroupStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The remote process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterRemoteProcessGroupStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade;
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 738da04..712233f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -33,7 +34,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.DownloadableContent; -import org.apache.nifi.web.IllegalClusterResourceRequestException; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.ConnectableDTO; @@ -44,8 +44,10 @@ import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.RevisionDTO; +import 
org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ConnectionsEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowFileEntity; @@ -55,8 +57,6 @@ import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.ConnectableTypeParameter; import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; import javax.servlet.http.HttpServletRequest; @@ -99,8 +99,6 @@ import java.util.UUID; @Api(hidden = true) public class ConnectionResource extends ApplicationResource { - private static final Logger logger = LoggerFactory.getLogger(ConnectionResource.class); - private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; @@ -284,6 +282,106 @@ public class ConnectionResource extends ApplicationResource { } /** + * Retrieves the specified connection status. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param id The id of the connection whose status to retrieve. + * @return A connectionStatusEntity.
+ */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{id}/status") + @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Gets status for a connection", + response = ConnectionStatusEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getConnectionStatus( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "Whether or not to include the breakdown per node. 
Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("id") String id) { + + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + + if (properties.isClusterManager()) { + // determine where this request should be sent + if (clusterNodeId == null) { + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final ConnectionStatusEntity entity = (ConnectionStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getConnectionStatus().setNodeSnapshots(null); + } + + return nodeResponse.getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } + } + + // get the specified connection status + final ConnectionStatusDTO connectionStatus = serviceFacade.getConnectionStatus(groupId, id); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // generate the response entity + final 
ConnectionStatusEntity entity = new ConnectionStatusEntity(); + entity.setRevision(revision); + entity.setConnectionStatus(connectionStatus); + + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + + /** * Retrieves the specified connection status history. * * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. @@ -327,7 +425,7 @@ public class ConnectionResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - throw new IllegalClusterResourceRequestException("This request is only supported in standalone mode."); + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } // get the specified processor status history @@ -565,7 +663,7 @@ public class ConnectionResource extends ApplicationResource { headersToOverride.put("content-type", MediaType.APPLICATION_JSON); // replicate put request - return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse(); } // get the connection http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index ef62a62..a3d0dc1 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -23,34 +23,17 @@ import com.wordnik.swagger.annotations.ApiParam; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; - -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.HEAD; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.web.security.user.NiFiUserUtils; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.user.NiFiUser; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; -import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.IllegalClusterResourceRequestException; +import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.AboutDTO; import org.apache.nifi.web.api.dto.BannerDTO; @@ -66,23 +49,46 @@ import 
org.apache.nifi.web.api.entity.AuthorityEntity; import org.apache.nifi.web.api.entity.BannerEntity; import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; import org.apache.nifi.web.api.entity.ControllerEntity; +import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity; import org.apache.nifi.web.api.entity.ControllerStatusEntity; import org.apache.nifi.web.api.entity.CounterEntity; import org.apache.nifi.web.api.entity.CountersEntity; import org.apache.nifi.web.api.entity.Entity; +import org.apache.nifi.web.api.entity.IdentityEntity; import org.apache.nifi.web.api.entity.PrioritizerTypesEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorTypesEntity; +import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity; import org.apache.nifi.web.api.entity.SearchResultsEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity; -import org.apache.nifi.web.api.entity.IdentityEntity; -import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity; +import org.apache.nifi.web.security.user.NiFiUserUtils; import org.springframework.security.access.prepost.PreAuthorize; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.FormParam; +import javax.ws.rs.GET; +import javax.ws.rs.HEAD; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.Map; +import java.util.Set; + /** * RESTful endpoint for managing a Flow Controller. */ @@ -524,6 +530,10 @@ public class ControllerResource extends ApplicationResource { ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) { + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + final ControllerStatusDTO controllerStatus = serviceFacade.getControllerStatus(); // create the revision @@ -572,7 +582,50 @@ public class ControllerResource extends ApplicationResource { value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) { + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "Whether or not to include the breakdown per node. 
Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId) { + + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + + // replicate if cluster manager + if (properties.isClusterManager()) { + // determine where this request should be sent + if (clusterNodeId == null) { + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getCounters().setNodeSnapshots(null); + } + + return nodeResponse.getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } + } final CountersDTO countersReport = serviceFacade.getCounters(); http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java index d2be69d..2f7eed6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java @@ -22,12 +22,29 @@ import com.wordnik.swagger.annotations.ApiParam; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.ConfigurationSnapshot; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; +import org.apache.nifi.web.api.entity.InputPortEntity; +import org.apache.nifi.web.api.entity.InputPortsEntity; +import org.apache.nifi.web.api.entity.PortStatusEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.DoubleParameter; +import 
org.apache.nifi.web.api.request.IntegerParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.springframework.security.access.prepost.PreAuthorize; + import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -46,24 +63,13 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.ConfigurationSnapshot; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.PositionDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.InputPortEntity; -import org.apache.nifi.web.api.entity.InputPortsEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.DoubleParameter; -import org.apache.nifi.web.api.request.IntegerParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.security.access.prepost.PreAuthorize; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; /** * RESTful endpoint for managing an Input Port. 
@@ -71,8 +77,6 @@ import org.springframework.security.access.prepost.PreAuthorize; @Api(hidden = true) public class InputPortResource extends ApplicationResource { - private static final Logger logger = LoggerFactory.getLogger(InputPortResource.class); - private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; @@ -276,7 +280,7 @@ public class InputPortResource extends ApplicationResource { headersToOverride.put("content-type", MediaType.APPLICATION_JSON); // replicate put request - return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(portEntity), getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(portEntity), getHeaders(headersToOverride)).getResponse(); } @@ -370,6 +374,106 @@ public class InputPortResource extends ApplicationResource { } /** + * Retrieves the specified input port status. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param id The id of the input port whose status to retrieve. + * @return A portStatusEntity. + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{id}/status") + @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Gets status for an input port", + response = PortStatusEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid.
The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getInputPortStatus( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "Whether or not to include the breakdown per node. Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( + value = "The input port id.", + required = true + ) + @PathParam("id") String id) { + + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + + if (properties.isClusterManager()) { + // determine where this request should be sent + if (clusterNodeId == null) { + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getPortStatus().setNodeSnapshots(null); + } + + return 
nodeResponse.getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } + } + + // get the specified input port status + final PortStatusDTO portStatus = serviceFacade.getInputPortStatus(groupId, id); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // generate the response entity + final PortStatusEntity entity = new PortStatusEntity(); + entity.setRevision(revision); + entity.setPortStatus(portStatus); + + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + + /** * Updates the specified input port. 
* * @param httpServletRequest request http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java index c88cc68..d3eb77a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java @@ -22,6 +22,16 @@ import com.wordnik.swagger.annotations.ApiParam; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.IllegalClusterResourceRequestException; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.NodeEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.springframework.security.access.prepost.PreAuthorize; + import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -34,19 +44,6 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.entity.NodeEntity; -import 
org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.web.IllegalClusterResourceRequestException; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusDTO; -import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity; -import org.springframework.security.access.prepost.PreAuthorize; /** * RESTful endpoint for managing a cluster connection. @@ -121,129 +118,6 @@ public class NodeResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); } - /** - * Gets the status for the specified node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the node. - * @return A processGroupStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - @Path("/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets process group status for a node in the cluster", - response = ProcessGroupStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), - @Authorization(value = "Administrator", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getNodeStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The node id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - // get the node statistics - final NodeStatusDTO nodeStatus = serviceFacade.getNodeStatus(id); - - // create the revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create the node statics entity - final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity(); - entity.setRevision(revision); - entity.setProcessGroupStatus(nodeStatus.getControllerStatus()); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the system diagnositics for the specified node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the node. 
- * @return A systemDiagnosticsEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - @Path("/{id}/system-diagnostics") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets system diagnostics for a node in the cluester", - response = SystemDiagnosticsEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), - @Authorization(value = "Administrator", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getNodeSystemDiagnostics( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The node id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - // get the node statistics - final NodeSystemDiagnosticsDTO nodeSystemDiagnostics = serviceFacade.getNodeSystemDiagnostics(id); - - // create the revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create the node statics entity - final SystemDiagnosticsEntity entity = new SystemDiagnosticsEntity(); - entity.setRevision(revision); - entity.setSystemDiagnostics(nodeSystemDiagnostics.getSystemDiagnostics()); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } /** * Updates the contents of the specified node in this NiFi cluster. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java index ccd08db..e76fcf0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java @@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -46,7 +47,12 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; @@ -54,8 +60,10 @@ import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; import 
org.apache.nifi.web.api.entity.OutputPortEntity; import org.apache.nifi.web.api.entity.OutputPortsEntity; +import org.apache.nifi.web.api.entity.PortStatusEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.DoubleParameter; import org.apache.nifi.web.api.request.IntegerParameter; @@ -370,6 +378,106 @@ public class OutputPortResource extends ApplicationResource { } /** + * Retrieves the specified output port status. On a cluster manager the request is sent through the cluster manager (or only to the node named by clusterNodeId when one is given), and the per-node snapshots are kept in the merged result only when nodewise is true; combining nodewise with clusterNodeId is rejected with an IllegalArgumentException. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param id The id of the output port whose status is retrieved. + * @return A portStatusEntity. + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{id}/status") + @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Gets status for an output port", + response = PortStatusEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") + } + ) + public Response getOutputPortStatus( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "Whether or not to include the breakdown per node. Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( + value = "The output port id.", + required = true + ) + @PathParam("id") String id) { + + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + + if (properties.isClusterManager()) { + // determine where this request should be sent + if (clusterNodeId == null) { + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getPortStatus().setNodeSnapshots(null); + } + + return nodeResponse.getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return 
clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } + } + + // get the specified output port status + final PortStatusDTO portStatus = serviceFacade.getOutputPortStatus(groupId, id); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // generate the response entity + final PortStatusEntity entity = new PortStatusEntity(); + entity.setRevision(revision); + entity.setPortStatus(portStatus); + + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + + /** + * Updates the specified output port. + * + * @param httpServletRequest request
