Repository: nifi
Updated Branches:
  refs/heads/NIFI-108 e762d3c7d -> 670733753


NIFI-108:
- Starting to add support for endpoints that will list flowfiles in a queue.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4cc1fa85
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4cc1fa85
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4cc1fa85

Branch: refs/heads/NIFI-108
Commit: 4cc1fa85aedd5fd6611d712f3e69e96adf845d55
Parents: e762d3c
Author: Matt Gilman <[email protected]>
Authored: Thu Dec 17 16:06:24 2015 -0500
Committer: Matt Gilman <[email protected]>
Committed: Thu Dec 17 16:06:24 2015 -0500

----------------------------------------------------------------------
 .../nifi/web/api/dto/FlowFileSummaryDTO.java    |  32 +++
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  22 ++
 .../nifi/web/StandardNiFiServiceFacade.java     |  11 +
 .../apache/nifi/web/api/ConnectionResource.java | 265 ++++++++++++++++++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  14 +
 .../org/apache/nifi/web/dao/ConnectionDAO.java  |  25 +-
 .../web/dao/impl/StandardConnectionDAO.java     |  79 ++++++
 7 files changed, 440 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
index accb512..06b2776 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
@@ -24,6 +24,8 @@ import java.util.Date;
 
 public class FlowFileSummaryDTO {
 
+    private String uri;
+
     private String uuid;
     private String filename;
     private Integer position;
@@ -32,6 +34,22 @@ public class FlowFileSummaryDTO {
     private Date linageStartDate;
     private Boolean isPenalized;
 
+    private String clusterNodeId;
+
+    /**
+     * @return the FlowFile uri
+     */
+    @ApiModelProperty(
+        value = "The URI that can be used to access this FlowFile."
+    )
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
     /**
      * @return the FlowFile uuid
      */
@@ -131,4 +149,18 @@ public class FlowFileSummaryDTO {
     public void setPenalized(Boolean penalized) {
         isPenalized = penalized;
     }
+
+    /**
+     * @return The id of the node where this FlowFile resides.
+     */
+    @ApiModelProperty(
+        value = "The id of the node where this FlowFile resides."
+    )
+    public String getClusterNodeId() {
+        return clusterNodeId;
+    }
+
+    public void setClusterNodeId(String clusterNodeId) {
+        this.clusterNodeId = clusterNodeId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index ef5f202..66efb27 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -33,6 +33,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.CounterDTO;
 import org.apache.nifi.web.api.dto.CountersDTO;
 import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.dto.FlowFileDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.FunnelDTO;
 import org.apache.nifi.web.api.dto.LabelDTO;
@@ -120,6 +121,17 @@ public interface NiFiServiceFacade {
     DownloadableContent getContent(Long eventId, String uri, ContentDirection 
contentDirection);
 
     /**
+     * Gets the content for the specified flowfile in the specified connection.
+     *
+     * @param groupId group
+     * @param connectionId connection
+     * @param flowfileUuid flowfile
+     * @param uri uri
+     * @return content
+     */
+    DownloadableContent getContent(String groupId, String connectionId, String 
flowfileUuid, String uri);
+
+    /**
      * Retrieves provenance.
      *
      * @param queryId identifier
@@ -587,6 +599,16 @@ public interface NiFiServiceFacade {
      */
     ListingRequestDTO deleteFlowFileListingRequest(String groupId, String 
connectionId, String listingRequestId);
 
+    /**
+     * Gets the specified flowfile from the specified connection.
+     *
+     * @param groupId group
+     * @param connectionId The ID of the connection
+     * @param flowFileUuid The UUID of the flowfile
+     * @return The FlowFileDTO
+     */
+    FlowFileDTO getFlowFile(String groupId, String connectionId, String 
flowFileUuid);
+
     // ----------------------------------------
     // InputPort methods
     // ----------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 8b0ef37..7982f9a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -80,6 +80,7 @@ import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.web.api.dto.FlowFileDTO;
 import org.apache.nifi.web.api.dto.ListingRequestDTO;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
 import org.apache.nifi.user.AccountStatus;
@@ -1960,6 +1961,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public DownloadableContent getContent(String groupId, String connectionId, 
String flowFileUuid, String uri) {
+        return connectionDAO.getContent(groupId, connectionId, flowFileUuid, 
uri);
+    }
+
+    @Override
     public DownloadableContent getContent(Long eventId, String uri, 
ContentDirection contentDirection) {
         return controllerFacade.getContent(eventId, uri, contentDirection);
     }
@@ -2143,6 +2149,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public FlowFileDTO getFlowFile(String groupId, String connectionId, String 
flowFileUuid) {
+        return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(groupId, 
connectionId, flowFileUuid));
+    }
+
+    @Override
     public StatusHistoryDTO getConnectionStatusHistory(String groupId, String 
connectionId) {
         return controllerFacade.getConnectionStatusHistory(groupId, 
connectionId);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 8ee765c..8cb0693 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -22,11 +22,16 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -51,21 +56,31 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
 
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.repository.claim.ContentDirection;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
+import org.apache.nifi.web.DownloadableContent;
 import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.FlowFileDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.ListingRequestDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.ConnectionsEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
@@ -96,7 +111,7 @@ public class ConnectionResource extends ApplicationResource {
     private String groupId;
 
     /**
-     * Populate the uri's for the specified processors and their relationships.
+     * Populate the uri's for the specified connections.
      *
      * @param connections connections
      * @return dtos
@@ -109,7 +124,10 @@ public class ConnectionResource extends 
ApplicationResource {
     }
 
     /**
-     * Populate the uri's for the specified processor and its relationships.
+     * Populate the uri's for the specified connection.
+     *
+     * @param connection connection
+     * @return dto
      */
     private ConnectionDTO populateRemainingConnectionContent(ConnectionDTO 
connection) {
         // populate the remaining properties
@@ -118,6 +136,18 @@ public class ConnectionResource extends 
ApplicationResource {
     }
 
     /**
+     * Populate the uri's for the specified flowfile.
+     *
+     * @param connectionId the connection id
+     * @param flowFile the flowfile
+     * @return the dto
+     */
+    private FlowFileSummaryDTO populateRemainingFlowFileContent(final String 
connectionId, FlowFileSummaryDTO flowFile) {
+        flowFile.setUri(generateResourceUri("controller", "process-groups", 
groupId, "connections", connectionId, "flowfiles", flowFile.getUuid()));
+        return flowFile;
+    }
+
+    /**
      * Gets all the connections.
      *
      * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
@@ -892,23 +922,244 @@ public class ConnectionResource extends 
ApplicationResource {
         return clusterContext(generateOkResponse(entity)).build();
     }
 
+    /**
+     * Gets the specified flowfile from the specified connection.
+     *
+     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
+     * @param connectionId The connection id
+     * @param flowFileUuid The flowfile uuid
+     * @param clusterNodeId The cluster node id where the flowfile resides
+     * @return a flowFileDTO
+     */
     @GET
     @Consumes(MediaType.WILDCARD)
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/{connection-id}/flowfiles/{flow-file-uuid}")
-    public Response getFlowFile() {
-        return null;
+    @Path("/{connection-id}/flowfiles/{flowfile-uuid}")
+    @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+        value = "Gets a FlowFile from a Connection.",
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+        }
+    )
+    public Response getFlowFile(
+            @ApiParam(
+                value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+                required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                value = "The connection id.",
+                required = true
+            )
+            @PathParam("connection-id") String connectionId,
+            @ApiParam(
+                value = "The flowfile uuid.",
+                required = true
+            )
+            @PathParam("flowfile-uuid") String flowFileUuid,
+            @ApiParam(
+                value = "The id of the node where the content exists if 
clustered.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                throw new IllegalArgumentException("The id of the node in the 
cluster is required.");
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
+        }
+
+        // get the flowfile
+        final FlowFileDTO flowfileDto = serviceFacade.getFlowFile(groupId, 
connectionId, flowFileUuid);
+        populateRemainingFlowFileContent(connectionId, flowfileDto);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // create the response entity
+        final FlowFileEntity entity = new FlowFileEntity();
+        entity.setRevision(revision);
+        entity.setFlowFile(flowfileDto);
+
+        return generateOkResponse(entity).build();
     }
 
     @DELETE
     @Consumes(MediaType.WILDCARD)
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/{connection-id}/flowfiles/{flow-file-uuid}")
-    public Response deleteFlowFile() {
+    @Path("/{connection-id}/flowfiles/{flowfile-uuid}")
+    @PreAuthorize("hasRole('ROLE_DFM')")
+    public Response deleteFlowFile(
+            @ApiParam(
+                value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+                required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                value = "The connection id.",
+                required = true
+            )
+            @PathParam("connection-id") String connectionId,
+            @ApiParam(
+                value = "The flowfile uuid.",
+                required = true
+            )
+            @PathParam("flowfile-uuid") String flowFileUuid,
+            @ApiParam(
+                value = "The id of the node where the content exists if 
clustered.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                throw new IllegalArgumentException("The id of the node in the 
cluster is required.");
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.DELETE, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
+        }
+
         return null;
     }
 
     /**
+     * Gets the content for the specified flowfile in the specified connection.
+     *
+     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
+     * @param connectionId The connection id
+     * @param flowFileUuid The flowfile uuid
+     * @param clusterNodeId The cluster node id
+     * @return The content stream
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.WILDCARD)
+    @Path("/{connection-id}/flowfiles/{flowfile-uuid}/content")
+    @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+        value = "Gets the content for a FlowFile in a Connection.",
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+        }
+    )
+    public Response downloadFlowFileContent(
+            @ApiParam(
+                value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+                required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                value = "The connection id.",
+                required = true
+            )
+            @PathParam("connection-id") String connectionId,
+            @ApiParam(
+                value = "The flowfile uuid.",
+                required = true
+            )
+            @PathParam("flowfile-uuid") String flowFileUuid,
+            @ApiParam(
+                value = "The id of the node where the content exists if 
clustered.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                throw new IllegalArgumentException("The id of the node in the 
cluster is required.");
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
+        }
+
+        // get the uri of the request
+        final String uri = generateResourceUri("controller", "process-groups", 
groupId, "connections", connectionId, "flowfiles", flowFileUuid, "content");
+
+        // get an input stream to the content
+        final DownloadableContent content = serviceFacade.getContent(groupId, 
connectionId, flowFileUuid, uri);
+
+        // generate a streaming response
+        final StreamingOutput response = new StreamingOutput() {
+            @Override
+            public void write(OutputStream output) throws IOException, 
WebApplicationException {
+                try (InputStream is = content.getContent()) {
+                    // stream the content to the response
+                    StreamUtils.copy(is, output);
+
+                    // flush the response
+                    output.flush();
+                }
+            }
+        };
+
+        // use the appropriate content type
+        String contentType = content.getType();
+        if (contentType == null) {
+            contentType = MediaType.APPLICATION_OCTET_STREAM;
+        }
+
+        return 
generateOkResponse(response).type(contentType).header("Content-Disposition", 
String.format("attachment; filename=\"%s\"", content.getFilename())).build();
+    }
+
+    /**
      * Drops the flowfiles in the queue of the specified connection. This 
endpoint is DEPRECATED. Please use
      * POST 
/nifi-api/controller/process-groups/{process-group-id}/connections/{connection-id}/drop-requests
 instead.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index b93ff95..c32ad3a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -78,6 +78,7 @@ import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.FlowFileSummary;
 import org.apache.nifi.controller.queue.ListFlowFileState;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -87,6 +88,7 @@ import org.apache.nifi.diagnostics.GarbageCollection;
 import org.apache.nifi.diagnostics.StorageUsage;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
@@ -391,6 +393,18 @@ public final class DtoFactory {
         return dto;
     }
 
+    public FlowFileDTO createFlowFileDTO(final FlowFileRecord record) {
+        final FlowFileDTO dto = new FlowFileDTO();
+        dto.setUuid(record.getAttribute(CoreAttributes.UUID.key()));
+        dto.setFilename(record.getAttribute(CoreAttributes.FILENAME.key()));
+        dto.setLastQueuedTime(new Date(record.getLastQueueDate()));
+        dto.setLinageStartDate(new Date(record.getLineageStartDate()));
+        dto.setPenalized(record.isPenalized());
+        dto.setSize(record.getSize());
+        dto.setAttributes(record.getAttributes());
+        return dto;
+    }
+
     /**
      * Creates a ConnectionDTO from the specified Connection.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
index 642c47e..3884f51 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
@@ -20,6 +20,8 @@ import java.util.Set;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.web.DownloadableContent;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 
 public interface ConnectionDAO {
@@ -44,7 +46,7 @@ public interface ConnectionDAO {
     DropFlowFileStatus getFlowFileDropRequest(String groupId, String id, 
String dropRequestId);
 
     /**
-     * Gets the specified flow file listing request.
+     * Gets the specified flowfile listing request.
      *
      * @param groupId group id
      * @param id connection id
@@ -54,6 +56,16 @@ public interface ConnectionDAO {
     ListFlowFileStatus getFlowFileListingRequest(String groupId, String id, 
String listingRequestId);
 
     /**
+     * Gets the specified flowfile in the specified connection.
+     *
+     * @param groupId group id
+     * @param id connection id
+     * @param flowFileUuid the flowfile uuid
+     * @return The flowfile
+     */
+    FlowFileRecord getFlowFile(String groupId, String id, String flowFileUuid);
+
+    /**
      * Gets the connections for the specified source processor.
      *
      * @param groupId group id
@@ -168,4 +180,15 @@ public interface ConnectionDAO {
      * @return The listing request status
      */
     ListFlowFileStatus deleteFlowFileListingRequest(String groupId, String id, 
String listingRequestId);
+
+    /**
+     * Gets the content for the specified flowfile in the specified connection.
+     *
+     * @param groupId group id
+     * @param id connection id
+     * @param flowfileUuid flowfile uuid
+     * @param requestUri request uri
+     * @return The downloadable content
+     */
+    DownloadableContent getContent(String groupId, String id, String 
flowfileUuid, String requestUri);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cc1fa85/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 0e9a90a..ea9c1d7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -16,15 +16,20 @@
  */
 package org.apache.nifi.web.dao.impl;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import javax.ws.rs.WebApplicationException;
 
+import org.apache.nifi.admin.service.UserService;
+import org.apache.nifi.authorization.DownloadAuthorization;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -35,6 +40,9 @@ import 
org.apache.nifi.controller.exception.ValidationException;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -42,16 +50,24 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.user.NiFiUser;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.web.DownloadableContent;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.dao.ConnectionDAO;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.security.access.AccessDeniedException;
 
 public class StandardConnectionDAO extends ComponentDAO implements 
ConnectionDAO {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardConnectionDAO.class);
+
     private FlowController flowController;
+    private UserService userService;
 
     private Connection locateConnection(final String groupId, final String id) 
{
         return locateConnection(locateProcessGroup(flowController, groupId), 
id);
@@ -101,6 +117,19 @@ public class StandardConnectionDAO extends ComponentDAO 
implements ConnectionDAO
     }
 
     @Override
+    public FlowFileRecord getFlowFile(String groupId, String id, String flowFileUuid) {
+        // resolve the queue that backs the requested connection and look the flowfile up in it
+        final FlowFileQueue flowFileQueue = locateConnection(groupId, id).getFlowFileQueue();
+        final FlowFileRecord record = flowFileQueue.getFlowFile(flowFileUuid);
+        if (record != null) {
+            return record;
+        }
+
+        // the queue does not know the flowfile
+        throw new ResourceNotFoundException(String.format("Unable to find FlowFile '%s' in Connection '%s'.", flowFileUuid, id));
+    }
+
+    @Override
     public Set<Connection> getConnectionsForSource(final String groupId, final 
String processorId) {
         final Set<Connection> connections = new 
HashSet<>(getConnections(groupId));
         for (final Iterator<Connection> connectionIter = 
connections.iterator(); connectionIter.hasNext();) {
@@ -542,8 +571,58 @@ public class StandardConnectionDAO extends ComponentDAO 
implements ConnectionDAO
         return listFlowFileStatus;
     }
 
+    @Override
+    public DownloadableContent getContent(String groupId, String id, String 
flowFileUuid, String requestUri) {
+        try {
+            final NiFiUser user = NiFiUserUtils.getNiFiUser();
+            if (user == null) {
+                throw new WebApplicationException(new Throwable("Unable to 
access details for current user."));
+            }
+
+            final Connection connection = locateConnection(groupId, id);
+            final FlowFileQueue queue = connection.getFlowFileQueue();
+            final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid);
+
+            if (flowFile == null) {
+                throw new ResourceNotFoundException(String.format("Unable to 
find FlowFile '%s' in Connection '%s'.", flowFileUuid, id));
+            }
+
+            // calculate the dn chain
+            final List<String> dnChain = 
ProxiedEntitiesUtils.buildProxiedEntitiesChain(user);
+
+            // ensure the users in this chain are allowed to download this 
content
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final DownloadAuthorization downloadAuthorization = 
userService.authorizeDownload(dnChain, attributes);
+            if (!downloadAuthorization.isApproved()) {
+                throw new 
AccessDeniedException(downloadAuthorization.getExplanation());
+            }
+
+            // get the filename and fall back to the identifier (should never 
happen)
+            String filename = attributes.get(CoreAttributes.FILENAME.key());
+            if (filename == null) {
+                filename = flowFileUuid;
+            }
+
+            // get the mime-type
+            final String type = attributes.get(CoreAttributes.MIME_TYPE.key());
+
+            // get the content
+            final InputStream content = flowController.getContent(flowFile, 
user.getIdentity(), requestUri);
+            return new DownloadableContent(filename, type, content);
+        } catch (final ContentNotFoundException cnfe) {
+            throw new ResourceNotFoundException("Unable to find the specified 
content.");
+        } catch (final IOException ioe) {
+            logger.error(String.format("Unable to get the content for flowfile 
(%s) at this time.", flowFileUuid), ioe);
+            throw new IllegalStateException("Unable to get the content at this 
time.");
+        }
+    }
+
     /* setters */
     /**
      * Sets the flow controller this DAO uses to locate process groups and to retrieve flowfile content.
      *
      * @param flowController flow controller
      */
     public void setFlowController(final FlowController flowController) {
         this.flowController = flowController;
     }
+
+    /**
+     * Sets the user service this DAO consults when authorizing content downloads.
+     *
+     * @param userService user service
+     */
+    public void setUserService(final UserService userService) {
+        this.userService = userService;
+    }
 }

Reply via email to