Repository: nifi Updated Branches: refs/heads/NIFI-730 09a3f6dad -> 39a050d2f
NIFI-730: - Adding emptying a queue when clustered. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/39a050d2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/39a050d2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/39a050d2 Branch: refs/heads/NIFI-730 Commit: 39a050d2fdc2437576d860b968ab1ec78d18fa21 Parents: 09a3f6d Author: Matt Gilman <[email protected]> Authored: Wed Oct 14 17:47:06 2015 -0400 Committer: Matt Gilman <[email protected]> Committed: Wed Oct 14 17:47:06 2015 -0400 ---------------------------------------------------------------------- .../controller/queue/DropFlowFileState.java | 18 ++- .../cluster/manager/impl/WebClusterManager.java | 111 +++++++++++++++++-- .../apache/nifi/web/api/ConnectionResource.java | 26 ++++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 12 +- 4 files changed, 144 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java index 12dbedf..32efcbb 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.controller.queue; /** @@ -38,4 +37,21 @@ public enum DropFlowFileState { public String toString() { return description; } + + /** + * @param description string form of drop flow file state + * @return the matching DropFlowFileState or null if the description doesn't match + */ + public static DropFlowFileState valueOfDescription(String description) { + DropFlowFileState desiredState = null; + + for (DropFlowFileState state : values()) { + if (state.toString().equals(description)) { + desiredState = state; + break; + } + } + + return desiredState; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index bfeec7a..6b0bb64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -226,14 +226,17 @@ import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; import com.sun.jersey.api.client.ClientResponse; +import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ReportingTasksEntity; @@ -316,6 +319,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); + public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); + public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}"); + private final NiFiProperties properties; private final HttpRequestReplicator httpRequestReplicator; private final HttpResponseMapper httpResponseMapper; @@ -1090,7 +1096,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // Register log observer to provide bulletins when reporting task logs anything at WARN level or above final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, - new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); + new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); return taskNode; } @@ -1385,7 +1391,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // Register log observer to provide bulletins when reporting task logs anything at WARN level or above final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, - new ControllerServiceLogObserver(getBulletinRepository(), serviceNode)); + new ControllerServiceLogObserver(getBulletinRepository(), serviceNode)); return serviceNode; } @@ -2465,6 +2471,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } + private static boolean isDropRequestEndpoint(final URI uri, final String method) { + if ("DELETE".equalsIgnoreCase(method) && QUEUE_CONTENTS_URI.matcher(uri.getPath()).matches()) { + return true; + } else if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && DROP_REQUEST_URI.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + static boolean isResponseInterpreted(final URI uri, final String method) { return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method) || isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method) @@ -2472,7 +2488,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method) || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method) || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) - || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method); + || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) + || isDropRequestEndpoint(uri, method); } private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) { @@ -2808,6 +2825,62 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return normalizedValidationErrors; } + /** + * Merges the drop requests in the specified map into the specified drop request. + * + * @param dropRequest the target drop request + * @param dropRequestMap the mapping of all responses being merged + */ + private void mergeDropRequests(final DropRequestDTO dropRequest, final Map<NodeIdentifier, DropRequestDTO> dropRequestMap) { + boolean nodeWaiting = false; + int originalCount = 0; + long originalSize = 0; + int currentCount = 0; + long currentSize = 0; + int droppedCount = 0; + long droppedSize = 0; + + DropFlowFileState state = null; + for (final Map.Entry<NodeIdentifier, DropRequestDTO> nodeEntry : dropRequestMap.entrySet()) { + final DropRequestDTO nodeDropRequest = nodeEntry.getValue(); + + currentCount += nodeDropRequest.getCurrentCount(); + currentSize += nodeDropRequest.getCurrentSize(); + droppedCount += nodeDropRequest.getDroppedCount(); + droppedSize += nodeDropRequest.getDroppedSize(); + + if (nodeDropRequest.getOriginalCount() == null) { + nodeWaiting = true; + } else { + originalCount += nodeDropRequest.getOriginalCount(); + originalSize += nodeDropRequest.getOriginalSize(); + } + + final DropFlowFileState nodeState = DropFlowFileState.valueOfDescription(nodeDropRequest.getState()); + if (state == null || state.compareTo(nodeState) > 0) { + state = nodeState; + } + } + + dropRequest.setCurrentCount(currentCount); + dropRequest.setCurrentSize(currentSize); + dropRequest.setCurrent(FormatUtils.formatCount(currentCount) + " / " + FormatUtils.formatDataSize(currentSize)); + + dropRequest.setDroppedCount(droppedCount); + dropRequest.setDroppedSize(droppedSize); + dropRequest.setDropped(FormatUtils.formatCount(droppedCount) + " / " + FormatUtils.formatDataSize(droppedSize)); + + if (!nodeWaiting) { + dropRequest.setOriginalCount(originalCount); + dropRequest.setOriginalSize(originalSize); + dropRequest.setOriginal(FormatUtils.formatCount(originalCount) + " / " + FormatUtils.formatDataSize(originalSize)); + } + + if (state != null) { + dropRequest.setState(state.toString()); + } + } + // requires write lock to be already acquired unless request is not mutable private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) { // holds the one response of all the node responses to return to the client @@ -3158,8 +3231,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ControllerServiceReferencingComponentsEntity nodeResponseEntity = - nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); + final ControllerServiceReferencingComponentsEntity nodeResponseEntity + = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents(); resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents); @@ -3218,6 +3291,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // create a new client response clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isDropRequestEndpoint(uri, method)) { + final DropRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(DropRequestEntity.class); + final DropRequestDTO dropRequest = responseEntity.getDropRequest(); + + final Map<NodeIdentifier, DropRequestDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final DropRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(DropRequestEntity.class); + final DropRequestDTO nodeDropRequest = nodeResponseEntity.getDropRequest(); + + resultsMap.put(nodeResponse.getNodeId(), nodeDropRequest); + } + mergeDropRequests(dropRequest, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain); @@ -3270,12 +3361,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } /** - * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and - * is not comprised of responses from all nodes in the cluster (at least one node contained the counter in question). + * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least + * one node contained the counter in question). * - * @param problematicNodeResponses The problematic node responses - * @param uri The URI for the request - * @return Whether all problematic node responses were due to a missing counter + * @param problematicNodeResponses The problematic node responses + * @param uri The URI for the request + * @return Whether all problematic node responses were due to a missing counter */ private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) { if (isCountersEndpoint(uri)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index b170d39..6741348 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -894,6 +894,7 @@ public class ConnectionResource extends ApplicationResource { /** * Drops the flowfiles in the queue of the specified connection. * + * @param httpServletRequest request * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the connection * @return A dropRequestEntity @@ -920,6 +921,7 @@ public class ConnectionResource extends ApplicationResource { } ) public Response dropQueueContents( + @Context HttpServletRequest httpServletRequest, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false @@ -936,6 +938,12 @@ public class ConnectionResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // handle expects request (usually from the cluster manager) + final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); + if (expects != null) { + return generateContinueResponse().build(); + } + // ensure the id is the same across the cluster final String dropRequestId; final ClusterContext clusterContext = ClusterContextThreadLocal.getContext(); @@ -947,7 +955,7 @@ public class ConnectionResource extends ApplicationResource { // submit the drop request final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id, dropRequestId); - dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequest.getId())); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "drop-requests", dropRequest.getId())); // create the revision final RevisionDTO revision = new RevisionDTO(); @@ -978,7 +986,7 @@ public class ConnectionResource extends ApplicationResource { @GET @Consumes(MediaType.WILDCARD) @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - @Path("/{connection-id}/contents/drop-requests/{drop-request-id}") + @Path("/{connection-id}/drop-requests/{drop-request-id}") @PreAuthorize("hasRole('ROLE_DFM')") @ApiOperation( value = "Gets the current status of a drop request for the specified connection.", @@ -1020,7 +1028,7 @@ public class ConnectionResource extends ApplicationResource { // get the drop request final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(groupId, connectionId, dropRequestId); - dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId)); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "drop-requests", dropRequestId)); // create the revision final RevisionDTO revision = new RevisionDTO(); @@ -1037,6 +1045,7 @@ public class ConnectionResource extends ApplicationResource { /** * Deletes the specified drop request. * + * @param httpServletRequest request * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param connectionId The connection id * @param dropRequestId The drop request id @@ -1045,7 +1054,7 @@ public class ConnectionResource extends ApplicationResource { @DELETE @Consumes(MediaType.WILDCARD) @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - @Path("/{connection-id}/contents/drop-requests/{drop-request-id}") + @Path("/{connection-id}/drop-requests/{drop-request-id}") @PreAuthorize("hasRole('ROLE_DFM')") @ApiOperation( value = "Cancels and/or removes a request drop of the contents in this connection.", @@ -1064,6 +1073,7 @@ public class ConnectionResource extends ApplicationResource { } ) public Response removeDropRequest( + @Context HttpServletRequest httpServletRequest, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false @@ -1085,9 +1095,15 @@ public class ConnectionResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // handle expects request (usually from the cluster manager) + final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); + if (expects != null) { + return generateContinueResponse().build(); + } + // delete the drop request final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(groupId, connectionId, dropRequestId); - dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId)); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "drop-requests", dropRequestId)); // create the revision final RevisionDTO revision = new RevisionDTO(); http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 264268b..0758ce2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -315,6 +315,11 @@ public final class DtoFactory { dto.setDroppedSize(dropped.getByteCount()); dto.setDropped(FormatUtils.formatCount(dropped.getObjectCount()) + " / " + FormatUtils.formatDataSize(dropped.getByteCount())); + final QueueSize current = dropRequest.getCurrentSize(); + dto.setCurrentCount(current.getObjectCount()); + dto.setCurrentSize(current.getByteCount()); + dto.setCurrent(FormatUtils.formatCount(current.getObjectCount()) + " / " + FormatUtils.formatDataSize(current.getByteCount())); + if (dropRequest.getOriginalSize() != null) { final QueueSize original = dropRequest.getOriginalSize(); dto.setOriginalCount(original.getObjectCount()); @@ -326,13 +331,6 @@ public final class DtoFactory { dto.setPercentCompleted(0); } - if (dropRequest.getCurrentSize() != null) { - final QueueSize current = dropRequest.getCurrentSize(); - dto.setCurrentCount(current.getObjectCount()); - dto.setCurrentSize(current.getByteCount()); - dto.setCurrent(FormatUtils.formatCount(current.getObjectCount()) + " / " + FormatUtils.formatDataSize(current.getByteCount())); - } - return dto; }
