Repository: nifi Updated Branches: refs/heads/NIFI-108 2e22954cd -> e762d3c7d
NIFI-108: - Starting to add support for endpoints that will list flowfiles in a queue. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e762d3c7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e762d3c7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e762d3c7 Branch: refs/heads/NIFI-108 Commit: e762d3c7de8f16255250a73470a6338b73abcd4c Parents: 2e22954 Author: Matt Gilman <[email protected]> Authored: Thu Dec 17 13:03:25 2015 -0500 Committer: Matt Gilman <[email protected]> Committed: Thu Dec 17 13:03:25 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/web/api/dto/DropRequestDTO.java | 11 +- .../nifi/web/api/dto/FlowFileSummaryDTO.java | 134 +++++++++ .../nifi/web/api/dto/ListingRequestDTO.java | 169 +++++++++++ .../web/api/entity/ListingRequestEntity.java | 44 +++ .../org/apache/nifi/web/NiFiServiceFacade.java | 31 ++ .../nifi/web/StandardNiFiServiceFacade.java | 18 +- .../apache/nifi/web/api/ConnectionResource.java | 301 ++++++++++++++++++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 46 +++ .../org/apache/nifi/web/dao/ConnectionDAO.java | 33 +- .../web/dao/impl/StandardConnectionDAO.java | 36 ++- 10 files changed, 807 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java index c0b94a1..0cf48f3 100644 ---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java @@ -51,12 +51,12 @@ public class DropRequestDTO { private String state; /** - * The id for this component. + * The id for this drop request. * * @return The id */ @ApiModelProperty( - value = "The id of the component." + value = "The id for this drop request." ) public String getId() { return this.id; @@ -67,12 +67,12 @@ public class DropRequestDTO { } /** - * The uri for linking to this component in this NiFi. + * The uri for linking to this drop request in this NiFi. * * @return The uri */ @ApiModelProperty( - value = "The URI for futures requests to the component." + value = "The URI for futures requests to this drop request." ) public String getUri() { return uri; @@ -128,6 +128,9 @@ public class DropRequestDTO { /** * @return the reason, if any, that this drop request failed */ + @ApiModelProperty( + value = "The reason, if any, that this drop request failed." 
+ ) public String getFailureReason() { return failureReason; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java new file mode 100644 index 0000000..accb512 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import java.util.Date; + +public class FlowFileSummaryDTO { + + private String uuid; + private String filename; + private Integer position; + private Long size; + private Date lastQueuedTime; + private Date linageStartDate; + private Boolean isPenalized; + + /** + * @return the FlowFile uuid + */ + @ApiModelProperty( + value = "The FlowFile UUID." + ) + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + /** + * @return the FlowFile filename + */ + @ApiModelProperty( + value = "The FlowFile filename." + ) + public String getFilename() { + return filename; + } + + public void setFilename(String filename) { + this.filename = filename; + } + + /** + * @return the FlowFile's position in the queue. + */ + @ApiModelProperty( + value = "The FlowFile's position in the queue." + ) + public Integer getPosition() { + return position; + } + + public void setPosition(Integer position) { + this.position = position; + } + + /** + * @return the FlowFile file size + */ + @ApiModelProperty( + value = "The FlowFile file size." + ) + public Long getSize() { + return size; + } + + public void setSize(Long size) { + this.size = size; + } + + /** + * @return when the FlowFile was last added to the queue + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "When the FlowFile was last added to the queue." 
+ ) + public Date getLastQueuedTime() { + return lastQueuedTime; + } + + public void setLastQueuedTime(Date lastQueuedTime) { + this.lastQueuedTime = lastQueuedTime; + } + + /** + * @return when the FlowFile's greatest ancestor entered the flow + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "When the FlowFile's greatest ancestor entered the flow." + ) + public Date getLinageStartDate() { + return linageStartDate; + } + + public void setLinageStartDate(Date linageStartDate) { + this.linageStartDate = linageStartDate; + } + + /** + * @return if the FlowFile is penalized + */ + @ApiModelProperty( + value = "If the FlowFile is penalized." + ) + public Boolean getPenalized() { + return isPenalized; + } + + public void setPenalized(Boolean penalized) { + isPenalized = penalized; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java new file mode 100644 index 0000000..53c2a74 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import java.util.Date; +import java.util.List; + +public class ListingRequestDTO { + + private String id; + private String uri; + + private Date submissionTime; + private Date lastUpdated; + + private Integer percentCompleted; + private Boolean finished; + private String failureReason; + + private String state; + + private List<FlowFileSummaryDTO> flowFileSummaries; + + /** + * @return the id for this listing request. + */ + @ApiModelProperty( + value = "The id for this listing request." + ) + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * @return the URI for this listing request. + */ + @ApiModelProperty( + value = "The URI for futures requests to this listing request." + ) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + /** + * @return time the query was submitted + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The timestamp when the query was submitted." 
+ ) + public Date getSubmissionTime() { + return submissionTime; + } + + public void setSubmissionTime(Date submissionTime) { + this.submissionTime = submissionTime; + } + + /** + * @return the time this request was last updated + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The last time this listing request was updated." + ) + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + /** + * @return percent completed + */ + @ApiModelProperty( + value = "The current percent complete." + ) + public Integer getPercentCompleted() { + return percentCompleted; + } + + public void setPercentCompleted(Integer percentCompleted) { + this.percentCompleted = percentCompleted; + } + + /** + * @return whether the query has finished + */ + @ApiModelProperty( + value = "Whether the query has finished." + ) + public Boolean getFinished() { + return finished; + } + + public void setFinished(Boolean finished) { + this.finished = finished; + } + + /** + * @return the reason, if any, that this listing request failed + */ + @ApiModelProperty( + value = "The reason, if any, that this listing request failed." + ) + public String getFailureReason() { + return failureReason; + } + + public void setFailureReason(String failureReason) { + this.failureReason = failureReason; + } + + /** + * @return the current state of the listing request. + */ + @ApiModelProperty( + value = "The current state of the listing request." + ) + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + /** + * @return the FlowFile summaries. + */ + @ApiModelProperty( + value = "The FlowFile summaries. The summaries will be populated once the request has completed." 
+ ) + public List<FlowFileSummaryDTO> getFlowFileSummaries() { + return flowFileSummaries; + } + + public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) { + this.flowFileSummaries = flowFileSummaries; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java new file mode 100644 index 0000000..5fee5c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.ListingRequestDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ListingRequestDTO. + */ +@XmlRootElement(name = "listingRequestEntity") +public class ListingRequestEntity extends Entity { + + private ListingRequestDTO listingRequest; + + /** + * The ListingRequestDTO that is being serialized. + * + * @return The ListingRequestDTO object + */ + public ListingRequestDTO getListingRequest() { + return listingRequest; + } + + public void setListingRequest(ListingRequestDTO listingRequest) { + this.listingRequest = listingRequest; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 73d76bd..ef5f202 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -36,6 +36,7 @@ import org.apache.nifi.web.api.dto.DocumentedTypeDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; import 
org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.PortDTO; @@ -556,6 +557,36 @@ public interface NiFiServiceFacade { */ DropRequestDTO deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId); + /** + * Creates a new flow file listing request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param listingRequestId The ID of the listing request + * @return The ListingRequest + */ + ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId); + + /** + * Gets the specified flow file listing request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param listingRequestId The ID of the listing request + * @return The ListingRequest + */ + ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId); + + /** + * Deletes the specified flow file listing request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param listingRequestId The ID of the listing request + * @return The ListingRequest + */ + ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId); + // ---------------------------------------- // InputPort methods // ---------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index e7a3328..8b0ef37 100644 ---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -80,6 +80,7 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.security.user.NiFiUserUtils; import org.apache.nifi.user.AccountStatus; import org.apache.nifi.user.NiFiUser; @@ -817,6 +818,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { + return dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId)); + } + + @Override public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { @Override @@ -1069,7 +1075,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.createFileFlowDropRequest(groupId, connectionId, dropRequestId)); + return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(groupId, connectionId, dropRequestId)); + } + + @Override + public ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { + return dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId, connectionId, 
listingRequestId)); } @Override @@ -2127,6 +2138,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { + return dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId, connectionId, listingRequestId)); + } + + @Override public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) { return controllerFacade.getConnectionStatusHistory(groupId, connectionId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 6741348..8ee765c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -58,14 +58,15 @@ import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.IllegalClusterResourceRequestException; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; -import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import 
org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionsEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.ConnectableTypeParameter; @@ -839,7 +840,7 @@ public class ConnectionResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") } ) - public Response deleteRelationshipTarget( + public Response deleteConnection( @Context HttpServletRequest httpServletRequest, @ApiParam( value = "The revision is used to verify the client is working with the latest version of the flow.", @@ -891,8 +892,25 @@ public class ConnectionResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } + @GET + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/flowfiles/{flow-file-uuid}") + public Response getFlowFile() { + return null; + } + + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/flowfiles/{flow-file-uuid}") + public Response deleteFlowFile() { + return null; + } + /** - * Drops the flowfiles in the queue of the specified connection. + * Drops the flowfiles in the queue of the specified connection. This endpoint is DEPRECATED. Please use + * POST /nifi-api/controller/process-groups/{process-group-id}/connections/{connection-id}/drop-requests instead. * * @param httpServletRequest request * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
@@ -906,6 +924,7 @@ public class ConnectionResource extends ApplicationResource { @PreAuthorize("hasRole('ROLE_DFM')") @ApiOperation( value = "Drops the contents of the queue in this connection.", + notes = "This endpoint is DEPRECATED. Please use POST /nifi-api/controller/process-groups/{process-group-id}/connections/{connection-id}/drop-requests instead.", response = DropRequestEntity.class, authorizations = { @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") @@ -913,6 +932,7 @@ public class ConnectionResource extends ApplicationResource { ) @ApiResponses( value = { + @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."), @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @@ -920,6 +940,7 @@ public class ConnectionResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") } ) + @Deprecated public Response dropQueueContents( @Context HttpServletRequest httpServletRequest, @ApiParam( @@ -933,6 +954,272 @@ public class ConnectionResource extends ApplicationResource { ) @PathParam("connection-id") String id) { + // defer to the new endpoint that references /drop-requests in the URI + return createDropRequest(httpServletRequest, clientId, id); + } + + /** + * Creates a request to list the flowfiles in the queue of the specified connection. + * + * @param httpServletRequest request + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
+ * @param id The id of the connection + * @return A listingRequestEntity + */ + @POST + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/listing-requests") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Lists the contents of the queue in this connection.", + response = ListingRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."), + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getFlowFileListing( + @Context HttpServletRequest httpServletRequest, + @ApiParam( + value = "If the client id is not specified, new one will be generated.
This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String id) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // ensure the id is the same across the cluster + final String listingRequestId; + final ClusterContext clusterContext = ClusterContextThreadLocal.getContext(); + if (clusterContext != null) { + listingRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString(); + } else { + listingRequestId = UUID.randomUUID().toString(); + } + + // submit the listing request + final ListingRequestDTO listRequest = serviceFacade.createFlowFileListingRequest(groupId, id, listingRequestId); + listRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "listing-requests", listRequest.getId())); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final ListingRequestEntity entity = new ListingRequestEntity(); + entity.setRevision(revision); + entity.setListingRequest(listRequest); + + // generate the URI where the response will be + final URI location = URI.create(listRequest.getUri()); + return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); + } + + /** + * Checks the status of an outstanding listing request. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
+ * @param connectionId The id of the connection + * @param listingRequestId The id of the listing request + * @return A listingRequestEntity + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/listing-requests/{listing-request-id}") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Gets the current status of a listing request for the specified connection.", + response = ListingRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getListingRequest( + @ApiParam( + value = "If the client id is not specified, new one will be generated.
This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The listing request id.", + required = true + ) + @PathParam("listing-request-id") String listingRequestId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // get the listing request + final ListingRequestDTO listingRequest = serviceFacade.getFlowFileListingRequest(groupId, connectionId, listingRequestId); + listingRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "listing-requests", listingRequestId)); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final ListingRequestEntity entity = new ListingRequestEntity(); + entity.setRevision(revision); + entity.setListingRequest(listingRequest); + + return generateOkResponse(entity).build(); + } + + /** + * Deletes the specified listing request. + * + * @param httpServletRequest request + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
+ * @param connectionId The connection id + * @param listingRequestId The listing request id + * @return A listingRequestEntity + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/listing-requests/{listing-request-id}") + @ApiOperation( + value = "Cancels and/or removes a request to list the contents of this connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response deleteListingRequest( + @Context HttpServletRequest httpServletRequest, + @ApiParam( + value = "If the client id is not specified, new one will be generated.
This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The listing request id.", + required = true + ) + @PathParam("listing-request-id") String listingRequestId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // handle expects request (usually from the cluster manager) + final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); + if (expects != null) { + return generateContinueResponse().build(); + } + + // delete the listing request + final ListingRequestDTO dropRequest = serviceFacade.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "listing-requests", listingRequestId)); + + // prune the results as they were already received when the listing completed + dropRequest.setFlowFileSummaries(null); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final ListingRequestEntity entity = new ListingRequestEntity(); + entity.setRevision(revision); + entity.setListingRequest(dropRequest); + + return generateOkResponse(entity).build(); + } + + /** + * Creates a request to delete the flowfiles in the queue of the specified connection. + * + * @param httpServletRequest request + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
+ * @param id The id of the connection + * @return A dropRequestEntity + */ + @POST + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/drop-requests") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Creates a request to drop the contents of the queue in this connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."), + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response createDropRequest( + @Context HttpServletRequest httpServletRequest, + @ApiParam( + value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String id) { + // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); @@ -968,11 +1255,7 @@ public class ConnectionResource extends ApplicationResource { // generate the URI where the response will be final URI location = URI.create(dropRequest.getUri()); - if (dropRequest.isFinished()) { - return generateCreatedResponse(location, entity).build(); - } else { - return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); - } + return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); } /** @@ -1057,7 +1340,7 @@ public class ConnectionResource extends ApplicationResource { @Path("/{connection-id}/drop-requests/{drop-request-id}") @PreAuthorize("hasRole('ROLE_DFM')") @ApiOperation( - value = "Cancels and/or removes a request drop of the contents in this connection.", + value = "Cancels and/or removes a request to drop the contents of this connection.", response = DropRequestEntity.class, authorizations = { @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java 
index f26d1b7..b93ff95 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -75,6 +75,9 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -345,6 +348,72 @@ public final class DtoFactory {
         return dto;
     }
 
+    /**
+     * Determines whether a listing request has reached a terminal state.
+     *
+     * @param state state of the listing request
+     * @return true if the state is COMPLETE, CANCELED or FAILURE
+     */
+    private boolean isListingRequestComplete(final ListFlowFileState state) {
+        return ListFlowFileState.COMPLETE.equals(state) || ListFlowFileState.CANCELED.equals(state) || ListFlowFileState.FAILURE.equals(state);
+    }
+
+    /**
+     * Creates a ListingRequestDTO from the specified ListFlowFileStatus.
+     *
+     * @param listingRequest status of the listing request
+     * @return dto, carrying flowfile summaries only when the request is in a terminal state
+     */
+    public ListingRequestDTO createListingRequestDTO(final ListFlowFileStatus listingRequest) {
+        // read the state a single time so that state, finished and percentCompleted cannot disagree with one another
+        final ListFlowFileState state = listingRequest.getState();
+        final boolean finished = isListingRequestComplete(state);
+
+        final ListingRequestDTO dto = new ListingRequestDTO();
+        dto.setId(listingRequest.getRequestIdentifier());
+        dto.setSubmissionTime(new Date(listingRequest.getRequestSubmissionTime()));
+        dto.setLastUpdated(new Date(listingRequest.getLastUpdated()));
+        dto.setState(state.toString());
+        dto.setFailureReason(listingRequest.getFailureReason());
+        dto.setFinished(finished);
+
+        if (finished) {
+            dto.setPercentCompleted(100);
+
+            final List<FlowFileSummary> flowFileSummaries = listingRequest.getFlowFileSummaries();
+            if (flowFileSummaries != null) {
+                final List<FlowFileSummaryDTO> summaryDtos = new ArrayList<>(flowFileSummaries.size());
+                for (final FlowFileSummary summary : flowFileSummaries) {
+                    summaryDtos.add(createFlowFileSummaryDTO(summary));
+                }
+                dto.setFlowFileSummaries(summaryDtos);
+            }
+        } else {
+            // NOTE(review): fixed value for every unfinished request - presumably a placeholder until real progress is reported; confirm
+            dto.setPercentCompleted(50);
+        }
+
+        return dto;
+    }
+
+    /**
+     * Creates a FlowFileSummaryDTO from the specified FlowFileSummary.
+     *
+     * @param summary the flowfile summary
+     * @return dto
+     */
+    public FlowFileSummaryDTO createFlowFileSummaryDTO(final FlowFileSummary summary) {
+        final FlowFileSummaryDTO dto = new FlowFileSummaryDTO();
+        dto.setUuid(summary.getUuid());
+        dto.setFilename(summary.getFilename());
+        dto.setLastQueuedTime(new Date(summary.lastQueuedTime()));
+        dto.setLinageStartDate(new Date(summary.getLineageStartDate()));
+        dto.setPenalized(summary.isPenalized());
+        dto.setPosition(summary.getPosition());
+        dto.setSize(summary.getSize());
+        return dto;
+    }
+
     /**
      * Creates a ConnectionDTO from the specified Connection.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
index 2be4403..642c47e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
 import java.util.Set;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 
 public interface ConnectionDAO {
@@ -43,6 +44,16 @@ public interface 
ConnectionDAO { DropFlowFileStatus getFlowFileDropRequest(String groupId, String id, String dropRequestId); /** + * Gets the specified flow file listing request. + * + * @param groupId group id + * @param id connection id + * @param listingRequestId The listing request id + * @return The listing request status + */ + ListFlowFileStatus getFlowFileListingRequest(String groupId, String id, String listingRequestId); + + /** * Gets the connections for the specified source processor. * * @param groupId group id @@ -85,7 +96,17 @@ public interface ConnectionDAO { * @param dropRequestId drop request id * @return The drop request status */ - DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId); + DropFlowFileStatus createFlowFileDropRequest(String groupId, String id, String dropRequestId); + + /** + * Creates a new flow file listing request. + * + * @param groupId group id + * @param id connection id + * @param listingRequestId listing request id + * @return The listing request status + */ + ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId); /** * Verifies the create request can be processed. @@ -137,4 +158,14 @@ public interface ConnectionDAO { * @return The drop request */ DropFlowFileStatus deleteFlowFileDropRequest(String groupId, String id, String dropRequestId); + + /** + * Deletes the specified flow file listing request. 
+     *
+     * @param groupId group id
+     * @param id connection id
+     * @param listingRequestId The listing request id
+     * @return The listing request status
+     */
+    ListFlowFileStatus deleteFlowFileListingRequest(String groupId, String id, String listingRequestId);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 565e5af..0e9a90a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -34,6 +34,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.exception.ValidationException;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -87,6 +88,19 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
     }
 
     @Override
+    public ListFlowFileStatus getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
+        // look the request up on the queue of the located connection
+        final FlowFileQueue flowFileQueue = locateConnection(groupId, connectionId).getFlowFileQueue();
+        final ListFlowFileStatus status = flowFileQueue.getListFlowFileStatus(listingRequestId);
+
+        // when the queue returns no status for this id, report a missing resource
+        if (status != null) {
+            return status;
+        }
+        throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
+    }
+
+    @Override
     public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) {
         final Set<Connection> connections = new HashSet<>(getConnections(groupId));
         for (final Iterator<Connection> connectionIter = connections.iterator(); connectionIter.hasNext();) {
@@ -312,7 +326,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
     }
 
     @Override
-    public DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId) {
+    public DropFlowFileStatus createFlowFileDropRequest(String groupId, String id, String dropRequestId) {
         final Connection connection = locateConnection(groupId, id);
         final FlowFileQueue queue = connection.getFlowFileQueue();
 
@@ -325,6 +339,13 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
     }
 
     @Override
+    public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) {
+        // delegate to the queue of the located connection, passing the request id through
+        final FlowFileQueue flowFileQueue = locateConnection(groupId, id).getFlowFileQueue();
+        return flowFileQueue.listFlowFiles(listingRequestId);
+    }
+
+    @Override
     public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
         // validate the incoming request
         final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
@@ -508,6 +529,19 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
         return dropFlowFileStatus;
     }
 
+    @Override
+    public ListFlowFileStatus deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
+        // cancel the request on the queue of the located connection
+        final FlowFileQueue flowFileQueue = locateConnection(groupId, connectionId).getFlowFileQueue();
+        final ListFlowFileStatus canceled = flowFileQueue.cancelListFlowFileRequest(listingRequestId);
+
+        // when the queue returns no status for this id, report a missing resource
+        if (canceled != null) {
+            return canceled;
+        }
+        throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
+    }
+
     /* setters */
     public void setFlowController(final FlowController flowController) {
         this.flowController = flowController;
