NIFI-108: Built out 'skeleton' of the requests so that web tier can be written against it
NIFI-108: Implementing ability to list FlowFiles in a queue NIFI-108: - Starting to add support for endpoints that will listing flowfiles in a queue. NIFI-108: Added merging of response for listing of flowfiles in cluster manager NIFI-108: - Starting to add support for endpoints that will listing flowfiles in a queue. NIFI-108: - Starting to add support for endpoints that will listing flowfiles in a queue. NIFI-108: - Adding checkstyle issues. NIFI-108: Add clusterNodeId to FlowFileSummaryDTO NIFI-108: Added unit tests; added verifyCanList method to queue; fixed bugs NIFI-108: - Adding compilation error for IOException from getFlowFile(). - Code clean up. - Javadocs. NIFI-108: - Verifying two phase commit for queue listing. - Fixing checkstyle. - Ensuring drop and listing requests are merged when created when clustered. NIFI-108: - Adding initial listing capabilities. - Passing through the sort column and direction. NIFI-108: - Removing Delete FlowFile button. - Ensuring sort flags are being passed correctly. - Setting column widths. - Also including the cluster node address in the flowfile summaries. NIFI-108: - Including queue size statistics in listing request. - Showing connection name. NIFI-108: - Including queue size statistics in listing request. - Ensuring verifyCanList runs when appropriate. NIFI-108: - Adding initial support for viewing flowfile details dialog. - Adding initial support for click to content. NIFI-108: - Allowing the flowfile details dialog to be draggable. NIFI-108: - Only showing the flowfile listing table when the listing is successful and the listing is not empty. NIFI-108: - Reseting the queue stats when closing the listing table. NIFI-108: Implemented sorting when performing listing of FlowFiles NIFI-108: Fixed bug that caused the listFlowFiles operation to wait on a readLock before returning and performing work asynchronously; fixed bug in Write-Ahead FlowFile Repository that caused ContentClaims to be queued up for destruction instead of ResourceClaims - this caused millions of ContentClaims to be queued up instead of a single ResourceClaim in some tests NIFI-108: - Ensured the column sort indicator is reset when a new listing is opened. - Removing unused import. NIFI-108: - Addressed issues found during the review. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b330fd16 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b330fd16 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b330fd16 Branch: refs/heads/master Commit: b330fd160182a28a9c9d469f1ac081d71a6b8894 Parents: de2dd93 Author: Mark Payne <[email protected]> Authored: Wed Dec 16 14:03:36 2015 -0500 Committer: Matt Gilman <[email protected]> Committed: Wed Jan 6 16:20:23 2016 -0500 ---------------------------------------------------------------------- .../nifi/controller/queue/FlowFileQueue.java | 98 +++ .../nifi/controller/queue/FlowFileSummary.java | 59 ++ .../controller/queue/ListFlowFileState.java | 57 ++ .../controller/queue/ListFlowFileStatus.java | 90 +++ .../nifi/controller/queue/SortColumn.java | 108 ++++ .../nifi/controller/queue/SortDirection.java | 37 ++ nifi-docs/src/main/asciidoc/user-guide.adoc | 14 + .../apache/nifi/web/api/dto/DropRequestDTO.java | 11 +- .../apache/nifi/web/api/dto/FlowFileDTO.java | 133 ++++ .../nifi/web/api/dto/FlowFileSummaryDTO.java | 178 +++++ .../nifi/web/api/dto/ListingRequestDTO.java | 252 ++++++++ .../apache/nifi/web/api/dto/QueueSizeDTO.java | 59 ++ .../nifi/web/api/entity/FlowFileEntity.java | 44 ++ .../web/api/entity/ListingRequestEntity.java | 44 ++ .../cluster/manager/impl/WebClusterManager.java | 145 ++++- .../controller/queue/ListFlowFileRequest.java | 142 ++++ .../apache/nifi/controller/FlowController.java | 40 ++ .../nifi/controller/FlowFileSummaries.java | 95 +++ .../nifi/controller/StandardFlowFileQueue.java | 376 +++++++++-- .../WriteAheadFlowFileRepository.java | 33 +- .../controller/TestStandardFlowFileQueue.java | 206 +++++- .../org/apache/nifi/web/NiFiServiceFacade.java | 65 ++ .../nifi/web/StandardNiFiContentAccess.java | 52 +- .../nifi/web/StandardNiFiServiceFacade.java | 37 +- .../apache/nifi/web/api/ConnectionResource.java | 643 +++++++++++++++++-- .../org/apache/nifi/web/api/dto/DtoFactory.java | 114 ++++ .../org/apache/nifi/web/dao/ConnectionDAO.java | 68 +- .../web/dao/impl/StandardConnectionDAO.java | 137 +++- .../src/main/resources/nifi-web-api-context.xml | 1 + .../nifi-framework/nifi-web/nifi-web-ui/pom.xml | 2 + .../main/resources/filters/canvas.properties | 1 + .../src/main/webapp/WEB-INF/pages/canvas.jsp | 4 + .../partials/canvas/flowfile-details-dialog.jsp | 114 ++++ .../canvas/listing-request-status-dialog.jsp | 29 + .../WEB-INF/partials/canvas/queue-listing.jsp | 29 + .../nifi-web-ui/src/main/webapp/css/canvas.css | 1 + .../nifi-web-ui/src/main/webapp/css/dialog.css | 21 +- .../src/main/webapp/css/queue-listing.css | 215 +++++++ .../src/main/webapp/images/iconListQueue.png | Bin 0 -> 1502 bytes .../src/main/webapp/js/nf/canvas/nf-actions.js | 25 +- .../src/main/webapp/js/nf/canvas/nf-canvas.js | 6 + .../main/webapp/js/nf/canvas/nf-context-menu.js | 10 + .../webapp/js/nf/canvas/nf-queue-listing.js | 570 ++++++++++++++++ .../src/main/webapp/js/nf/nf-common.js | 10 + .../js/nf/provenance/nf-provenance-table.js | 12 +- 45 files changed, 4208 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 727c755..0d0f03f 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.queue; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Queue; @@ -180,6 +181,7 @@ public interface FlowFileQueue { * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)} * methods in order to obtain the status later or cancel a request * + * @param requestIdentifier the identifier of the Drop FlowFile Request * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be * included in the Provenance Events that are generated. * @@ -207,4 +209,100 @@ public interface FlowFileQueue { * request status exists with that identifier */ DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier); + + /** + * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a + * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist + * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that + * can then be passed to the {@link #getListFlowFileStatus(String)}. The listing of FlowFiles + * will be returned ordered by the position of the FlowFile in the queue. + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus + * + * @return the status for the request + * + * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs + * is currently running. + */ + ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults); + + /** + * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a + * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist + * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that + * can then be passed to the {@link #getListFlowFileStatus(String)} + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus + * @param sortColumn specifies which column to sort on + * @param direction specifies which direction to sort the FlowFiles + * + * @return the status for the request + * + * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs + * is currently running. + */ + ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, SortColumn sortColumn, SortDirection direction); + + /** + * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a + * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist + * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that + * can then be passed to the {@link #getListFlowFileStatus(String)} + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus + * @param query an Expression Language expression that will be evaluated against all FlowFiles. Only FlowFiles that satisfy the expression will + * be included in the results. The expression must be a valid expression and return a Boolean type + * @param sortColumn specifies which column to sort on + * @param direction specifies which direction to sort the FlowFiles + * + * @return the status for the request + * + * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs + * is currently running. + * @throws IllegalArgumentException if query is not a valid Expression Language expression or does not return a boolean type + */ + ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, String query, SortColumn sortColumn, SortDirection direction); + + /** + * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)} + * method that has the given identifier + * + * @param requestIdentifier the identifier of the Drop FlowFile Request + * @return the current status of the List FlowFile Request with the given identifier or <code>null</code> if no + * request status exists with that identifier + */ + ListFlowFileStatus getListFlowFileStatus(String requestIdentifier); + + /** + * Cancels the request to list FlowFiles that has the given identifier. After this method is called, the request + * will no longer be known by this queue, so subsequent calls to {@link #getListFlowFileStatus(String)} or + * {@link #cancelListFlowFileRequest(String)} will return <code>null</code> + * + * @param requestIdentifier the identifier of the Drop FlowFile Request + * @return the current status of the List FlowFile Request with the given identifier or <code>null</code> if no + * request status exists with that identifier + */ + ListFlowFileStatus cancelListFlowFileRequest(String requestIdentifier); + + /** + * Returns the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue + * with the given UUID + * + * @param flowFileUuid the UUID of the FlowFile to retrieve + * @return the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue + * with the given UUID + * + * @throws IOException if unable to read FlowFiles that are stored on some external device + */ + FlowFileRecord getFlowFile(String flowFileUuid) throws IOException; + + /** + * Ensures that a listing can be performed on the queue + * + * @throws IllegalStateException if the queue is not in a state in which a listing can be performed + */ + void verifyCanList() throws IllegalStateException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java new file mode 100644 index 0000000..b7207f2 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * A summary of a FlowFile that can be used to represent a "high level" view of FlowFile + * without providing all of the information available. + */ +public interface FlowFileSummary { + /** + * @return the UUID of the FlowFile + */ + String getUuid(); + + /** + * @return the value of the 'filename' attribute + */ + String getFilename(); + + /** + * @return the current position of the FlowFile in the queue based on the prioritizers selected + */ + int getPosition(); + + /** + * @return the size of the FlowFile in bytes + */ + long getSize(); + + /** + * @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue + */ + long getLastQueuedTime(); + + /** + * @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow + */ + long getLineageStartDate(); + + /** + * @return <code>true</code> if the FlowFile is penalized, <code>false</code> otherwise + */ + boolean isPenalized(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java new file mode 100644 index 0000000..eb417aa --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * Represents the state that a List FlowFile Request is in + */ +public enum ListFlowFileState { + WAITING_FOR_LOCK("Waiting for other queue requests to complete"), + CALCULATING_LIST("Calculating list of FlowFiles"), + FAILURE("Failed"), + CANCELED("Canceled by user"), + COMPLETE("Completed successfully"); + + private final String description; + + private ListFlowFileState(final String description) { + this.description = description; + } + + @Override + public String toString() { + return description; + } + + /** + * @param description string form of list flow file state + * @return the matching ListFlowFileState or <code>null</code> if the description doesn't match + */ + public static ListFlowFileState valueOfDescription(String description) { + ListFlowFileState desiredState = null; + + for (ListFlowFileState state : values()) { + if (state.toString().equals(description)) { + desiredState = state; + break; + } + } + + return desiredState; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java new file mode 100644 index 0000000..cae500d --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +import java.util.List; + +public interface ListFlowFileStatus { + + /** + * @return the maximum number of FlowFile Summary objects that should be returned + */ + int getMaxResults(); + + /** + * @return the identifier of the request to drop FlowFiles from the queue + */ + String getRequestIdentifier(); + + /** + * @return the date/time (in milliseconds since epoch) at which the request to + * drop the FlowFiles from a queue was submitted + */ + long getRequestSubmissionTime(); + + /** + * @return the date/time (in milliseconds since epoch) at which the status of the + * request was last updated + */ + long getLastUpdated(); + + /** + * @return the column on which the listing is sorted + */ + SortColumn getSortColumn(); + + /** + * @return the direction in which the FlowFiles are sorted + */ + SortDirection getSortDirection(); + + /** + * @return the current state of the operation + */ + ListFlowFileState getState(); + + /** + * @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link ListFlowFileStatus#FAILURE}. + */ + String getFailureReason(); + + /** + * @return the current size of the queue + */ + QueueSize getQueueSize(); + + /** + * @return a List of FlowFileSummary objects + */ + List<FlowFileSummary> getFlowFileSummaries(); + + /** + * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed + */ + int getCompletionPercentage(); + + /** + * @return the total number of steps that are required in order to finish the listing + */ + int getTotalStepCount(); + + /** + * @return the total number of steps that have already been completed. The value returned will be >= 0 and <= the result of calling {@link #getTotalStepCount()}. + */ + int getCompletedStepCount(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java new file mode 100644 index 0000000..30d285c --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +import java.util.Comparator; + +/** + * Specifies which column to sort on when performing a Listing of FlowFiles via + * {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)} + */ +public enum SortColumn implements Comparator<FlowFileSummary> { + /** + * Sort based on the current position in the queue + */ + QUEUE_POSITION (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Integer.compare(o1.getPosition(), o2.getPosition()); + } + }), + + /** + * Sort based on the UUID of the FlowFile + */ + FLOWFILE_UUID (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return o1.getUuid().compareTo(o2.getUuid()); + } + }), + + /** + * Sort based on the 'filename' attribute of the FlowFile + */ + FILENAME (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return o1.getFilename().compareTo(o2.getFilename()); + } + }), + + /** + * Sort based on the size of the FlowFile + */ + FLOWFILE_SIZE(new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Long.compare(o1.getSize(), o2.getSize()); + } + }), + + /** + * Sort based on how long the FlowFile has been sitting in the queue + */ + QUEUED_DURATION (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return -Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime()); + } + }), + + /** + * Sort based on the age of the FlowFile. I.e., the time at which the FlowFile's + * "greatest ancestor" entered the flow + */ + FLOWFILE_AGE (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate()); + } + }), + + /** + * Sort based on when the FlowFile's penalization ends + */ + PENALIZATION (new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return Boolean.compare(o1.isPenalized(), o2.isPenalized()); + } + }); + + private final Comparator<FlowFileSummary> comparator; + + private SortColumn(final Comparator<FlowFileSummary> comparator) { + this.comparator = comparator; + } + + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + return comparator.compare(o1, o2); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java new file mode 100644 index 0000000..129e748 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * Specifies the order in which FlowFiles should be sorted when performing a listing of + * FlowFiles via the {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)} + * method + */ +public enum SortDirection { + /** + * FlowFiles should be sorted such that the FlowFile with the lowest value for the Sort Column + * should occur first in the listing. + */ + ASCENDING, + + /** + * FlowFiles should be sorted such that the FlowFile with the largest value for the Sort Column + * should occur first in the listing. + */ + DESCENDING; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-docs/src/main/asciidoc/user-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index 64421d7..3397e88 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -1252,6 +1252,20 @@ image:iconNotSecure.png["Not Secure"] +[[Queue_Listing]] +=== Listing FlowFiles in a Queue + +The FlowFiles enqueued in a Connection can be viewed when necessary. The Queue listing is opened via a menu item in +a Connection's context menu. This option is only available when the source and destination of the Connection have +been stopped and all active threads have completed. The listing will return the top 100 FlowFiles according to +the currently sorted column. + +Additionally, details for a Flowfile in the listing can be viewed by clicking on the Details icon ( +image:iconDetails.png["Details"] +) in the left most column. From here, the FlowFile details and attributes are available as well buttons for +downloading or viewing the content. Viewing the content is only available if the nifi.content.viewer.url has been configured. + + [[Summary_Page]] === Summary Page http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java index c0b94a1..8879054 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java @@ -51,12 +51,12 @@ public class DropRequestDTO { private String state; /** - * The id for this component. + * The id for this drop request. * * @return The id */ @ApiModelProperty( - value = "The id of the component." + value = "The id for this drop request." ) public String getId() { return this.id; @@ -67,12 +67,12 @@ public class DropRequestDTO { } /** - * The uri for linking to this component in this NiFi. + * The uri for linking to this drop request in this NiFi. * * @return The uri */ @ApiModelProperty( - value = "The URI for futures requests to the component." + value = "The URI for future requests to this drop request." ) public String getUri() { return uri; @@ -128,6 +128,9 @@ public class DropRequestDTO { /** * @return the reason, if any, that this drop request failed */ + @ApiModelProperty( + value = "The reason, if any, that this drop request failed." + ) public String getFailureReason() { return failureReason; } http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileDTO.java new file mode 100644 index 0000000..6e02832 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileDTO.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; +import java.util.Map; + +@XmlType(name = "flowFile") +public class FlowFileDTO extends FlowFileSummaryDTO { + + private Map<String, String> attributes; + + private String contentClaimSection; + private String contentClaimContainer; + private String contentClaimIdentifier; + private Long contentClaimOffset; + private String contentClaimFileSize; + private Long contentClaimFileSizeBytes; + + /** + * @return the FlowFile attributes + */ + @ApiModelProperty( + value = "The FlowFile attributes." + ) + public Map<String, String> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, String> attributes) { + this.attributes = attributes; + } + + /** + * @return the Section in which the Content Claim lives, or <code>null</code> if no Content Claim exists + */ + @ApiModelProperty( + value = "The section in which the content claim lives." + ) + public String getContentClaimSection() { + return contentClaimSection; + } + + public void setContentClaimSection(String contentClaimSection) { + this.contentClaimSection = contentClaimSection; + } + + /** + * @return the Container in which the Content Claim lives, or <code>null</code> if no Content Claim exists + */ + @ApiModelProperty( + value = "The container in which the content claim lives." + ) + public String getContentClaimContainer() { + return contentClaimContainer; + } + + public void setContentClaimContainer(String contentClaimContainer) { + this.contentClaimContainer = contentClaimContainer; + } + + /** + * @return the Identifier of the Content Claim, or <code>null</code> if no Content Claim exists + */ + @ApiModelProperty( + value = "The identifier of the content claim." + ) + public String getContentClaimIdentifier() { + return contentClaimIdentifier; + } + + public void setContentClaimIdentifier(String contentClaimIdentifier) { + this.contentClaimIdentifier = contentClaimIdentifier; + } + + /** + * @return the offset into the the Content Claim where the FlowFile's content begins, or <code>null</code> if no Content Claim exists + */ + @ApiModelProperty( + value = "The offset into the content claim where the flowfile's content begins." + ) + public Long getContentClaimOffset() { + return contentClaimOffset; + } + + public void setContentClaimOffset(Long contentClaimOffset) { + this.contentClaimOffset = contentClaimOffset; + } + + /** + * @return the formatted file size of the content claim + */ + @ApiModelProperty( + value = "The file size of the content claim formatted." + ) + public String getContentClaimFileSize() { + return contentClaimFileSize; + } + + public void setContentClaimFileSize(String contentClaimFileSize) { + this.contentClaimFileSize = contentClaimFileSize; + } + + /** + * @return the number of bytes of the content claim + */ + @ApiModelProperty( + value = "The file size of the content claim in bytes." + ) + public Long getContentClaimFileSizeBytes() { + return contentClaimFileSizeBytes; + } + + public void setContentClaimFileSizeBytes(Long contentClaimFileSizeBytes) { + this.contentClaimFileSizeBytes = contentClaimFileSizeBytes; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java new file mode 100644 index 0000000..2a8a1f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "flowFileSummary") +public class FlowFileSummaryDTO { + + private String uri; + + private String uuid; + private String filename; + private Integer position; + private Long size; + private Long queuedDuration; + private Long lineageDuration; + private Boolean isPenalized; + + private String clusterNodeId; // include when clustered + private String clusterNodeAddress; // include when clustered + + /** + * @return the FlowFile uri + */ + @ApiModelProperty( + value = "The URI that can be used to access this FlowFile." + ) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + /** + * @return the FlowFile uuid + */ + @ApiModelProperty( + value = "The FlowFile UUID." + ) + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + /** + * @return the FlowFile filename + */ + @ApiModelProperty( + value = "The FlowFile filename." + ) + public String getFilename() { + return filename; + } + + public void setFilename(String filename) { + this.filename = filename; + } + + /** + * @return the FlowFile's position in the queue. + */ + @ApiModelProperty( + value = "The FlowFile's position in the queue." + ) + public Integer getPosition() { + return position; + } + + public void setPosition(Integer position) { + this.position = position; + } + + /** + * @return the FlowFile file size + */ + @ApiModelProperty( + value = "The FlowFile file size." + ) + public Long getSize() { + return size; + } + + public void setSize(Long size) { + this.size = size; + } + + /** + * @return how long this FlowFile has been enqueued + */ + @ApiModelProperty( + value = "How long this FlowFile has been enqueued." + ) + public Long getQueuedDuration() { + return queuedDuration; + } + + public void setQueuedDuration(Long queuedDuration) { + this.queuedDuration = queuedDuration; + } + + /** + * @return duration since the FlowFile's greatest ancestor entered the flow + */ + @ApiModelProperty( + value = "Duration since the FlowFile's greatest ancestor entered the flow." + ) + public Long getLineageDuration() { + return lineageDuration; + } + + public void setLineageDuration(Long lineageDuration) { + this.lineageDuration = lineageDuration; + } + + /** + * @return if the FlowFile is penalized + */ + @ApiModelProperty( + value = "If the FlowFile is penalized." + ) + public Boolean getPenalized() { + return isPenalized; + } + + public void setPenalized(Boolean penalized) { + isPenalized = penalized; + } + + /** + * @return The id of the node where this FlowFile resides. + */ + @ApiModelProperty( + value = "The id of the node where this FlowFile resides." + ) + public String getClusterNodeId() { + return clusterNodeId; + } + + public void setClusterNodeId(String clusterNodeId) { + this.clusterNodeId = clusterNodeId; + } + + /** + * @return label for the node where this FlowFile resides + */ + @ApiModelProperty( + value = "The label for the node where this FlowFile resides." + ) + public String getClusterNodeAddress() { + return clusterNodeAddress; + } + + public void setClusterNodeAddress(String clusterNodeAddress) { + this.clusterNodeAddress = clusterNodeAddress; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java new file mode 100644 index 0000000..e29f41f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import java.util.Date; +import java.util.List; + +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlType(name = "listingRequest") +public class ListingRequestDTO { + + private String id; + private String uri; + + private Date submissionTime; + private Date lastUpdated; + + private Integer percentCompleted; + private Boolean finished; + private String failureReason; + private String sortColumn; + private String sortDirection; + private Integer maxResults; + private Integer totalStepCount; + private Integer completedStepCount; + + private String state; + private QueueSizeDTO queueSize; + + private List<FlowFileSummaryDTO> flowFileSummaries; + + /** + * @return the id for this listing request. + */ + @ApiModelProperty( + value = "The id for this listing request." + ) + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * @return the URI for this listing request. + */ + @ApiModelProperty( + value = "The URI for future requests to this listing request." + ) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + /** + * @return time the query was submitted + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The timestamp when the query was submitted." + ) + public Date getSubmissionTime() { + return submissionTime; + } + + public void setSubmissionTime(Date submissionTime) { + this.submissionTime = submissionTime; + } + + /** + * @return the time this request was last updated + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The last time this listing request was updated." + ) + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + /** + * @return percent completed + */ + @ApiModelProperty( + value = "The current percent complete." + ) + public Integer getPercentCompleted() { + return percentCompleted; + } + + public void setPercentCompleted(Integer percentCompleted) { + this.percentCompleted = percentCompleted; + } + + /** + * @return whether the query has finished + */ + @ApiModelProperty( + value = "Whether the query has finished." + ) + public Boolean getFinished() { + return finished; + } + + public void setFinished(Boolean finished) { + this.finished = finished; + } + + /** + * @return the reason, if any, that this listing request failed + */ + @ApiModelProperty( + value = "The reason, if any, that this listing request failed." + ) + public String getFailureReason() { + return failureReason; + } + + public void setFailureReason(String failureReason) { + this.failureReason = failureReason; + } + + /** + * @return the current state of the listing request. + */ + @ApiModelProperty( + value = "The current state of the listing request." + ) + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + /** + * @return the FlowFile summaries. + */ + @ApiModelProperty( + value = "The FlowFile summaries. The summaries will be populated once the request has completed." + ) + public List<FlowFileSummaryDTO> getFlowFileSummaries() { + return flowFileSummaries; + } + + public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) { + this.flowFileSummaries = flowFileSummaries; + } + + /** + * @return the column on which the listing is sorted + */ + @ApiModelProperty(value = "The column on which the FlowFiles are sorted.") + public String getSortColumn() { + return sortColumn; + } + + public void setSortColumn(String sortColumn) { + this.sortColumn = sortColumn; + } + + /** + * @return the direction in which the FlowFiles are sorted + */ + @ApiModelProperty(value = "The direction in which the FlowFiles are sorted. Either ASCENDING or DESCENDING.") + public String getSortDirection() { + return sortDirection; + } + + public void setSortDirection(String sortDirection) { + this.sortDirection = sortDirection; + } + + /** + * @return the maximum number of FlowFileSummary objects to return + */ + @ApiModelProperty(value = "The maximum number of FlowFileSummary objects to return") + public Integer getMaxResults() { + return maxResults; + } + + public void setMaxResults(Integer maxResults) { + this.maxResults = maxResults; + } + + + /** + * @return the total number of steps required to complete the listing + */ + @ApiModelProperty(value = "The total number of steps required to complete the listing") + public Integer getTotalStepCount() { + return totalStepCount; + } + + public void setTotalStepCount(Integer totalStepCount) { + this.totalStepCount = totalStepCount; + } + + /** + * @return the number of steps that have already been completed. This value will be >= 0 and <= the total step count + */ + @ApiModelProperty(value = "The number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)") + public Integer getCompletedStepCount() { + return completedStepCount; + } + + public void setCompletedStepCount(Integer completedStepCount) { + this.completedStepCount = completedStepCount; + } + + /** + * @return the size for the queue + */ + @ApiModelProperty(value = "The size of the queue") + public QueueSizeDTO getQueueSize() { + return queueSize; + } + + public void setQueueSize(QueueSizeDTO queueSize) { + this.queueSize = queueSize; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/QueueSizeDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/QueueSizeDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/QueueSizeDTO.java new file mode 100644 index 0000000..cc46c57 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/QueueSizeDTO.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +/** + * The stats for a queue. + */ +@XmlType(name = "queueSize") +public class QueueSizeDTO { + + private long byteCount; + private int objectCount; + + /** + * @return the count of objects in a queue + */ + @ApiModelProperty( + value = "The count of objects in a queue." + ) + public int getObjectCount() { + return objectCount; + } + + public void setObjectCount(int objectCount) { + this.objectCount = objectCount; + } + + /** + * @return the size of objects in a queue + */ + @ApiModelProperty( + value = "The size of objects in a queue." + ) + public long getByteCount() { + return byteCount; + } + + public void setByteCount(long byteCount) { + this.byteCount = byteCount; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java new file mode 100644 index 0000000..639cc85 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.FlowFileDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a FlowFileDTO. + */ +@XmlRootElement(name = "listingRequestEntity") +public class FlowFileEntity extends Entity { + + private FlowFileDTO flowFile; + + /** + * The FlowFileDTO that is being serialized. + * + * @return The FlowFileDTO object + */ + public FlowFileDTO getFlowFile() { + return flowFile; + } + + public void setFlowFile(FlowFileDTO flowFile) { + this.flowFile = flowFile; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java new file mode 100644 index 0000000..5fee5c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.ListingRequestDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ListingRequestDTO. + */ +@XmlRootElement(name = "listingRequestEntity") +public class ListingRequestEntity extends Entity { + + private ListingRequestDTO listingRequest; + + /** + * The ListingRequestDTO that is being serialized. + * + * @return The ListingRequestDTO object + */ + public ListingRequestDTO getListingRequest() { + return listingRequest; + } + + public void setListingRequest(ListingRequestDTO listingRequest) { + this.listingRequest = listingRequest; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 5e9dd3c..55e58ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -34,11 +34,13 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.NavigableSet; import java.util.Queue; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -125,6 +127,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.FlowFileSummaries; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; @@ -132,6 +135,10 @@ import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.controller.queue.DropFlowFileState; +import org.apache.nifi.controller.queue.ListFlowFileState; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; @@ -142,6 +149,7 @@ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; @@ -192,13 +200,20 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateRevision; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.QueueSizeDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; @@ -207,7 +222,12 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowSnippetEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; @@ -215,6 +235,8 @@ import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ProvenanceEventEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.ReportingTasksEntity; import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,19 +248,6 @@ import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; import com.sun.jersey.api.client.ClientResponse; -import org.apache.nifi.controller.queue.DropFlowFileState; - -import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; -import org.apache.nifi.web.api.dto.DropRequestDTO; -import org.apache.nifi.web.api.dto.ReportingTaskDTO; -import org.apache.nifi.web.api.entity.ControllerServiceEntity; -import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; -import org.apache.nifi.web.api.entity.ControllerServicesEntity; -import org.apache.nifi.web.api.entity.DropRequestEntity; -import org.apache.nifi.web.api.entity.ReportingTaskEntity; -import org.apache.nifi.web.api.entity.ReportingTasksEntity; /** * Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the @@ -319,8 +328,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); + @Deprecated public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); + public static final Pattern DROP_REQUESTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests"); public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}"); + public static final Pattern LISTING_REQUESTS_URI = Pattern + .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests"); + public static final Pattern LISTING_REQUEST_URI = Pattern + .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}"); private final NiFiProperties properties; private final HttpRequestReplicator httpRequestReplicator; @@ -2431,6 +2446,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches(); } + private static boolean isListFlowFilesEndpoint(final URI uri, final String method) { + if ("GET".equalsIgnoreCase(method) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + private static boolean isCountersEndpoint(final URI uri) { return COUNTERS_URI.matcher(uri.getPath()).matches(); } @@ -2476,6 +2501,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return true; } else if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && DROP_REQUEST_URI.matcher(uri.getPath()).matches()) { return true; + } else if (("POST".equalsIgnoreCase(method) && DROP_REQUESTS_URI.matcher(uri.getPath()).matches())) { + return true; } return false; @@ -2825,6 +2852,80 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return normalizedValidationErrors; } + + /** + * Merges the listing requests in the specified map into the specified listing request + * + * @param listingRequest the target listing request + * @param listingRequestMap the mapping of all responses being merged + */ + private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) { + final Comparator<FlowFileSummaryDTO> comparator = FlowFileSummaries.createDTOComparator( + SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection())); + + final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator); + + ListFlowFileState state = null; + int numStepsCompleted = 0; + int numStepsTotal = 0; + int objectCount = 0; + long byteCount = 0; + boolean finished = true; + for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : listingRequestMap.entrySet()) { + final NodeIdentifier nodeIdentifier = entry.getKey(); + final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); + + final ListingRequestDTO nodeRequest = entry.getValue(); + + numStepsCompleted += nodeRequest.getCompletedStepCount(); + numStepsTotal += nodeRequest.getTotalStepCount(); + + final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize(); + objectCount += nodeQueueSize.getObjectCount(); + byteCount += nodeQueueSize.getByteCount(); + + if (!nodeRequest.getFinished()) { + finished = false; + } + + if (nodeRequest.getLastUpdated().after(listingRequest.getLastUpdated())) { + listingRequest.setLastUpdated(nodeRequest.getLastUpdated()); + } + + // Keep the state with the lowest ordinal value (the "least completed"). + final ListFlowFileState nodeState = ListFlowFileState.valueOfDescription(nodeRequest.getState()); + if (state == null || state.compareTo(nodeState) > 0) { + state = nodeState; + } + + for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) { + summaryDTO.setClusterNodeId(nodeIdentifier.getId()); + summaryDTO.setClusterNodeAddress(nodeAddress); + + flowFileSummaries.add(summaryDTO); + + // Keep the set from growing beyond our max + if (flowFileSummaries.size() > listingRequest.getMaxResults()) { + flowFileSummaries.pollLast(); + } + } + + if (nodeRequest.getFailureReason() != null) { + listingRequest.setFailureReason(nodeRequest.getFailureReason()); + } + } + + final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries); + listingRequest.setFlowFileSummaries(summaryDTOs); + + final int percentCompleted = numStepsCompleted / numStepsTotal; + listingRequest.setPercentCompleted(percentCompleted); + listingRequest.setFinished(finished); + + listingRequest.getQueueSize().setByteCount(byteCount); + listingRequest.getQueueSize().setObjectCount(objectCount); + } + /** * Merges the drop requests in the specified map into the specified drop request. * @@ -3309,6 +3410,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C mergeDropRequests(dropRequest, resultsMap); clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) { + final ListingRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(ListingRequestEntity.class); + final ListingRequestDTO listingRequest = responseEntity.getListingRequest(); + + final Map<NodeIdentifier, ListingRequestDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ListingRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ListingRequestEntity.class); + final ListingRequestDTO nodeListingRequest = nodeResponseEntity.getListingRequest(); + + resultsMap.put(nodeResponse.getNodeId(), nodeListingRequest); + } + mergeListingRequests(listingRequest, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain); http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java new file mode 100644 index 0000000..313ad0c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ListFlowFileRequest implements ListFlowFileStatus { + private final String requestId; + private final int maxResults; + private final QueueSize queueSize; + private final SortColumn sortColumn; + private final SortDirection sortDirection; + private final long submissionTime = System.currentTimeMillis(); + private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>(); + + private ListFlowFileState state = ListFlowFileState.WAITING_FOR_LOCK; + private String failureReason; + private int numSteps; + private int completedStepCount; + private long lastUpdated = System.currentTimeMillis(); + + public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) { + this.requestId = requestId; + this.sortColumn = sortColumn; + this.sortDirection = sortDirection; + this.maxResults = maxResults; + this.queueSize = queueSize; + this.numSteps = numSteps; + } + + @Override + public String getRequestIdentifier() { + return requestId; + } + + @Override + public long getRequestSubmissionTime() { + return submissionTime; + } + + @Override + public synchronized long getLastUpdated() { + return lastUpdated; + } + + @Override + public SortColumn getSortColumn() { + return sortColumn; + } + + @Override + public SortDirection getSortDirection() { + return sortDirection; + } + + @Override + public synchronized ListFlowFileState getState() { + return state; + } + + @Override + public synchronized String getFailureReason() { + return failureReason; + } + + public synchronized void setState(final ListFlowFileState state) { + this.state = state; + this.lastUpdated = System.currentTimeMillis(); + } + + public synchronized void setFailure(final String explanation) { + this.state = ListFlowFileState.FAILURE; + this.failureReason = explanation; + this.lastUpdated = System.currentTimeMillis(); + } + + @Override + public synchronized List<FlowFileSummary> getFlowFileSummaries() { + return Collections.unmodifiableList(flowFileSummaries); + } + + public synchronized void setFlowFileSummaries(final List<FlowFileSummary> summaries) { + this.flowFileSummaries.clear(); + this.flowFileSummaries.addAll(summaries); + lastUpdated = System.currentTimeMillis(); + } + + @Override + public QueueSize getQueueSize() { + return queueSize; + } + + public synchronized boolean cancel() { + if (this.state == ListFlowFileState.COMPLETE || this.state == ListFlowFileState.CANCELED) { + return false; + } + + this.state = ListFlowFileState.CANCELED; + return true; + } + + @Override + public synchronized int getCompletionPercentage() { + return (int) (100F * completedStepCount / numSteps); + } + + public synchronized void setCompletedStepCount(final int completedStepCount) { + this.completedStepCount = completedStepCount; + } + + @Override + public int getMaxResults() { + return maxResults; + } + + @Override + public int getTotalStepCount() { + return numSteps; + } + + @Override + public int getCompletedStepCount() { + return completedStepCount; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 73cdea1..dd3b687 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -177,6 +177,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; + +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -3241,6 +3243,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return new LimitedInputStream(rawStream, size); } + public InputStream getContent(final FlowFileRecord flowFile, final String requestor, final String requestUri) throws IOException { + requireNonNull(flowFile); + requireNonNull(requestor); + requireNonNull(requestUri); + + final InputStream stream; + final ResourceClaim resourceClaim; + final ContentClaim contentClaim = flowFile.getContentClaim(); + if (contentClaim == null) { + resourceClaim = null; + stream = new ByteArrayInputStream(new byte[0]); + } else { + resourceClaim = flowFile.getContentClaim().getResourceClaim(); + stream = contentRepository.read(flowFile.getContentClaim()); + } + + // Register a Provenance Event to indicate that we replayed the data. + final StandardProvenanceEventRecord.Builder sendEventBuilder = new StandardProvenanceEventRecord.Builder() + .setEventType(ProvenanceEventType.DOWNLOAD) + .setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key())) + .setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap()) + .setTransitUri(requestUri) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(flowFile.getEntryDate()) + .setLineageStartDate(flowFile.getLineageStartDate()) + .setComponentType(getName()) + .setComponentId(getRootGroupId()) + .setDetails("Download of Content requested by " + requestor + " for " + flowFile); + + if (contentClaim != null) { + sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize()); + } + + final ProvenanceEventRecord sendEvent = sendEventBuilder.build(); + provenanceEventRepository.registerEvent(sendEvent); + return stream; + } + private String getReplayFailureReason(final ProvenanceEventRecord event) { // Check that the event is a valid type. final ProvenanceEventType type = event.getEventType(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java new file mode 100644 index 0000000..7687d8a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import java.util.Collections; +import java.util.Comparator; + +import org.apache.nifi.controller.queue.FlowFileSummary; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; + +public class FlowFileSummaries { + + public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) { + final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() { + @Override + public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { + switch (column) { + case FILENAME: + return o1.getFilename().compareTo(o2.getFilename()); + case FLOWFILE_AGE: + return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate()); + case FLOWFILE_SIZE: + return Long.compare(o1.getSize(), o2.getSize()); + case FLOWFILE_UUID: + return o1.getUuid().compareTo(o2.getUuid()); + case PENALIZATION: + return Boolean.compare(o1.isPenalized(), o2.isPenalized()); + case QUEUE_POSITION: + return Long.compare(o1.getPosition(), o2.getPosition()); + case QUEUED_DURATION: + return Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime()); + } + + return 0; + } + }; + + + if (direction == SortDirection.DESCENDING) { + return Collections.reverseOrder(comparator); + } else { + return comparator; + } + } + + public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) { + final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() { + @Override + public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) { + switch (column) { + case FILENAME: + return o1.getFilename().compareTo(o2.getFilename()); + case FLOWFILE_AGE: + return o1.getLineageDuration().compareTo(o2.getLineageDuration()); + case FLOWFILE_SIZE: + return Long.compare(o1.getSize(), o2.getSize()); + case FLOWFILE_UUID: + return o1.getUuid().compareTo(o2.getUuid()); + case PENALIZATION: + return Boolean.compare(o1.getPenalized(), o2.getPenalized()); + case QUEUE_POSITION: + return Long.compare(o1.getPosition(), o2.getPosition()); + case QUEUED_DURATION: + return o1.getQueuedDuration().compareTo(o2.getQueuedDuration()); + } + + return 0; + } + }; + + if (direction == SortDirection.DESCENDING) { + return Collections.reverseOrder(comparator); + } else { + return comparator; + } + } + +}
