Repository: nifi Updated Branches: refs/heads/NIFI-108 b19ff7cf3 -> 2e22954cd
NIFI-108: Built out 'skeleton' of the requests so that web tier can be written against it Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2e22954c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2e22954c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2e22954c Branch: refs/heads/NIFI-108 Commit: 2e22954cdc0ce82d3c166a5d905c36bff3a76d2e Parents: b19ff7c Author: Mark Payne <[email protected]> Authored: Wed Dec 16 14:03:36 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Wed Dec 16 14:14:55 2015 -0500 ---------------------------------------------------------------------- .../nifi/controller/queue/FlowFileQueue.java | 64 ++++++++++++++ .../nifi/controller/queue/FlowFileSummary.java | 59 +++++++++++++ .../controller/queue/ListFlowFileState.java | 57 +++++++++++++ .../controller/queue/ListFlowFileStatus.java | 70 ++++++++++++++++ .../nifi/controller/queue/SortColumn.java | 60 ++++++++++++++ .../nifi/controller/queue/SortDirection.java | 37 +++++++++ .../controller/queue/ListFlowFileRequest.java | 87 ++++++++++++++++++++ .../apache/nifi/controller/FlowController.java | 5 ++ .../nifi/controller/StandardFlowFileQueue.java | 40 ++++++++- 9 files changed, 478 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 727c755..fe8649d 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -180,6 +180,7 @@ public interface FlowFileQueue { * passed to the 
{@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)} * methods in order to obtain the status later or cancel a request * + * @param requestIdentifier the identifier of the Drop FlowFile Request * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be * included in the Provenance Events that are generated. * @@ -207,4 +208,67 @@ public interface FlowFileQueue { * request status exists with that identifier */ DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier); + + /** + * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a + * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist + * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that + * can then be passed to the {@link #getListFlowFileStatus(String)}. The listing of FlowFiles + * will be returned ordered by the position of the FlowFile in the queue. + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @return the status for the request + * + * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs + * is currently running. + */ + ListFlowFileStatus listFlowFiles(String requestIdentifier); + + /** + * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a + * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist + * within the queue. 
Additionally, the ListFlowFileStatus provides a request identifier that + * can then be passed to the {@link #getListFlowFileStatus(String)} + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @param sortColumn specifies which column to sort on + * @param direction specifies which direction to sort the FlowFiles + * + * @return the status for the request + * + * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs + * is currently running. + */ + ListFlowFileStatus listFlowFiles(String requestIdentifier, SortColumn sortColumn, SortDirection direction); + + /** + * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)} + * method that has the given identifier + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @return the current status of the List FlowFile Request with the given identifier or <code>null</code> if no + * request status exists with that identifier + */ + ListFlowFileStatus getListFlowFileStatus(String requestIdentifier); + + /** + * Cancels the request to list FlowFiles that has the given identifier. 
After this method is called, the request + * will no longer be known by this queue, so subsequent calls to {@link #getListFlowFileStatus(String)} or + * {@link #cancelListFlowFileRequest(String)} will return <code>null</code> + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @return the current status of the List FlowFile Request with the given identifier or <code>null</code> if no + * request status exists with that identifier + */ + ListFlowFileStatus cancelListFlowFileRequest(String requestIdentifier); + + /** + * Returns the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue + * with the given UUID + * + * @param flowFileUuid the UUID of the FlowFile to retrieve + * @return the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue + * with the given UUID + */ + FlowFileRecord getFlowFile(String flowFileUuid); } http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java new file mode 100644 index 0000000..8c37185 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * A summary of a FlowFile that can be used to represent a "high level" view of FlowFile + * without providing all of the information available. + */ +public interface FlowFileSummary { + /** + * @return the UUID of the FlowFile + */ + String getUuid(); + + /** + * @return the value of the 'filename' attribute + */ + String getFilename(); + + /** + * @return the current position of the FlowFile in the queue based on the prioritizers selected + */ + int getPosition(); + + /** + * @return the size of the FlowFile in bytes + */ + long getSize(); + + /** + * @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue + */ + long lastQueuedTime(); + + /** + * @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow + */ + long getLineageStartDate(); + + /** + * @return <code>true</code> if the FlowFile is penalized, <code>false</code> otherwise + */ + boolean isPenalized(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java new file mode 100644 index 0000000..eb417aa --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java @@ -0,0 +1,57 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * Represents the state that a List FlowFile Request is in + */ +public enum ListFlowFileState { + WAITING_FOR_LOCK("Waiting for other queue requests to complete"), + CALCULATING_LIST("Calculating list of FlowFiles"), + FAILURE("Failed"), + CANCELED("Canceled by user"), + COMPLETE("Completed successfully"); + + private final String description; + + private ListFlowFileState(final String description) { + this.description = description; + } + + @Override + public String toString() { + return description; + } + + /** + * @param description string form of list flow file state + * @return the matching ListFlowFileState or <code>null</code> if the description doesn't match + */ + public static ListFlowFileState valueOfDescription(String description) { + ListFlowFileState desiredState = null; + + for (ListFlowFileState state : values()) { + if (state.toString().equals(description)) { + desiredState = state; + break; + } + } + + return desiredState; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java 
---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java new file mode 100644 index 0000000..2959170 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue; + +import java.util.List; + +public interface ListFlowFileStatus { + + /** + * @return the identifier of the request to list the FlowFiles in the queue + */ + String getRequestIdentifier(); + + /** + * @return the date/time (in milliseconds since epoch) at which the request to + * list the FlowFiles in a queue was submitted + */ + long getRequestSubmissionTime(); + + /** + * @return the date/time (in milliseconds since epoch) at which the status of the + * request was last updated + */ + long getLastUpdated(); + + /** + * @return the column on which the listing is sorted + */ + SortColumn getSortColumn(); + + /** + * @return the direction in which the FlowFiles are sorted + */ + SortDirection getSortDirection(); + + /** + * @return the current state of the operation + */ + ListFlowFileState getState(); + + /** + * @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link ListFlowFileState#FAILURE}. + */ + String getFailureReason(); + + /** + * @return the current size of the queue + */ + QueueSize getQueueSize(); + + /** + * @return a List of FlowFileSummary objects + */ + List<FlowFileSummary> getFlowFileSummaries(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java new file mode 100644 index 0000000..48f3e9f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * Specifies which column to sort on when performing a Listing of FlowFiles via + * {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)} + */ +public enum SortColumn { + /** + * Sort based on the current position in the queue + */ + QUEUE_POSITION, + + /** + * Sort based on the UUID of the FlowFile + */ + FLOWFILE_UUID, + + /** + * Sort based on the 'filename' attribute of the FlowFile + */ + FILENAME, + + /** + * Sort based on the size of the FlowFile + */ + FLOWFILE_SIZE, + + /** + * Sort based on how long the FlowFile has been sitting in the queue + */ + QUEUED_DURATION, + + /** + * Sort based on the age of the FlowFile. 
I.e., the time at which the FlowFile's + * "greatest ancestor" entered the flow + */ + FLOWFILE_AGE, + + /** + * Sort based on when the FlowFile's penalization ends + */ + PENALIZATION; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java new file mode 100644 index 0000000..129e748 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * Specifies the order in which FlowFiles should be sorted when performing a listing of + * FlowFiles via the {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)} + * method + */ +public enum SortDirection { + /** + * FlowFiles should be sorted such that the FlowFile with the lowest value for the Sort Column + * should occur first in the listing. 
+ */ + ASCENDING, + + /** + * FlowFiles should be sorted such that the FlowFile with the largest value for the Sort Column + * should occur first in the listing. + */ + DESCENDING; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java new file mode 100644 index 0000000..03e0188 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue; + +import java.util.List; + +public class ListFlowFileRequest implements ListFlowFileStatus { + private ListFlowFileState state; + + @Override + public String getRequestIdentifier() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getRequestSubmissionTime() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getLastUpdated() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public SortColumn getSortColumn() { + // TODO Auto-generated method stub + return null; + } + + @Override + public SortDirection getSortDirection() { + // TODO Auto-generated method stub + return null; + } + + @Override + public ListFlowFileState getState() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getFailureReason() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<FlowFileSummary> getFlowFileSummaries() { + // TODO Auto-generated method stub + return null; + } + + @Override + public QueueSize getQueueSize() { + // TODO Auto-generated method stub + return null; + } + + public synchronized boolean cancel() { + if (this.state == ListFlowFileState.COMPLETE || this.state == ListFlowFileState.CANCELED) { + return false; + } + + this.state = ListFlowFileState.CANCELED; + return true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 73cdea1..2ad102f 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3241,6 +3241,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return new LimitedInputStream(rawStream, size); } + public InputStream getContent(final FlowFileRecord flowFile, final String requestor, final String requestUri) throws IOException { + // TODO: IMPLEMENT + return null; + } + private String getReplayFailureReason(final ProvenanceEventRecord event) { // Check that the event is a valid type. final ProvenanceEventType type = event.getEventType(); http://git-wip-us.apache.org/repos/asf/nifi/blob/2e22954c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index c6fbf28..cf0d185 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -39,7 +39,11 @@ import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.ListFlowFileRequest; +import 
org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -104,6 +108,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; + private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); + // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! private final ProcessScheduler scheduler; @@ -832,7 +839,38 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return "FlowFileQueue[id=" + identifier + "]"; } - private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); + + @Override + public ListFlowFileStatus listFlowFiles(final String requestIdentifier) { + return listFlowFiles(requestIdentifier, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING); + } + + @Override + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final SortColumn sortColumn, final SortDirection direction) { + // TODO: Implement + return null; + } + + @Override + public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) { + return listRequestMap.get(requestIdentifier); + } + + @Override + public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) { + final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier); + if (request != null) { + request.cancel(); + } + + return request; + } + + @Override 
+ public FlowFileRecord getFlowFile(String flowFileUuid) { + // TODO: Implement + return null; + } @Override public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
