NIFI-730: - Starting to add support for deleting flow files from a queue by creating endpoints and starting to wire everything together. - Adding context menu item for initiating the request to drop flow files.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e0ac7cde Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e0ac7cde Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e0ac7cde Branch: refs/heads/master Commit: e0ac7cde372f428b0655465b7adc59ad41f8f270 Parents: b4bfcc1 Author: Matt Gilman <[email protected]> Authored: Mon Oct 12 10:00:54 2015 -0400 Committer: Matt Gilman <[email protected]> Committed: Mon Oct 12 10:00:54 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/web/api/dto/DropRequestDTO.java | 129 +++++++++++ .../nifi/web/api/entity/DropRequestEntity.java | 44 ++++ .../org/apache/nifi/web/NiFiServiceFacade.java | 25 +++ .../nifi/web/StandardNiFiServiceFacade.java | 24 ++ .../apache/nifi/web/api/ConnectionResource.java | 217 ++++++++++++++++++- .../org/apache/nifi/web/dao/ConnectionDAO.java | 22 ++ .../web/dao/impl/StandardConnectionDAO.java | 15 ++ .../src/main/webapp/js/nf/canvas/nf-actions.js | 58 +++++ .../main/webapp/js/nf/canvas/nf-context-menu.js | 10 + 9 files changed, 542 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java new file mode 100644 index 0000000..dd4289f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; +import java.util.Date; +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +/** + * A request to drop the contents of a connection. + */ +@XmlType(name = "dropRequest") +public class DropRequestDTO { + + private String id; + private String uri; + + private Date submissionTime; + private Date expiration; + + private Integer percentCompleted; + private Boolean finished; + + /** + * The id for this component. + * + * @return The id + */ + @ApiModelProperty( + value = "The id of the component." + ) + public String getId() { + return this.id; + } + + public void setId(final String id) { + this.id = id; + } + + /** + * The uri for linking to this component in this NiFi. + * + * @return The uri + */ + @ApiModelProperty( + value = "The URI for futures requests to the component." + ) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + /** + * @return time the query was submitted + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The timestamp when the query was submitted." + ) + public Date getSubmissionTime() { + return submissionTime; + } + + public void setSubmissionTime(Date submissionTime) { + this.submissionTime = submissionTime; + } + + /** + * @return expiration time of the query results + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The timestamp when the query will expire." + ) + public Date getExpiration() { + return expiration; + } + + public void setExpiration(Date expiration) { + this.expiration = expiration; + } + + /** + * @return percent completed + */ + @ApiModelProperty( + value = "The current percent complete." + ) + public Integer getPercentCompleted() { + return percentCompleted; + } + + public void setPercentCompleted(Integer percentCompleted) { + this.percentCompleted = percentCompleted; + } + + /** + * @return whether the query has finished + */ + @ApiModelProperty( + value = "Whether the query has finished." + ) + public Boolean isFinished() { + return finished; + } + + public void setFinished(Boolean finished) { + this.finished = finished; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java new file mode 100644 index 0000000..078c019 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.DropRequestDTO; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a DropRequestDTO. + */ +@XmlRootElement(name = "dropRequestEntity") +public class DropRequestEntity extends Entity { + + private DropRequestDTO dropRequest; + + /** + * The DropRequestDTO that is being serialized. + * + * @return The DropRequestDTO object + */ + public DropRequestDTO getDropRequest() { + return dropRequest; + } + + public void setDropRequest(DropRequestDTO dropRequest) { + this.dropRequest = dropRequest; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index c98b1e4..28f6b61 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.ComponentHistoryDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; @@ -525,6 +526,30 @@ public interface NiFiServiceFacade { */ ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId); + /** + * Creates a new flow file drop request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @return + */ + DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId); + + /** + * Gets the specified flow file drop request. + * + * @param dropRequestId The flow file drop request + * @return The DropRequest + */ + DropRequestDTO getFlowFileDropRequest(String dropRequestId); + + /** + * Cancels/removes the specified flow file drop request. + * + * @param dropRequestId The flow file drop request + */ + void deleteFlowFileDropRequest(String dropRequestId); + // ---------------------------------------- // InputPort methods // ---------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 2286213..7f0a296 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -162,6 +162,7 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; @@ -809,6 +810,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public void deleteFlowFileDropRequest(String dropRequestId) { + // TODO + } + + @Override public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { @Override @@ -1060,6 +1066,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) { + // TODO + final DropRequestDTO dto = new DropRequestDTO(); + dto.setFinished(false); + dto.setSubmissionTime(new Date()); + dto.setExpiration(new Date(System.currentTimeMillis() + 10000)); + dto.setId(UUID.randomUUID().toString()); + dto.setPercentCompleted(100); + return dto; + } + + @Override public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() { @Override @@ -2092,6 +2110,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public DropRequestDTO getFlowFileDropRequest(String dropRequestId) { + // TODO + return null; + } + + @Override public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) { return controllerFacade.getConnectionStatusHistory(groupId, connectionId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 64c14fa..dfc20fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -24,6 +24,7 @@ import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -49,6 +50,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.util.NiFiProperties; @@ -71,6 +73,10 @@ import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.context.ClusterContext; +import org.apache.nifi.cluster.context.ClusterContextThreadLocal; +import org.apache.nifi.web.api.dto.DropRequestDTO; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; @@ -476,8 +482,7 @@ public class ConnectionResource extends ApplicationResource { @ApiParam( value = "The connection configuration details.", required = true - ) - ConnectionEntity connectionEntity) { + ) ConnectionEntity connectionEntity) { if (connectionEntity == null || connectionEntity.getConnection() == null) { throw new IllegalArgumentException("Connection details must be specified."); @@ -886,6 +891,214 @@ public class ConnectionResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } + /** + * Drops the flowfiles in the queue of the specified connection. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param id The id of the connection + * @return A dropRequestEntity + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/contents") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Drops the contents of the queue in this connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response dropQueueContents( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String id) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // ensure the id is the same across the cluster + final String dropRequestId; + final ClusterContext clusterContext = ClusterContextThreadLocal.getContext(); + if (clusterContext != null) { + dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString(); + } else { + dropRequestId = UUID.randomUUID().toString(); + } + + // submit the drop request + final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId)); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setRevision(revision); + entity.setDropRequest(dropRequest); + + // generate the URI where the response will be + final URI location = URI.create(dropRequest.getUri()); + if (dropRequest.isFinished()) { + return generateCreatedResponse(location, entity).build(); + } else { + return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); + } + } + + /** + * Checks the status of an outstanding drop request. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param connectionId The id of the connection + * @param dropRequestId The id of the drop request + * @return A dropRequestEntity + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/contents/drop-requests/{drop-request-id}") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Gets the current status of a drop request for the specified connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getDropRequest( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The drop request id.", + required = true + ) + @PathParam("drop-request-id") String dropRequestId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // get the drop request + final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId)); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setRevision(revision); + entity.setDropRequest(dropRequest); + + return generateOkResponse(entity).build(); + } + + /** + * Deletes the specified drop request. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param connectionId The connection id + * @param dropRequestId The drop request id + * @return A dropRequestEntity + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/contents/drop-requests/{drop-request-id}") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Cancels and/or removes a request drop of the contents in this connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response removeDropRequest( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The drop request id.", + required = true + ) + @PathParam("drop-request-id") String dropRequestId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // delete the drop request + serviceFacade.deleteFlowFileDropRequest(dropRequestId); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setRevision(revision); + + return generateOkResponse(entity).build(); + } + // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java index e0fb89e..ce1d1fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java @@ -32,6 +32,13 @@ public interface ConnectionDAO { Connection getConnection(String groupId, String id); /** + * Gets the specified flow file drop request. + * + * @param dropRequestId The drop request id + */ + void getFlowFileDropRequest(String dropRequestId); + + /** * Gets the connections for the specified source processor. * * @param groupId group id @@ -67,6 +74,14 @@ public interface ConnectionDAO { Connection createConnection(String groupId, ConnectionDTO connectionDTO); /** + * Creates a new flow file drop request. + * + * @param groupId group id + * @param id connection id + */ + void createFileFlowDropRequest(String groupId, String id); + + /** * Verifies the create request can be processed. * * @param groupId group id @@ -106,4 +121,11 @@ public interface ConnectionDAO { * @param id The id of the connection */ void deleteConnection(String groupId, String id); + + /** + * Deletes the specified flow file drop request. + * + * @param dropRequestId The drop request id + */ + void deleteFlowFileDropRequest(String dropRequestId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index 5fbc393..8fa9d3b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -69,6 +69,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO } @Override + public void getFlowFileDropRequest(String dropRequestId) { + // TODO + } + + @Override public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) { final Set<Connection> connections = new HashSet<>(getConnections(groupId)); for (final Iterator<Connection> connectionIter = connections.iterator(); connectionIter.hasNext();) { @@ -294,6 +299,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO } @Override + public void createFileFlowDropRequest(String groupId, String id) { + // TODO + } + + @Override public void verifyCreate(String groupId, ConnectionDTO connectionDTO) { // validate the incoming request final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO); @@ -464,6 +474,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO group.removeConnection(connection); } + @Override + public void deleteFlowFileDropRequest(String dropRequestId) { + // TODO + } + /* setters */ public void setFlowController(final FlowController flowController) { this.flowController = flowController; http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js index 3b47a8d..bab2236 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js @@ -846,6 +846,64 @@ nf.Actions = (function () { }, /** + * Deletes the flow files in the specified connection. + * + * @param {type} selection + */ + deleteQueueContents: function (selection) { + if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) { + return; + } + + // process the drop request + var processDropRequest = function (dropRequest, nextDelay) { + // see if the drop request has completed + if (dropRequest.finished === true) { + deleteDropRequest(dropRequest); + } else { + schedule(dropRequest, nextDelay); + } + }; + + // schedule for the next poll iteration + var schedule = function (dropRequest, delay) { + setTimeout(function () { + $.ajax({ + type: 'GET', + url: dropRequest.uri, + dataType: 'json' + }).done(function(response) { + var dropRequest = response.dropRequest; + processDropRequest(dropRequest, Math.min(8, delay * 2)); + }).fail(nf.Common.handleAjaxError); + }, delay * 1000); + }; + + // delete the drop request + var deleteDropRequest = function (dropRequest) { + $.ajax({ + type: 'DELETE', + url: dropRequest.uri, + dataType: 'json' + }).done(function() { + // drop request has been deleted + }).fail(nf.Common.handleAjaxError); + }; + + // get the connection data + var connection = selection.datum(); + + // issue the request to delete the flow files + $.ajax({ + type: 'DELETE', + url: connection.component.uri + '/contents', + dataType: 'json' + }).done(function(response) { + processDropRequest(response.dropRequest, 1); + }).fail(nf.Common.handleAjaxError); + }, + + /** * Opens the fill color dialog for the component in the specified selection. * * @param {type} selection The selection http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js index e652dd4..58397d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js @@ -278,6 +278,15 @@ nf.ContextMenu = (function () { }; /** + * Only DFMs can delete flow files from a connection. + * + * @param {selection} selection + */ + var canDeleteFlowFiles = function (selection) { + return nf.Common.isDFM() && isConnection(selection); + }; + + /** * Determines if the components in the specified selection can be moved into a parent group. * * @param {type} selection @@ -373,6 +382,7 @@ nf.ContextMenu = (function () { {condition: isCopyable, menuItem: {img: 'images/iconCopy.png', text: 'Copy', action: 'copy'}}, {condition: isPastable, menuItem: {img: 'images/iconPaste.png', text: 'Paste', action: 'paste'}}, {condition: canMoveToParent, menuItem: {img: 'images/iconMoveToParent.png', text: 'Move to parent group', action: 'moveIntoParent'}}, + {condition: canDeleteFlowFiles, menuItem: {img: 'images/iconDelete.png', text: 'Delete Flow Files', action: 'deleteQueueContents'}}, {condition: isDeletable, menuItem: {img: 'images/iconDelete.png', text: 'Delete', action: 'delete'}} ];
