Repository: nifi Updated Branches: refs/heads/NIFI-108 e762d3c7d -> 670733753
NIFI-108: - Starting to add support for endpoints that will list flowfiles in a queue. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4cc1fa85 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4cc1fa85 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4cc1fa85 Branch: refs/heads/NIFI-108 Commit: 4cc1fa85aedd5fd6611d712f3e69e96adf845d55 Parents: e762d3c Author: Matt Gilman <[email protected]> Authored: Thu Dec 17 16:06:24 2015 -0500 Committer: Matt Gilman <[email protected]> Committed: Thu Dec 17 16:06:24 2015 -0500 ---------------------------------------------------------------------- .../nifi/web/api/dto/FlowFileSummaryDTO.java | 32 +++ .../org/apache/nifi/web/NiFiServiceFacade.java | 22 ++ .../nifi/web/StandardNiFiServiceFacade.java | 11 + .../apache/nifi/web/api/ConnectionResource.java | 265 ++++++++++++++++++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 14 + .../org/apache/nifi/web/dao/ConnectionDAO.java | 25 +- .../web/dao/impl/StandardConnectionDAO.java | 79 ++++++ 7 files changed, 440 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java index accb512..06b2776 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java @@ -24,6 +24,8 @@ import java.util.Date; public class FlowFileSummaryDTO { + private String uri; + private String uuid; private String filename; private Integer position; @@ -32,6 +34,22 @@ public class FlowFileSummaryDTO { private Date linageStartDate; private Boolean isPenalized; + private String clusterNodeId; + + /** + * @return the FlowFile uri + */ + @ApiModelProperty( + value = "The URI that can be used to access this FlowFile." + ) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + /** * @return the FlowFile uuid */ @@ -131,4 +149,18 @@ public class FlowFileSummaryDTO { public void setPenalized(Boolean penalized) { isPenalized = penalized; } + + /** + * @return The id of the node where this FlowFile resides. + */ + @ApiModelProperty( + value = "The id of the node where this FlowFile resides." 
+ ) + public String getClusterNodeId() { + return clusterNodeId; + } + + public void setClusterNodeId(String clusterNodeId) { + this.clusterNodeId = clusterNodeId; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index ef5f202..66efb27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -33,6 +33,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.dto.FlowFileDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; @@ -120,6 +121,17 @@ public interface NiFiServiceFacade { DownloadableContent getContent(Long eventId, String uri, ContentDirection contentDirection); /** + * Gets the content for the specified flowfile in the specified connection. + * + * @param groupId group + * @param connectionId connection + * @param flowfileUuid flowfile + * @param uri uri + * @return content + */ + DownloadableContent getContent(String groupId, String connectionId, String flowfileUuid, String uri); + + /** * Retrieves provenance. 
* * @param queryId identifier @@ -587,6 +599,16 @@ public interface NiFiServiceFacade { */ ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId); + /** + * Gets the specified flowfile from the specified connection. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param flowFileUuid The UUID of the flowfile + * @return The FlowFileDTO + */ + FlowFileDTO getFlowFile(String groupId, String connectionId, String flowFileUuid); + // ---------------------------------------- // InputPort methods // ---------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 8b0ef37..7982f9a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -80,6 +80,7 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.web.api.dto.FlowFileDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.security.user.NiFiUserUtils; import org.apache.nifi.user.AccountStatus; @@ -1960,6 +1961,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade { } @Override + public DownloadableContent getContent(String groupId, String connectionId, String flowFileUuid, String uri) { + return connectionDAO.getContent(groupId, connectionId, flowFileUuid, uri); + } + + @Override public DownloadableContent getContent(Long eventId, String uri, ContentDirection contentDirection) { return controllerFacade.getContent(eventId, uri, contentDirection); } @@ -2143,6 +2149,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public FlowFileDTO getFlowFile(String groupId, String connectionId, String flowFileUuid) { + return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(groupId, connectionId, flowFileUuid)); + } + + @Override public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) { return controllerFacade.getConnectionStatusHistory(groupId, connectionId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 8ee765c..8cb0693 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -22,11 +22,16 @@ import com.wordnik.swagger.annotations.ApiParam; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; 
+ +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -51,21 +56,31 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.repository.claim.ContentDirection; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; +import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.IllegalClusterResourceRequestException; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.FlowFileDTO; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionsEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.request.ClientIdParameter; @@ -96,7 +111,7 @@ public class 
ConnectionResource extends ApplicationResource { private String groupId; /** - * Populate the uri's for the specified processors and their relationships. + * Populate the uri's for the specified connections. * * @param connections connections * @return dtos @@ -109,7 +124,10 @@ public class ConnectionResource extends ApplicationResource { } /** - * Populate the uri's for the specified processor and its relationships. + * Populate the uri's for the specified connection. + * + * @param connection connection + * @return dto */ private ConnectionDTO populateRemainingConnectionContent(ConnectionDTO connection) { // populate the remaining properties @@ -118,6 +136,18 @@ public class ConnectionResource extends ApplicationResource { } /** + * Populate the uri's for the specified flowfile. + * + * @param connectionId the connection id + * @param flowFile the flowfile + * @return the dto + */ + private FlowFileSummaryDTO populateRemainingFlowFileContent(final String connectionId, FlowFileSummaryDTO flowFile) { + flowFile.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "flowfiles", flowFile.getUuid())); + return flowFile; + } + + /** * Gets all the connections. * * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. @@ -892,23 +922,244 @@ public class ConnectionResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } + /** + * Gets the specified flowfile from the specified connection. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
+ * @param connectionId The connection id + * @param flowFileUuid The flowfile uuid + * @param clusterNodeId The cluster node id where the flowfile resides + * @return a flowFileDTO + */ @GET @Consumes(MediaType.WILDCARD) @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - @Path("/{connection-id}/flowfiles/{flow-file-uuid}") - public Response getFlowFile() { - return null; + @Path("/{connection-id}/flowfiles/{flowfile-uuid}") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Gets a FlowFile from a Connection.", + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getFlowFile( + @ApiParam( + value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The flowfile uuid.", + required = true + ) + @PathParam("flowfile-uuid") String flowFileUuid, + @ApiParam( + value = "The id of the node where the content exists if clustered.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + // determine where this request should be sent + if (clusterNodeId == null) { + throw new IllegalArgumentException("The id of the node in the cluster is required."); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } + } + + // get the flowfile + final FlowFileDTO flowfileDto = serviceFacade.getFlowFile(groupId, connectionId, flowFileUuid); + populateRemainingFlowFileContent(connectionId, flowfileDto); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final FlowFileEntity entity = new FlowFileEntity(); + entity.setRevision(revision); + entity.setFlowFile(flowfileDto); + + return generateOkResponse(entity).build(); } @DELETE @Consumes(MediaType.WILDCARD) @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - 
@Path("/{connection-id}/flowfiles/{flow-file-uuid}") - public Response deleteFlowFile() { + @Path("/{connection-id}/flowfiles/{flowfile-uuid}") + @PreAuthorize("hasRole('ROLE_DFM')") + public Response deleteFlowFile( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The flowfile uuid.", + required = true + ) + @PathParam("flowfile-uuid") String flowFileUuid, + @ApiParam( + value = "The id of the node where the content exists if clustered.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + // determine where this request should be sent + if (clusterNodeId == null) { + throw new IllegalArgumentException("The id of the node in the cluster is required."); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } + } + return null; } /** + * Gets the content for the specified flowfile in the specified connection. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
+ * @param connectionId The connection id + * @param flowFileUuid The flowfile uuid + * @param clusterNodeId The cluster node id + * @return The content stream + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.WILDCARD) + @Path("/{connection-id}/flowfiles/{flowfile-uuid}/content") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Gets the content for a FlowFile in a Connection.", + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response downloadFlowFileContent( + @ApiParam( + value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The flowfile uuid.", + required = true + ) + @PathParam("flowfile-uuid") String flowFileUuid, + @ApiParam( + value = "The id of the node where the content exists if clustered.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + // determine where this request should be sent + if (clusterNodeId == null) { + throw new IllegalArgumentException("The id of the node in the cluster is required."); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } + } + + // get the uri of the request + final String uri = generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "flowfiles", flowFileUuid, "content"); + + // get an input stream to the content + final DownloadableContent content = serviceFacade.getContent(groupId, connectionId, flowFileUuid, uri); + + // generate a streaming response + final StreamingOutput response = new StreamingOutput() { + @Override + public void write(OutputStream output) throws IOException, WebApplicationException { + try (InputStream is = content.getContent()) { + // stream the content to the response + 
StreamUtils.copy(is, output); + + // flush the response + output.flush(); + } + } + }; + + // use the appropriate content type + String contentType = content.getType(); + if (contentType == null) { + contentType = MediaType.APPLICATION_OCTET_STREAM; + } + + return generateOkResponse(response).type(contentType).header("Content-Disposition", String.format("attachment; filename=\"%s\"", content.getFilename())).build(); + } + + /** * Drops the flowfiles in the queue of the specified connection. This endpoint is DEPRECATED. Please use * POST /nifi-api/controller/process-groups/{process-group-id}/connections/{connection-id}/drop-requests instead. * http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index b93ff95..c32ad3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -78,6 +78,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileSummary; import org.apache.nifi.controller.queue.ListFlowFileState; import org.apache.nifi.controller.queue.ListFlowFileStatus; +import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -87,6 +88,7 
@@ import org.apache.nifi.diagnostics.GarbageCollection; import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; @@ -391,6 +393,18 @@ public final class DtoFactory { return dto; } + public FlowFileDTO createFlowFileDTO(final FlowFileRecord record) { + final FlowFileDTO dto = new FlowFileDTO(); + dto.setUuid(record.getAttribute(CoreAttributes.UUID.key())); + dto.setFilename(record.getAttribute(CoreAttributes.FILENAME.key())); + dto.setLastQueuedTime(new Date(record.getLastQueueDate())); + dto.setLinageStartDate(new Date(record.getLineageStartDate())); + dto.setPenalized(record.isPenalized()); + dto.setSize(record.getSize()); + dto.setAttributes(record.getAttributes()); + return dto; + } + /** * Creates a ConnectionDTO from the specified Connection. 
* http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java index 642c47e..3884f51 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java @@ -20,6 +20,8 @@ import java.util.Set; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.ListFlowFileStatus; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.api.dto.ConnectionDTO; public interface ConnectionDAO { @@ -44,7 +46,7 @@ public interface ConnectionDAO { DropFlowFileStatus getFlowFileDropRequest(String groupId, String id, String dropRequestId); /** - * Gets the specified flow file listing request. + * Gets the specified flowfile listing request. * * @param groupId group id * @param id connection id @@ -54,6 +56,16 @@ public interface ConnectionDAO { ListFlowFileStatus getFlowFileListingRequest(String groupId, String id, String listingRequestId); /** + * Gets the specified flowfile in the specified connection. 
+ * + * @param groupId group id + * @param id connection id + * @param flowFileUuid the flowfile uuid + * @return The flowfile + */ + FlowFileRecord getFlowFile(String groupId, String id, String flowFileUuid); + + /** * Gets the connections for the specified source processor. * * @param groupId group id @@ -168,4 +180,15 @@ public interface ConnectionDAO { * @return The listing request status */ ListFlowFileStatus deleteFlowFileListingRequest(String groupId, String id, String listingRequestId); + + /** + * Gets the content for the specified flowfile in the specified connection. + * + * @param groupId group id + * @param id connection id + * @param flowfileUuid flowfile uuid + * @param requestUri request uri + * @return The downloadable content + */ + DownloadableContent getContent(String groupId, String id, String flowfileUuid, String requestUri); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index 0e9a90a..ea9c1d7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -16,15 +16,20 @@ */ package org.apache.nifi.web.dao.impl; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import 
java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import javax.ws.rs.WebApplicationException; +import org.apache.nifi.admin.service.UserService; +import org.apache.nifi.authorization.DownloadAuthorization; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -35,6 +40,9 @@ import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.ListFlowFileStatus; +import org.apache.nifi.controller.repository.ContentNotFoundException; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.flowfile.FlowFilePrioritizer; @@ -42,16 +50,24 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.user.NiFiUser; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.dao.ConnectionDAO; +import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.apache.nifi.web.security.user.NiFiUserUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.access.AccessDeniedException; public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO { + private static final Logger logger = LoggerFactory.getLogger(StandardConnectionDAO.class); + private FlowController flowController; + private UserService 
userService; private Connection locateConnection(final String groupId, final String id) { return locateConnection(locateProcessGroup(flowController, groupId), id); @@ -101,6 +117,19 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO } @Override + public FlowFileRecord getFlowFile(String groupId, String id, String flowFileUuid) { + final Connection connection = locateConnection(groupId, id); + final FlowFileQueue queue = connection.getFlowFileQueue(); + final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid); + + if (flowFile == null) { + throw new ResourceNotFoundException(String.format("Unable to find FlowFile '%s' in Connection '%s'.", flowFileUuid, id)); + } + + return flowFile; + } + + @Override public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) { final Set<Connection> connections = new HashSet<>(getConnections(groupId)); for (final Iterator<Connection> connectionIter = connections.iterator(); connectionIter.hasNext();) { @@ -542,8 +571,58 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO return listFlowFileStatus; } + @Override + public DownloadableContent getContent(String groupId, String id, String flowFileUuid, String requestUri) { + try { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user == null) { + throw new WebApplicationException(new Throwable("Unable to access details for current user.")); + } + + final Connection connection = locateConnection(groupId, id); + final FlowFileQueue queue = connection.getFlowFileQueue(); + final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid); + + if (flowFile == null) { + throw new ResourceNotFoundException(String.format("Unable to find FlowFile '%s' in Connection '%s'.", flowFileUuid, id)); + } + + // calculate the dn chain + final List<String> dnChain = ProxiedEntitiesUtils.buildProxiedEntitiesChain(user); + + // ensure the users in this chain are allowed to download this 
content + final Map<String, String> attributes = flowFile.getAttributes(); + final DownloadAuthorization downloadAuthorization = userService.authorizeDownload(dnChain, attributes); + if (!downloadAuthorization.isApproved()) { + throw new AccessDeniedException(downloadAuthorization.getExplanation()); + } + + // get the filename and fall back to the identifier (should never happen) + String filename = attributes.get(CoreAttributes.FILENAME.key()); + if (filename == null) { + filename = flowFileUuid; + } + + // get the mime-type + final String type = attributes.get(CoreAttributes.MIME_TYPE.key()); + + // get the content + final InputStream content = flowController.getContent(flowFile, user.getIdentity(), requestUri); + return new DownloadableContent(filename, type, content); + } catch (final ContentNotFoundException cnfe) { + throw new ResourceNotFoundException("Unable to find the specified content."); + } catch (final IOException ioe) { + logger.error(String.format("Unable to get the content for flowfile (%s) at this time.", flowFileUuid), ioe); + throw new IllegalStateException("Unable to get the content at this time."); + } + } + /* setters */ public void setFlowController(final FlowController flowController) { this.flowController = flowController; } + + public void setUserService(UserService userService) { + this.userService = userService; + } }
