NIFI-108: Added merging of response for listing of flowfiles in cluster manager
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9408507e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9408507e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9408507e Branch: refs/heads/NIFI-108 Commit: 9408507eb5733714537afb6266e6fe886609afdc Parents: 64a415c Author: Mark Payne <[email protected]> Authored: Thu Dec 17 16:46:12 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Thu Dec 17 16:46:12 2015 -0500 ---------------------------------------------------------------------- .../nifi/controller/queue/FlowFileSummary.java | 2 +- .../controller/queue/ListFlowFileStatus.java | 15 +++ .../nifi/controller/queue/SortColumn.java | 64 +++++++++-- .../nifi/web/api/dto/ListingRequestDTO.java | 76 ++++++++++++- .../cluster/manager/impl/WebClusterManager.java | 111 ++++++++++++++++--- .../controller/queue/ListFlowFileRequest.java | 34 ++++-- .../nifi/controller/FlowFileSummaries.java | 95 ++++++++++++++++ .../nifi/controller/StandardFlowFileQueue.java | 16 ++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 12 +- .../web/dao/impl/StandardConnectionDAO.java | 2 +- 10 files changed, 384 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java index 8c37185..b7207f2 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java @@ -45,7 +45,7 @@ public interface FlowFileSummary { /** * @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue */ - long lastQueuedTime(); + long getLastQueuedTime(); /** * @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java index 5781283..cae500d 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java @@ -22,6 +22,11 @@ import java.util.List; public interface ListFlowFileStatus { /** + * @return the maximum number of FlowFile Summary objects that should be returned + */ + int getMaxResults(); + + /** * @return the identifier of the request to drop FlowFiles from the queue */ String getRequestIdentifier(); @@ -72,4 +77,14 @@ public interface ListFlowFileStatus { * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed */ int getCompletionPercentage(); + + /** + * @return the total number of steps that are required in order to finish the listing + */ + int getTotalStepCount(); + + /** + * @return the total number of steps that have already been completed. The value returned will be >= 0 and <= the result of calling {@link #getTotalStepCount()}. + */ + int getCompletedStepCount(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java index 48f3e9f..30d285c 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java @@ -17,44 +17,92 @@ package org.apache.nifi.controller.queue; +import java.util.Comparator; + /** * Specifies which column to sort on when performing a Listing of FlowFiles via * {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)} */ -public enum SortColumn { +public enum SortColumn implements Comparator<FlowFileSummary> { /** * Sort based on the current position in the queue */ - QUEUE_POSITION, + QUEUE_POSITION (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Integer.compare(o1.getPosition(), o2.getPosition()); + } + }), /** * Sort based on the UUID of the FlowFile */ - FLOWFILE_UUID, + FLOWFILE_UUID (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return o1.getUuid().compareTo(o2.getUuid()); + } + }), /** * Sort based on the 'filename' attribute of the FlowFile */ - FILENAME, + FILENAME (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return o1.getFilename().compareTo(o2.getFilename()); + } + }), /** * Sort based on the size of the FlowFile */ - FLOWFILE_SIZE, + FLOWFILE_SIZE(new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Long.compare(o1.getSize(), o2.getSize()); + } + }), /** * Sort based on how long the FlowFile has been sitting in the queue */ - QUEUED_DURATION, + QUEUED_DURATION (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return -Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime()); + } + }), /** * Sort based on the age of the FlowFile. I.e., the time at which the FlowFile's * "greatest ancestor" entered the flow */ - FLOWFILE_AGE, + FLOWFILE_AGE (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate()); + } + }), /** * Sort based on when the FlowFile's penalization ends */ - PENALIZATION; + PENALIZATION (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Boolean.compare(o1.isPenalized(), o2.isPenalized()); + } + }); + + private final Comparator<FlowFileSummary> comparator; + + private SortColumn(final Comparator<FlowFileSummary> comparator) { + this.comparator = comparator; + } + + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return comparator.compare(o1, o2); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java index 53c2a74..36c0518 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java @@ -16,13 +16,15 @@ */ package org.apache.nifi.web.api.dto; -import com.wordnik.swagger.annotations.ApiModelProperty; -import org.apache.nifi.web.api.dto.util.TimestampAdapter; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import java.util.Date; import java.util.List; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +import com.wordnik.swagger.annotations.ApiModelProperty; + public class ListingRequestDTO { private String id; @@ -34,6 +36,11 @@ public class ListingRequestDTO { private Integer percentCompleted; private Boolean finished; private String failureReason; + private String sortColumn; + private String sortDirection; + private Integer maxResults; + private Integer totalStepCount; + private Integer completedStepCount; private String state; @@ -166,4 +173,65 @@ public class ListingRequestDTO { public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) { this.flowFileSummaries = flowFileSummaries; } + + /** + * @return the column on which the listing is sorted + */ + @ApiModelProperty(value = "The column on which the FlowFiles are sorted.") + public String getSortColumn() { + return sortColumn; + } + + public void setSortColumn(String sortColumn) { + this.sortColumn = sortColumn; + } + + /** + * @return the direction in which the FlowFiles are sorted + */ + @ApiModelProperty(value = "The direction in which the FlowFiles are sorted. Either ASCENDING or DESCENDING.") + public String getSortDirection() { + return sortDirection; + } + + public void setSortDirection(String sortDirection) { + this.sortDirection = sortDirection; + } + + /** + * @return the maximum number of FlowFileSummary objects to return + */ + @ApiModelProperty(value = "The maximum number of FlowFileSummary objects to return") + public Integer getMaxResults() { + return maxResults; + } + + public void setMaxResults(Integer maxResults) { + this.maxResults = maxResults; + } + + + /** + * @return the total number of steps required to complete the listing + */ + @ApiModelProperty(value = "The total number of steps required to complete the listing") + public Integer getTotalStepCount() { + return totalStepCount; + } + + public void setTotalStepCount(Integer totalStepCount) { + this.totalStepCount = totalStepCount; + } + + /** + * @return the number of steps that have already been completed. This value will be >= 0 and <= the total step count + */ + @ApiModelProperty(value = "The number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)") + public Integer getCompletedStepCount() { + return completedStepCount; + } + + public void setCompletedStepCount(Integer completedStepCount) { + this.completedStepCount = completedStepCount; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 035a888..6b43f9d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -34,11 +34,13 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.NavigableSet; import java.util.Queue; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -125,6 +127,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.FlowFileSummaries; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; @@ -132,6 +135,10 @@ import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.controller.queue.DropFlowFileState; +import org.apache.nifi.controller.queue.ListFlowFileState; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; @@ -142,6 +149,7 @@ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; @@ -192,13 +200,19 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateRevision; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; @@ -207,7 +221,12 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowSnippetEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; @@ -215,6 +234,8 @@ import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ProvenanceEventEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.ReportingTasksEntity; import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,19 +247,6 @@ import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; import com.sun.jersey.api.client.ClientResponse; -import org.apache.nifi.controller.queue.DropFlowFileState; - -import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; -import org.apache.nifi.web.api.dto.DropRequestDTO; -import org.apache.nifi.web.api.dto.ReportingTaskDTO; -import org.apache.nifi.web.api.entity.ControllerServiceEntity; -import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; -import org.apache.nifi.web.api.entity.ControllerServicesEntity; -import org.apache.nifi.web.api.entity.DropRequestEntity; -import org.apache.nifi.web.api.entity.ReportingTaskEntity; -import org.apache.nifi.web.api.entity.ReportingTasksEntity; /** * Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the @@ -319,6 +327,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); + @Deprecated public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}"); public static final Pattern LIST_FLOWFILES_URI = Pattern @@ -2831,6 +2840,64 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return normalizedValidationErrors; } + + /** + * Merges the listing requests in the specified map into the specified listing request + * + * @param listingRequest the target listing request + * @param listingRequestMap the mapping of all responses being merged + */ + private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) { + final Comparator<FlowFileSummaryDTO> comparator = FlowFileSummaries.createDTOComparator( + SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection())); + + final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator); + + ListFlowFileState state = null; + int sumOfPercents = 0; + boolean finished = true; + for (final ListingRequestDTO nodeRequest : listingRequestMap.values()) { + Integer percentComplete = nodeRequest.getPercentCompleted(); + if (percentComplete != null) { + sumOfPercents += percentComplete; + } + + if (!nodeRequest.getFinished()) { + finished = false; + } + + if (nodeRequest.getLastUpdated().after(listingRequest.getLastUpdated())) { + listingRequest.setLastUpdated(nodeRequest.getLastUpdated()); + } + + // Keep the state with the lowest ordinal value (the "least completed"). + final ListFlowFileState nodeState = ListFlowFileState.valueOfDescription(nodeRequest.getState()); + if (state == null || state.compareTo(nodeState) > 0) { + state = nodeState; + } + + for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) { + flowFileSummaries.add(summaryDTO); + + // Keep the set from growing beyond our max + if (flowFileSummaries.size() > listingRequest.getMaxResults()) { + flowFileSummaries.pollLast(); + } + } + + if (nodeRequest.getFailureReason() != null) { + listingRequest.setFailureReason(nodeRequest.getFailureReason()); + } + } + + final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries); + listingRequest.setFlowFileSummaries(summaryDTOs); + + final int percentCompleted = sumOfPercents / listingRequestMap.size(); + listingRequest.setPercentCompleted(percentCompleted); + listingRequest.setFinished(finished); + } + /** * Merges the drop requests in the specified map into the specified drop request. * @@ -3316,7 +3383,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) { - // TODO: IMPLEMENT + final ListingRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(ListingRequestEntity.class); + final ListingRequestDTO listingRequest = responseEntity.getListingRequest(); + + final Map<NodeIdentifier, ListingRequestDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ListingRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ListingRequestEntity.class); + final ListingRequestDTO nodeListingRequest = nodeResponseEntity.getListingRequest(); + + resultsMap.put(nodeResponse.getNodeId(), nodeListingRequest); + } + mergeListingRequests(listingRequest, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain); http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java index e83fd80..aad4c4f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java @@ -23,6 +23,7 @@ import java.util.List; public class ListFlowFileRequest implements ListFlowFileStatus { private final String requestId; + private final int maxResults; private final QueueSize queueSize; private final SortColumn sortColumn; private final SortDirection sortDirection; @@ -32,13 +33,14 @@ public class ListFlowFileRequest implements ListFlowFileStatus { private ListFlowFileState state; private String failureReason; private int numSteps; - private int numCompletedSteps; + private int completedStepCount; private long lastUpdated = System.currentTimeMillis(); - public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) { + public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) { this.requestId = requestId; this.sortColumn = sortColumn; this.sortDirection = sortDirection; + this.maxResults = maxResults; this.queueSize = queueSize; this.numSteps = numSteps; } @@ -94,11 +96,10 @@ public class ListFlowFileRequest implements ListFlowFileStatus { return Collections.unmodifiableList(flowFileSummaries); } - public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) { - // TODO: Implement. - + public synchronized void setFlowFileSummaries(final List<FlowFileSummary> summaries) { + this.flowFileSummaries.clear(); + this.flowFileSummaries.addAll(summaries); lastUpdated = System.currentTimeMillis(); - this.numCompletedSteps++; } @Override @@ -117,6 +118,25 @@ public class ListFlowFileRequest implements ListFlowFileStatus { @Override public synchronized int getCompletionPercentage() { - return (int) (100F * numCompletedSteps / numSteps); + return (int) (100F * completedStepCount / numSteps); + } + + public synchronized void setCompletedStepCount(final int completedStepCount) { + this.completedStepCount = completedStepCount; + } + + @Override + public int getMaxResults() { + return maxResults; + } + + @Override + public int getTotalStepCount() { + return numSteps; + } + + @Override + public int getCompletedStepCount() { + return completedStepCount; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java new file mode 100644 index 0000000..5a0a3ab --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import java.util.Collections; +import java.util.Comparator; + +import org.apache.nifi.controller.queue.FlowFileSummary; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; + +public class FlowFileSummaries { + + public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) { + final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + switch (column) { + case FILENAME: + return o1.getFilename().compareTo(o2.getFilename()); + case FLOWFILE_AGE: + return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate()); + case FLOWFILE_SIZE: + return Long.compare(o1.getSize(), o2.getSize()); + case FLOWFILE_UUID: + return o1.getUuid().compareTo(o2.getUuid()); + case PENALIZATION: + return Boolean.compare(o1.isPenalized(), o2.isPenalized()); + case QUEUE_POSITION: + return Long.compare(o1.getPosition(), o2.getPosition()); + case QUEUED_DURATION: + return Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime()); + } + + return 0; + } + }; + + + if (direction == SortDirection.DESCENDING) { + return Collections.reverseOrder(comparator); + } else { + return comparator; + } + } + + public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) { + final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() { + @Override + public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) { + switch (column) { + case FILENAME: + return o1.getFilename().compareTo(o2.getFilename()); + case FLOWFILE_AGE: + return o1.getLinageStartDate().compareTo(o2.getLinageStartDate()); + case FLOWFILE_SIZE: + return Long.compare(o1.getSize(), o2.getSize()); + case FLOWFILE_UUID: + return o1.getUuid().compareTo(o2.getUuid()); + case PENALIZATION: + return Boolean.compare(o1.getPenalized(), o2.getPenalized()); + case QUEUE_POSITION: + return Long.compare(o1.getPosition(), o2.getPosition()); + case QUEUED_DURATION: + return o1.getLastQueuedTime().compareTo(o2.getLastQueuedTime()); + } + + return 0; + } + }; + + if (direction == SortDirection.DESCENDING) { + return Collections.reverseOrder(comparator); + } else { + return comparator; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 9935ba4..daaa763 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -894,12 +894,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final int numSteps; readLock.lock(); try { - numSteps = 1 + swapLocations.size(); + // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue. + numSteps = 2 + swapLocations.size(); } finally { readLock.unlock("listFlowFiles"); } - final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps); + final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps); final Thread t = new Thread(new Runnable() { @Override @@ -907,6 +908,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { int position = 0; int resultCount = 0; final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size()); + int completedStepCount = 0; // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular @@ -935,6 +937,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.unlock("List FlowFiles"); } + listRequest.setCompletedStepCount(++completedStepCount); + position = activeQueue.size(); sourceLoop: while (resultCount < maxResults) { try { @@ -954,6 +958,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } } + + listRequest.setCompletedStepCount(++completedStepCount); } for (final FlowFileRecord flowFile : swapQueue) { @@ -966,6 +972,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } } + + listRequest.setCompletedStepCount(++completedStepCount); } finally { readLock.unlock("List FlowFiles"); } @@ -974,6 +982,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details."); } } + + listRequest.setFlowFileSummaries(summaries); } }, "List FlowFiles for Connection " + getIdentifier()); t.setDaemon(true); @@ -1018,7 +1028,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override - public long lastQueuedTime() { + public long getLastQueuedTime() { return lastQueuedTime; } http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index b93ff95..0805b54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -360,10 +360,14 @@ public final class DtoFactory { dto.setState(listingRequest.getState().toString()); dto.setFailureReason(listingRequest.getFailureReason()); dto.setFinished(isListingRequestComplete(listingRequest.getState())); + dto.setMaxResults(listingRequest.getMaxResults()); + dto.setSortColumn(listingRequest.getSortColumn().name()); + dto.setSortDirection(listingRequest.getSortDirection().name()); + dto.setTotalStepCount(listingRequest.getTotalStepCount()); + dto.setCompletedStepCount(listingRequest.getCompletedStepCount()); + dto.setPercentCompleted(listingRequest.getCompletionPercentage()); if (isListingRequestComplete(listingRequest.getState())) { - dto.setPercentCompleted(100); - final List<FlowFileSummary> flowFileSummaries = listingRequest.getFlowFileSummaries(); if (flowFileSummaries != null) { final List<FlowFileSummaryDTO> summaryDtos = new ArrayList<>(flowFileSummaries.size()); @@ -372,8 +376,6 @@ public final class DtoFactory { } dto.setFlowFileSummaries(summaryDtos); } - } else { - dto.setPercentCompleted(50); } return dto; @@ -383,7 +385,7 @@ public final class DtoFactory { final FlowFileSummaryDTO dto = new FlowFileSummaryDTO(); dto.setUuid(summary.getUuid()); dto.setFilename(summary.getFilename()); - dto.setLastQueuedTime(new Date(summary.lastQueuedTime())); + dto.setLastQueuedTime(new Date(summary.getLastQueuedTime())); dto.setLinageStartDate(new Date(summary.getLineageStartDate())); dto.setPenalized(summary.isPenalized()); dto.setPosition(summary.getPosition()); http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index 0e9a90a..459c2b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -342,7 +342,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) { final Connection connection = locateConnection(groupId, id); final FlowFileQueue queue = connection.getFlowFileQueue(); - return queue.listFlowFiles(listingRequestId); + return queue.listFlowFiles(listingRequestId, 100); } @Override
