NIFI-730: Added methods for dropping queued flowfiles; refactored swap manager but have not yet started swapping flowfiles in or out from within the flowfile queue
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b8c51dc3 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b8c51dc3 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b8c51dc3 Branch: refs/heads/master Commit: b8c51dc35d1a7fdbf3e6449bbe297db667a1176c Parents: b4bfcc1 Author: Mark Payne <[email protected]> Authored: Sun Oct 11 10:27:07 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Sun Oct 11 10:27:07 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/controller/FlowFileQueue.java | 203 ------ .../controller/queue/DropFlowFileState.java | 40 ++ .../controller/queue/DropFlowFileStatus.java | 62 ++ .../nifi/controller/queue/FlowFileQueue.java | 256 ++++++++ .../apache/nifi/controller/queue/QueueSize.java | 48 ++ .../repository/FlowFileRepository.java | 2 +- .../repository/FlowFileSwapManager.java | 94 ++- .../controller/repository/QueueProvider.java | 2 +- .../controller/repository/RepositoryRecord.java | 2 +- .../SwapManagerInitializationContext.java | 41 ++ .../apache/nifi/processor/ProcessSession.java | 1 + .../org/apache/nifi/processor/QueueSize.java | 48 -- .../org/apache/nifi/connectable/Connection.java | 2 +- .../nifi/controller/StandardFlowFileQueue.java | 108 +++- .../nifi/connectable/StandardConnection.java | 26 +- .../nifi/controller/FileSystemSwapManager.java | 626 ++++++++----------- .../apache/nifi/controller/FlowController.java | 193 +++--- .../repository/BatchingSessionFactory.java | 2 +- .../repository/ConnectionSwapInfo.java | 58 -- .../repository/StandardProcessSession.java | 4 +- .../repository/StandardRepositoryRecord.java | 2 +- .../repository/VolatileFlowFileRepository.java | 2 +- .../WriteAheadFlowFileRepository.java | 2 +- .../controller/TestFileSystemSwapManager.java | 6 +- .../repository/TestStandardProcessSession.java | 9 +- .../TestWriteAheadFlowFileRepository.java | 2 +- 26 files changed, 1003 
insertions(+), 838 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java deleted file mode 100644 index e1baeb7..0000000 --- a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller; - -import java.util.Collection; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.flowfile.FlowFilePrioritizer; -import org.apache.nifi.processor.FlowFileFilter; -import org.apache.nifi.processor.QueueSize; - -public interface FlowFileQueue { - - /** - * @return the unique identifier for this FlowFileQueue - */ - String getIdentifier(); - - /** - * @return list of processing priorities for this queue - */ - List<FlowFilePrioritizer> getPriorities(); - - /** - * @return the minimum number of FlowFiles that must be present in order for - * FlowFiles to begin being swapped out of the queue - */ - int getSwapThreshold(); - - /** - * Resets the comparator used by this queue to maintain order. - * - * @param newPriorities the ordered list of prioritizers to use to determine - * order within this queue. - * @throws NullPointerException if arg is null - */ - void setPriorities(List<FlowFilePrioritizer> newPriorities); - - /** - * Establishes this queue's preferred maximum work load. - * - * @param maxQueueSize the maximum number of flow files this processor - * recommends having in its work queue at any one time - */ - void setBackPressureObjectThreshold(long maxQueueSize); - - /** - * @return maximum number of flow files that should be queued up at any one - * time - */ - long getBackPressureObjectThreshold(); - - /** - * @param maxDataSize Establishes this queue's preferred maximum data size. 
- */ - void setBackPressureDataSizeThreshold(String maxDataSize); - - /** - * @return maximum data size that should be queued up at any one time - */ - String getBackPressureDataSizeThreshold(); - - QueueSize size(); - - /** - * @return total size in bytes of the queue flow file's content - */ - long contentSize(); - - /** - * @return true if no items queue; false otherwise - */ - boolean isEmpty(); - - /** - * @return true if the active queue is empty; false otherwise. The Active - * queue contains those FlowFiles that can be processed immediately and does - * not include those FlowFiles that have been swapped out or are currently - * being processed - */ - boolean isActiveQueueEmpty(); - - QueueSize getActiveQueueSize(); - - /** - * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile - * is considered to be unacknowledged if it has been pulled from the queue by some component - * but the session that pulled the FlowFile has not yet been committed or rolled back. - * - * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'. - */ - QueueSize getUnacknowledgedQueueSize(); - - void acknowledge(FlowFileRecord flowFile); - - void acknowledge(Collection<FlowFileRecord> flowFiles); - - /** - * @return true if maximum queue size has been reached or exceeded; false - * otherwise - */ - boolean isFull(); - - /** - * places the given file into the queue - * - * @param file to place into queue - */ - void put(FlowFileRecord file); - - /** - * places the given files into the queue - * - * @param files to place into queue - */ - void putAll(Collection<FlowFileRecord> files); - - /** - * Removes all records from the internal swap queue and returns them. - * - * @return all removed records from internal swap queue - */ - List<FlowFileRecord> pollSwappableRecords(); - - /** - * Restores the records from swap space into this queue, adding the records - * that have expired to the given set instead of enqueuing them. 
- * - * @param records that were swapped in - */ - void putSwappedRecords(Collection<FlowFileRecord> records); - - /** - * Updates the internal counters of how much data is queued, based on - * swapped data that is being restored. - * - * @param numRecords count of records swapped in - * @param contentSize total size of records being swapped in - */ - void incrementSwapCount(int numRecords, long contentSize); - - /** - * @return the number of FlowFiles that are enqueued and not swapped - */ - int unswappedSize(); - - int getSwapRecordCount(); - - int getSwapQueueSize(); - - /** - * @param expiredRecords expired records - * @return the next flow file on the queue; null if empty - */ - FlowFileRecord poll(Set<FlowFileRecord> expiredRecords); - - /** - * @param maxResults limits how many results can be polled - * @param expiredRecords for expired records - * @return the next flow files on the queue up to the max results; null if - * empty - */ - List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords); - - /** - * Drains flow files from the given source queue into the given destination - * list. 
- * - * @param sourceQueue queue to drain from - * @param destination Collection to drain to - * @param maxResults max number to drain - * @param expiredRecords for expired records - * @return size (bytes) of flow files drained from queue - */ - long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords); - - List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords); - - String getFlowFileExpiration(); - - int getFlowFileExpiration(TimeUnit timeUnit); - - void setFlowFileExpiration(String flowExpirationPeriod); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java new file mode 100644 index 0000000..3f16d00 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue; + +/** + * Represents the state that a Drop FlowFile request is in + */ +public enum DropFlowFileState { + + WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"), + DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"), + COMPLETE("Completed Successfully"), + FAILURE("Failed"); + + private final String description; + + private DropFlowFileState(final String description) { + this.description = description; + } + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java new file mode 100644 index 0000000..b216608 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue; + +/** + * Represents the status of a Drop FlowFile Request that has been issued to + * a {@link FlowFileQueue}. When a queue is requested to drop its FlowFiles, + * that process may be rather lengthy in the case of a poorly behaving + * FlowFileRepository or if the destination Processor is polling from the + * queue using a filter that is misbehaving. As a result, the dropping of + * FlowFiles is performed asynchronously. + * + * This status object provides information about how far along in the process + * we currently are and information about the success or failure of the + * operation. + */ +public interface DropFlowFileStatus { + + /** + * @return the identifier of the request to drop FlowFiles from the queue + */ + String getRequestIdentifier(); + + /** + * @return the date/time (in milliseconds since epoch) at which the request to + * drop the FlowFiles from a queue was submitted + */ + long getRequestSubmissionTime(); + + /** + * @return the size of the queue when the drop request was issued or <code>null</code> if + * it is not yet known, which can happen if the {@link DropFlowFileState} is + * {@link DropFlowFileState#WAITING_FOR_LOCK}. 
+ */ + QueueSize getOriginalSize(); + + /** + * @return the current size of the queue or <code>null</code> if it is not yet known + */ + QueueSize getCurrentSize(); + + /** + * @return the current state of the operation + */ + DropFlowFileState getState(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java new file mode 100644 index 0000000..31f17e0 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.queue; + +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.processor.FlowFileFilter; + +public interface FlowFileQueue { + + /** + * @return the unique identifier for this FlowFileQueue + */ + String getIdentifier(); + + /** + * @return list of processing priorities for this queue + */ + List<FlowFilePrioritizer> getPriorities(); + + /** + * Reads any Swap Files that belong to this queue and increments counts so that the size + * of the queue will reflect the size of all FlowFiles regardless of whether or not they are + * swapped out. This will be called only during NiFi startup as an initialization step. This + * method is then responsible for returning the largest ID of any FlowFile that is swapped + * out, or <code>null</code> if no FlowFiles are swapped out for this queue. + * + * @return the largest ID of any FlowFile that is swapped out for this queue, or <code>null</code> if + * no FlowFiles are swapped out for this queue. + */ + Long recoverSwappedFlowFiles(); + + /** + * Destroys any Swap Files that exist for this queue without updating the FlowFile Repository + * or Provenance Repository. This is done only on startup in the case of non-persistent + * repositories. In the case of non-persistent repositories, we may still have Swap Files because + * we may still need to overflow the FlowFiles from heap onto disk, even though we don't want to keep + * the FlowFiles on restart. + */ + void purgeSwapFiles(); + + /** + * @return the minimum number of FlowFiles that must be present in order for + * FlowFiles to begin being swapped out of the queue + */ + // TODO: REMOVE THIS. + int getSwapThreshold(); + + /** + * Resets the comparator used by this queue to maintain order. 
+ * + * @param newPriorities the ordered list of prioritizers to use to determine + * order within this queue. + * @throws NullPointerException if arg is null + */ + void setPriorities(List<FlowFilePrioritizer> newPriorities); + + /** + * Establishes this queue's preferred maximum work load. + * + * @param maxQueueSize the maximum number of flow files this processor + * recommends having in its work queue at any one time + */ + void setBackPressureObjectThreshold(long maxQueueSize); + + /** + * @return maximum number of flow files that should be queued up at any one + * time + */ + long getBackPressureObjectThreshold(); + + /** + * @param maxDataSize Establishes this queue's preferred maximum data size. + */ + void setBackPressureDataSizeThreshold(String maxDataSize); + + /** + * @return maximum data size that should be queued up at any one time + */ + String getBackPressureDataSizeThreshold(); + + QueueSize size(); + + /** + * @return true if no items queue; false otherwise + */ + boolean isEmpty(); + + /** + * @return true if the active queue is empty; false otherwise. The Active + * queue contains those FlowFiles that can be processed immediately and does + * not include those FlowFiles that have been swapped out or are currently + * being processed + */ + // TODO: REMOVE? + boolean isActiveQueueEmpty(); + + // TODO: REMOVE? + QueueSize getActiveQueueSize(); + + /** + * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile + * is considered to be unacknowledged if it has been pulled from the queue by some component + * but the session that pulled the FlowFile has not yet been committed or rolled back. + * + * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'. 
+ */ + QueueSize getUnacknowledgedQueueSize(); + + void acknowledge(FlowFileRecord flowFile); + + void acknowledge(Collection<FlowFileRecord> flowFiles); + + /** + * @return true if maximum queue size has been reached or exceeded; false + * otherwise + */ + boolean isFull(); + + /** + * places the given file into the queue + * + * @param file to place into queue + */ + void put(FlowFileRecord file); + + /** + * places the given files into the queue + * + * @param files to place into queue + */ + void putAll(Collection<FlowFileRecord> files); + + /** + * Removes all records from the internal swap queue and returns them. + * + * @return all removed records from internal swap queue + */ + // TODO: REMOVE THIS? + List<FlowFileRecord> pollSwappableRecords(); + + /** + * Restores the records from swap space into this queue, adding the records + * that have expired to the given set instead of enqueuing them. + * + * @param records that were swapped in + */ + // TODO: REMOVE THIS? + void putSwappedRecords(Collection<FlowFileRecord> records); + + /** + * Updates the internal counters of how much data is queued, based on + * swapped data that is being restored. + * + * @param numRecords count of records swapped in + * @param contentSize total size of records being swapped in + */ + // TODO: REMOVE THIS? + void incrementSwapCount(int numRecords, long contentSize); + + /** + * @return the number of FlowFiles that are enqueued and not swapped + */ + // TODO: REMOVE THIS? + int unswappedSize(); + + // TODO: REMOVE THIS? + int getSwapRecordCount(); + + // TODO: REMOVE THIS? 
+ int getSwapQueueSize(); + + /** + * @param expiredRecords expired records + * @return the next flow file on the queue; null if empty + */ + FlowFileRecord poll(Set<FlowFileRecord> expiredRecords); + + /** + * @param maxResults limits how many results can be polled + * @param expiredRecords for expired records + * @return the next flow files on the queue up to the max results; null if + * empty + */ + List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords); + + /** + * Drains flow files from the given source queue into the given destination + * list. + * + * @param sourceQueue queue to drain from + * @param destination Collection to drain to + * @param maxResults max number to drain + * @param expiredRecords for expired records + * @return size (bytes) of flow files drained from queue + */ + long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords); + + List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords); + + String getFlowFileExpiration(); + + int getFlowFileExpiration(TimeUnit timeUnit); + + void setFlowFileExpiration(String flowExpirationPeriod); + + /** + * Initiates a request to drop all FlowFiles in this queue. This method returns + * a DropFlowFileStatus that can be used to determine the current state of the request. + * Additionally, the DropFlowFileStatus provides a request identifier that can then be + * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileRequest(String)} + * methods in order to obtain the status later or cancel a request. + * + * @return the status of the drop request.
+ */ + DropFlowFileStatus dropFlowFiles(); + + /** + * Returns the current status of a Drop FlowFile Request that was initiated via the + * {@link #dropFlowFiles()} method that has the given identifier + * + * @param requestIdentifier the identifier of the Drop FlowFile Request + * @return the status for the request with the given identifier, or <code>null</code> if no + * request status exists with that identifier + */ + DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier); + + /** + * Cancels the request to drop FlowFiles that has the given identifier + * + * @param requestIdentifier the identifier of the Drop FlowFile Request + * @return <code>true</code> if the request was canceled, <code>false</code> if the request has + * already completed or is not known + */ + boolean cancelDropFlowFileRequest(String requestIdentifier); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java new file mode 100644 index 0000000..42d8416 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.queue; + +/** + * Represents the number of objects present on a queue and the total size, in bytes, of their content + */ +public class QueueSize { + + private final int objectCount; + private final long totalSizeBytes; + + public QueueSize(final int numberObjects, final long totalSizeBytes) { + if (numberObjects < 0 || totalSizeBytes < 0) { + throw new IllegalArgumentException(); + } + objectCount = numberObjects; + this.totalSizeBytes = totalSizeBytes; + } + + /** + * @return number of objects present on the queue + */ + public int getObjectCount() { + return objectCount; + } + + /** + * @return total size in bytes of the content for the data on the queue + */ + public long getByteCount() { + return totalSizeBytes; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java index 58fc6b3..906cbe2 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; /**
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index 2e5be11..57e9186 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -16,8 +16,11 @@ */ package org.apache.nifi.controller.repository; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.events.EventReporter; +import java.io.IOException; +import java.util.List; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; /** * Defines a mechanism by which FlowFiles can be move into external storage or @@ -26,38 +29,81 @@ import org.apache.nifi.events.EventReporter; public interface FlowFileSwapManager { /** - * Starts the Manager's background threads to start swapping FlowFiles in - * and out of memory + * Initializes the Swap Manager, providing a {@link SwapManagerInitializationContext} so that the + * Swap Manager has access to all of the components necessary to perform its functions * - * @param flowFileRepository the FlowFileRepository that must be notified of - * any swapping in or out of FlowFiles - * @param queueProvider the provider of FlowFileQueue's so that FlowFiles - * can be obtained and restored - * @param claimManager the ContentClaimManager to use for interacting with - * Content Claims - * @param reporter the EventReporter that can be used for notifying users of - * important events + * @param initializationContext the context that provides the swap manager with access to the + * resources that it needs to perform
its functions */ - void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter); + void initialize(SwapManagerInitializationContext initializationContext); /** - * Shuts down the manager + * Swaps out the given FlowFiles that belong to the given queue. + * + * @param flowFiles the FlowFiles to swap out to external storage + * @param flowFileQueue the queue that the FlowFiles belong to + * @return the location of the externally stored swap file + * + * @throws IOException if unable to swap the FlowFiles out */ - void shutdown(); + String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException; /** - * Removes all Swap information, permanently destroying any FlowFiles that - * have been swapped out + * Recovers the FlowFiles from the swap file that lives at the given location. This action + * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file + * at the given location remains in that location and the FlowFile Repository is not updated. + * + * @param swapLocation the location of the swap file + * @param flowFileQueue the queue that the FlowFiles belong to + * @return the FlowFiles that live at the given swap location + * + * @throws IOException if unable to recover the FlowFiles from the given location */ - void purge(); + List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException; /** - * Notifies FlowFile queues of the number of FlowFiles and content size of - * all FlowFiles that are currently swapped out + * Recovers the FlowFiles from the swap file that lives at the given location and belongs + * to the given FlowFile Queue.
The FlowFile Repository is then updated + * and the swap file is permanently removed from the external storage + * + * @param swapLocation the location of the swap file + * @param flowFileQueue the queue to which the FlowFiles belong + * + * @return the FlowFiles that are stored in the given location + * + * @throws IOException if unable to recover the FlowFiles from the given location or update the + * FlowFileRepository + */ + List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException; + + /** + * Determines swap files that exist for the given FlowFileQueue + * + * @param flowFileQueue the queue for which the FlowFiles should be recovered * - * @param connectionProvider provider - * @param claimManager manager - * @return how many flowfiles have been recovered + * @return all swap locations that have been identified for the given queue, in the order that they should + * be swapped back in */ - long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager); + List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException; + + /** + * Determines how many FlowFiles and the size of the FlowFiles that are swapped out at the given location + * + * @param swapLocation the location of the swap file + * @return the QueueSize representing the number of FlowFiles and total size of the FlowFiles that are swapped out + */ + QueueSize getSwapSize(String swapLocation) throws IOException; + + /** + * Returns the maximum record id of the FlowFiles stored at the given swap location + * + * @param swapLocation the swap location to read id's from + * @return the max record id of any FlowFile in the swap location, or null if no record ID's can be found + */ + Long getMaxRecordId(String swapLocation) throws IOException; + + /** + * Purge all known Swap Files without updating FlowFileRepository or Provenance Repository + */ + void purge(); } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java index fcb516d..95d9f2e 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java @@ -18,7 +18,7 @@ package org.apache.nifi.controller.repository; import java.util.Collection; -import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueue; /** * Provides a collection of <code>FlowFileQueue</code>s that represents all http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java index 40d44a8..09202c0 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.controller.repository; -import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java ---------------------------------------------------------------------- diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java new file mode 100644 index 0000000..564d5ec --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.events.EventReporter; + +public interface SwapManagerInitializationContext { + + /** + * @return the {@link FlowFileRepository} that should be updated when FlowFiles are swapped in and out + */ + FlowFileRepository getFlowFileRepository(); + + + /** + * @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when + * performing swapping actions + */ + ResourceClaimManager getResourceClaimManager(); + + /** + * @return an {@link EventReporter} that can be used to report events to users + */ + EventReporter getEventReporter(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index ed46d68..ebd56a9 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.FlowFileHandlingException; http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java deleted file mode 100644 index c3c2ccc..0000000 --- 
a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor; - -/** - * - */ -public class QueueSize { - - private final int objectCount; - private final long totalSizeBytes; - - public QueueSize(final int numberObjects, final long totalSizeBytes) { - if (numberObjects < 0 || totalSizeBytes < 0) { - throw new IllegalArgumentException(); - } - objectCount = numberObjects; - this.totalSizeBytes = totalSizeBytes; - } - - /** - * @return number of objects present on the queue - */ - public int getObjectCount() { - return objectCount; - } - - /** - * @return total size in bytes of the content for the data on the queue - */ - public long getByteCount() { - return totalSizeBytes; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java index 0a0089d..2e66905 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java @@ -20,7 +20,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.FlowFileFilter; http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index f47ea2f..df356fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -34,21 +35,25 @@ import java.util.concurrent.atomic.AtomicReference; import 
java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.queue.DropFlowFileStatus; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; -import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.concurrency.TimedLock; import org.apache.nifi.util.timebuffer.LongEntityAccess; import org.apache.nifi.util.timebuffer.TimedBuffer; import org.apache.nifi.util.timebuffer.TimestampedLong; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,12 +89,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private boolean swapMode = false; private long maximumQueueObjectCount; + private final EventReporter eventReporter; private final AtomicLong flowFileExpirationMillis; private final Connection connection; private final AtomicReference<String> flowFileExpirationPeriod; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private final List<FlowFilePrioritizer> priorities; private final int swapThreshold; + private final FlowFileSwapManager swapManager; private final TimedLock readLock; private final TimedLock writeLock; private final String identifier; @@ -101,7 +108,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // SCHEDULER 
CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! private final ProcessScheduler scheduler; - public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) { + public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, + final int swapThreshold) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); priorities = new ArrayList<>(); maximumQueueObjectCount = 0L; @@ -110,6 +118,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { flowFileExpirationMillis = new AtomicLong(0); flowFileExpirationPeriod = new AtomicReference<>("0 mins"); swapQueue = new ArrayList<>(); + this.eventReporter = eventReporter; + this.swapManager = swapManager; this.identifier = identifier; this.swapThreshold = swapThreshold; @@ -233,21 +243,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override - public long contentSize() { - readLock.lock(); - try { - final PreFetch prefetch = preFetchRef.get(); - if (prefetch == null) { - return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount(); - } else { - return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount() + prefetch.size().getByteCount(); - } - } finally { - readLock.unlock("getContentSize"); - } - } - - @Override public boolean isEmpty() { readLock.lock(); try { @@ -945,11 +940,88 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.flowFileExpirationMillis.set(millis); } + + @Override + public void purgeSwapFiles() { + swapManager.purge(); + } + + @Override + public Long recoverSwappedFlowFiles() { + int swapFlowFileCount = 0; + long swapByteCount = 0L; + Long maxId = null; + + writeLock.lock(); + try { + 
final List<String> swapLocations; + try { + swapLocations = swapManager.recoverSwapLocations(this); + } catch (final IOException ioe) { + logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getIdentifier()); + logger.error("", ioe); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + + getIdentifier() + "; see logs for more detials"); + } + return null; + } + + for (final String swapLocation : swapLocations) { + try { + final QueueSize queueSize = swapManager.getSwapSize(swapLocation); + final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation); + if (maxSwapRecordId != null) { + if (maxId == null || maxSwapRecordId > maxId) { + maxId = maxSwapRecordId; + } + } + + swapFlowFileCount += queueSize.getObjectCount(); + swapByteCount += queueSize.getByteCount(); + } catch (final IOException ioe) { + logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString()); + logger.error("", ioe); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation + + "; the file appears to be corrupt. 
See logs for more details"); + } + } + } + + this.swappedRecordCount = swapFlowFileCount; + this.swappedContentSize = swapByteCount; + } finally { + writeLock.unlock("Recover Swap Files"); + } + + return maxId; + } + + @Override public String toString() { return "FlowFileQueue[id=" + identifier + "]"; } + @Override + public DropFlowFileStatus dropFlowFiles() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean cancelDropFlowFileRequest(String requestIdentifier) { + // TODO Auto-generated method stub + return false; + } + + @Override + public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) { + // TODO Auto-generated method stub + return null; + } + /** * Lock the queue so that other threads are unable to interact with the * queue http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index ad556e2..f0a6d8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -26,18 +26,19 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.nifi.controller.FlowFileQueue; +import org.apache.commons.lang3.builder.EqualsBuilder; +import 
org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.StandardFlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.Relationship; import org.apache.nifi.util.NiFiProperties; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; - /** * Models a connection between connectable components. A connection may contain one or more relationships that map the source component to the destination component. */ @@ -65,7 +66,7 @@ public final class StandardConnection implements Connection { destination = new AtomicReference<>(builder.destination); relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; - flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold()); + flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -259,6 +260,8 @@ public final class StandardConnection implements Connection { private Connectable source; private Connectable destination; private Collection<Relationship> relationships; + private FlowFileSwapManager swapManager; + private EventReporter eventReporter; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -305,6 +308,16 @@ public final class StandardConnection implements Connection { return this; } + public Builder swapManager(final FlowFileSwapManager swapManager) { + 
this.swapManager = swapManager; + return this; + } + + public Builder eventReporter(final EventReporter eventReporter) { + this.eventReporter = eventReporter; + return this; + } + public StandardConnection build() { if (source == null) { throw new IllegalStateException("Cannot build a Connection without a Source"); @@ -312,6 +325,9 @@ public final class StandardConnection implements Connection { if (destination == null) { throw new IllegalStateException("Cannot build a Connection without a Destination"); } + if (swapManager == null) { + throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager"); + } if (relationships == null) { relationships = new ArrayList<>();
