NIFI-730: Implemented swapping in and out on-demand by the FlowFileQueue rather than in a background thread
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49a781df Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49a781df Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49a781df Branch: refs/heads/master Commit: 49a781df2d44859ec59672c2755b7346452cd74a Parents: b8c51dc Author: Mark Payne <[email protected]> Authored: Mon Oct 12 13:27:07 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Tue Oct 13 10:03:03 2015 -0400 ---------------------------------------------------------------------- .../controller/queue/DropFlowFileState.java | 3 +- .../controller/queue/DropFlowFileStatus.java | 7 +- .../nifi/controller/queue/FlowFileQueue.java | 50 -- .../apache/nifi/controller/queue/QueueSize.java | 7 + .../repository/FlowFileSwapManager.java | 17 +- .../SwapManagerInitializationContext.java | 1 - .../java/org/apache/nifi/util/MockFlowFile.java | 24 +- .../org/apache/nifi/util/MockFlowFileQueue.java | 3 +- .../apache/nifi/util/MockProcessSession.java | 15 +- .../nifi/util/StandardProcessorTestRunner.java | 24 +- .../java/org/apache/nifi/util/TestRunner.java | 124 ++-- .../nifi/controller/DropFlowFileRequest.java | 99 +++ .../nifi/controller/StandardFlowFileQueue.java | 687 +++++++++---------- .../controller/TestStandardFlowFileQueue.java | 330 +++++++++ .../nifi/connectable/StandardConnection.java | 33 +- .../nifi/controller/FileSystemSwapManager.java | 54 +- .../apache/nifi/controller/FlowController.java | 24 +- .../repository/StandardProcessSession.java | 102 +-- .../java/org/apache/nifi/util/Connectables.java | 2 +- .../controller/TestFileSystemSwapManager.java | 142 +++- .../repository/TestStandardProcessSession.java | 2 +- .../nifi/web/controller/ControllerFacade.java | 30 +- 22 files changed, 1155 insertions(+), 625 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java index 3f16d00..e412b80 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java @@ -25,7 +25,8 @@ public enum DropFlowFileState { WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"), DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"), COMPLETE("Completed Successfully"), - FAILURE("Failed"); + FAILURE("Failed"), + CANCELED("Cancelled by User"); private final String description; http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java index b216608..3c3be9b 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java @@ -43,6 +43,12 @@ public interface DropFlowFileStatus { long getRequestSubmissionTime(); /** + * @return the date/time (in milliseconds since epoch) at which the status of the + * request was last updated + */ + long getLastUpdated(); + + /** * @return the size of the queue when the drop request was issued or <code>null</code> if * it is not yet known, which can happen if the {@link DropFlowFileState} is * {@link DropFlowFileState#WAITING_FOR_LOCK}. 
@@ -58,5 +64,4 @@ public interface DropFlowFileStatus { * @return the current state of the operation */ DropFlowFileState getState(); - } http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 31f17e0..bc2f358 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -60,13 +60,6 @@ public interface FlowFileQueue { void purgeSwapFiles(); /** - * @return the minimum number of FlowFiles that must be present in order for - * FlowFiles to begin being swapped out of the queue - */ - // TODO: REMOVE THIS. - int getSwapThreshold(); - - /** * Resets the comparator used by this queue to maintain order. * * @param newPriorities the ordered list of prioritizers to use to determine @@ -112,12 +105,8 @@ public interface FlowFileQueue { * not include those FlowFiles that have been swapped out or are currently * being processed */ - // TODO: REMOVE? boolean isActiveQueueEmpty(); - // TODO: REMOVE? - QueueSize getActiveQueueSize(); - /** * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile * is considered to be unacknowledged if it has been pulled from the queue by some component @@ -152,45 +141,6 @@ public interface FlowFileQueue { void putAll(Collection<FlowFileRecord> files); /** - * Removes all records from the internal swap queue and returns them. - * - * @return all removed records from internal swap queue - */ - // TODO: REMOVE THIS? 
- List<FlowFileRecord> pollSwappableRecords(); - - /** - * Restores the records from swap space into this queue, adding the records - * that have expired to the given set instead of enqueuing them. - * - * @param records that were swapped in - */ - // TODO: REMOVE THIS? - void putSwappedRecords(Collection<FlowFileRecord> records); - - /** - * Updates the internal counters of how much data is queued, based on - * swapped data that is being restored. - * - * @param numRecords count of records swapped in - * @param contentSize total size of records being swapped in - */ - // TODO: REMOVE THIS? - void incrementSwapCount(int numRecords, long contentSize); - - /** - * @return the number of FlowFiles that are enqueued and not swapped - */ - // TODO: REMOVE THIS? - int unswappedSize(); - - // TODO: REMOVE THIS? - int getSwapRecordCount(); - - // TODO: REMOVE THIS? - int getSwapQueueSize(); - - /** * @param expiredRecords expired records * @return the next flow file on the queue; null if empty */ http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java index 42d8416..528d652 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.queue; +import java.text.NumberFormat; + /** * */ @@ -45,4 +47,9 @@ public class QueueSize { public long getByteCount() { return totalSizeBytes; } + + @Override + public String toString() { + return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]"; + } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index 57e9186..a70d287 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -26,6 +26,9 @@ import org.apache.nifi.controller.queue.QueueSize; * Defines a mechanism by which FlowFiles can be move into external storage or * memory so that they can be removed from the Java heap and vice-versa */ +// TODO: This needs to be refactored into two different mechanisms, one that is responsible for doing +// framework-y types of things, such as updating the repositories, and another that is responsible +// for serializing and deserializing FlowFiles to external storage. public interface FlowFileSwapManager { /** @@ -38,6 +41,16 @@ public interface FlowFileSwapManager { void initialize(SwapManagerInitializationContext initializationContext); /** + * Drops all FlowFiles that are swapped out at the given location. This will update the Provenance + * Repository as well as the FlowFile Repository. + * + * @param swapLocation the location of the swap file to drop + * @param flowFileQueue the queue to which the FlowFiles belong + * @param user the user that initiated the request + */ + void dropSwappedFlowFiles(String swapLocation, FlowFileQueue flowFileQueue, String user) throws IOException; + + /** * Swaps out the given FlowFiles that belong to the queue with the given identifier. 
* * @param flowFiles the FlowFiles to swap out to external storage @@ -53,13 +66,13 @@ public interface FlowFileSwapManager { * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file * at the given location remains in that location and the FlowFile Repository is not updated. * - * @param swapLocation the location of hte swap file + * @param swapLocation the location of the swap file * @param flowFileQueue the queue that the FlowFiles belong to * @return the FlowFiles that live at the given swap location * * @throws IOException if unable to recover the FlowFiles from the given location */ - List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException; + List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException; /** * Recovers the FlowFiles from the swap file that lives at the given location and belongs http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java index 564d5ec..0e30784 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java @@ -27,7 +27,6 @@ public interface SwapManagerInitializationContext { */ FlowFileRepository getFlowFileRepository(); - /** * @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when * performing swapping actions http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index e9fb9d6..41bcc74 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -33,12 +33,13 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; - import org.junit.Assert; -public class MockFlowFile implements FlowFile { +public class MockFlowFile implements FlowFileRecord { private final Map<String, String> attributes = new HashMap<>(); @@ -170,7 +171,7 @@ public class MockFlowFile implements FlowFile { public void assertAttributeNotExists(final String attributeName) { Assert.assertFalse("Attribute " + attributeName + " not exists with value " + attributes.get(attributeName), - attributes.containsKey(attributeName)); + attributes.containsKey(attributeName)); } public void assertAttributeEquals(final String attributeName, final String expectedValue) { @@ -250,7 +251,7 @@ public class MockFlowFile implements FlowFile { if ((fromStream & 0xFF) != (data[i] & 0xFF)) { Assert.fail("FlowFile content differs from input at byte " + bytesRead + " with input having value " - + (fromStream & 0xFF) + " and FlowFile having value " + (data[i] & 0xFF)); + + (fromStream & 0xFF) + " and FlowFile having value " + (data[i] & 0xFF)); } bytesRead++; @@ -274,4 +275,19 @@ public class MockFlowFile implements FlowFile { public Long getLastQueueDate() { return entryDate; } + + @Override + public long getPenaltyExpirationMillis() { + return -1; + } + + @Override + public ContentClaim getContentClaim() { + return null; + } + + @Override + public long 
getContentClaimOffset() { + return 0; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java index 775a1d5..0c6ec2a 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java @@ -23,7 +23,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.controller.queue.QueueSize; + public class MockFlowFileQueue { http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 1060854..85fc784 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -40,12 +40,12 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileAccessException; import 
org.apache.nifi.processor.exception.FlowFileHandlingException; @@ -691,7 +691,7 @@ public class MockProcessSession implements ProcessSession { /** * @param relationship to get flowfiles for * @return a List of FlowFiles in the order in which they were transferred - * to the given relationship + * to the given relationship */ public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { final Relationship procRel = new Relationship.Builder().name(relationship).build(); @@ -778,7 +778,7 @@ public class MockProcessSession implements ProcessSession { */ private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) { if (source == null || destination == null || source == destination) { - return destination; //don't need to inherit from ourselves + return destination; // don't need to inherit from ourselves } final FlowFile updated = putAllAttributes(destination, source.getAttributes()); getProvenanceReporter().fork(source, Collections.singletonList(updated)); @@ -801,7 +801,7 @@ public class MockProcessSession implements ProcessSession { int uuidsCaptured = 0; for (final FlowFile source : sources) { if (source == destination) { - continue; //don't want to capture parent uuid of this. Something can't be a child of itself + continue; // don't want to capture parent uuid of this. 
Something can't be a child of itself } final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key()); if (sourceUuid != null && !sourceUuid.trim().isEmpty()) { @@ -832,7 +832,7 @@ public class MockProcessSession implements ProcessSession { */ private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) { final Map<String, String> result = new HashMap<>(); - //trivial cases + // trivial cases if (flowFileList == null || flowFileList.isEmpty()) { return result; } else if (flowFileList.size() == 1) { @@ -845,8 +845,7 @@ public class MockProcessSession implements ProcessSession { */ final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); - outer: - for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { + outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { final String key = mapEntry.getKey(); final String value = mapEntry.getValue(); for (final FlowFile flowFile : flowFileList) { @@ -900,7 +899,7 @@ public class MockProcessSession implements ProcessSession { public void assertTransferCount(final Relationship relationship, final int count) { final int transferCount = getFlowFilesForRelationship(relationship).size(); Assert.assertEquals("Expected " + count + " FlowFiles to be transferred to " - + relationship + " but actual transfer count was " + transferCount, count, transferCount); + + relationship + " but actual transfer count was " + transferCount, count, transferCount); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index eeeff61..0d00cc8 100644 --- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -58,12 +58,12 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceReporter; @@ -222,7 +222,7 @@ public class StandardProcessorTestRunner implements TestRunner { boolean unscheduledRun = false; for (final Future<Throwable> future : futures) { try { - final Throwable thrown = future.get(); // wait for the result + final Throwable thrown = future.get(); // wait for the result if (thrown != null) { throw new AssertionError(thrown); } @@ -551,11 +551,11 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException { // hold off on failing due to deprecated annotation for now... will introduce later. 
-// for ( final Method method : service.getClass().getMethods() ) { -// if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) { -// Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method); -// } -// } + // for ( final Method method : service.getClass().getMethods() ) { + // if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) { + // Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method); + // } + // } final ComponentLog logger = new MockProcessorLog(identifier, service); final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger); @@ -716,11 +716,11 @@ public class StandardProcessorTestRunner implements TestRunner { final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName); if (descriptor == null) { return new ValidationResult.Builder() - .input(propertyName) - .explanation(propertyName + " is not a known Property for Controller Service " + service) - .subject("Invalid property") - .valid(false) - .build(); + .input(propertyName) + .explanation(propertyName + " is not a known Property for Controller Service " + service) + .subject("Invalid property") + .valid(false) + .build(); } return setProperty(service, descriptor, value); } http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 6e66bfe..ec901fe 100644 --- 
a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -26,11 +26,11 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceReporter; @@ -40,22 +40,22 @@ public interface TestRunner { /** * @return the {@link Processor} for which this <code>TestRunner</code> is - * configured + * configured */ Processor getProcessor(); /** * @return the {@link ProcessSessionFactory} that this - * <code>TestRunner</code> will use to invoke the - * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method + * <code>TestRunner</code> will use to invoke the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method */ ProcessSessionFactory getProcessSessionFactory(); /** * @return the {@Link ProcessContext} that this <code>TestRunner</code> will - * use to invoke the - * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} - * method + * use to invoke the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} + * method */ ProcessContext getProcessContext(); @@ -120,7 +120,7 @@ public interface TestRunner { * * @param iterations number of iterations * @param stopOnFinish whether or not to run the Processor methods that are - * annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped} + * annotated with {@link 
org.apache.nifi.processor.annotation.OnStopped @OnStopped} * @param initialize true if must initialize */ void run(int iterations, boolean stopOnFinish, final boolean initialize); @@ -163,10 +163,10 @@ public interface TestRunner { * * @param iterations number of iterations * @param stopOnFinish whether or not to run the Processor methods that are - * annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped} + * annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped} * @param initialize true if must initialize * @param runWait indicates the amount of time in milliseconds that the framework should wait for - * processors to stop running before calling the {@link org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + * processors to stop running before calling the {@link org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation */ void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait); @@ -187,8 +187,8 @@ public interface TestRunner { /** * @return the currently configured number of threads that will be used to - * runt he Processor when calling the {@link #run()} or {@link #run(int)} - * methods + * runt he Processor when calling the {@link #run()} or {@link #run(int)} + * methods */ int getThreadCount(); @@ -296,7 +296,7 @@ public interface TestRunner { /** * @return <code>true</code> if the Input Queue to the Processor is empty, - * <code>false</code> otherwise + * <code>false</code> otherwise */ boolean isQueueEmpty(); @@ -421,7 +421,7 @@ public interface TestRunner { /** * @return the {@link ProvenanceReporter} that will be used by the - * configured {@link Processor} for reporting Provenance Events + * configured {@link Processor} for reporting Provenance Events */ ProvenanceReporter getProvenanceReporter(); @@ -433,7 +433,7 @@ public interface TestRunner { /** * @param name of counter * @return the current value of the counter with 
the specified name, or null - * if no counter exists with the specified name + * if no counter exists with the specified name */ Long getCounterValue(String name); @@ -599,14 +599,14 @@ public interface TestRunner { /** * @param service the service * @return {@code true} if the given Controller Service is enabled, - * {@code false} if it is disabled + * {@code false} if it is disabled * * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. */ boolean isControllerServiceEnabled(ControllerService service); @@ -622,11 +622,11 @@ public interface TestRunner { * * @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. 
+ * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. * */ void removeControllerService(ControllerService service); @@ -641,11 +641,11 @@ public interface TestRunner { * * @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. * */ ValidationResult setProperty(ControllerService service, PropertyDescriptor property, String value); @@ -660,11 +660,11 @@ public interface TestRunner { * * @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. 
+ * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. * */ ValidationResult setProperty(ControllerService service, PropertyDescriptor property, AllowableValue value); @@ -679,11 +679,11 @@ public interface TestRunner { * * @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. * */ ValidationResult setProperty(ControllerService service, String propertyName, String value); @@ -698,19 +698,19 @@ public interface TestRunner { * @throws IllegalStateException if the Controller Service is not disabled * * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. 
+ * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. */ void setAnnotationData(ControllerService service, String annotationData); /** * @param identifier of controller service * @return the {@link ControllerService} that is registered with the given - * identifier, or <code>null</code> if no Controller Service exists with the - * given identifier + * identifier, or <code>null</code> if no Controller Service exists with the + * given identifier */ ControllerService getControllerService(String identifier); @@ -720,11 +720,11 @@ public interface TestRunner { * * @param service the service to validate * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. 
*/ void assertValid(ControllerService service); @@ -734,11 +734,11 @@ public interface TestRunner { * * @param service the service to validate * @throws IllegalArgumentException if the given ControllerService is not - * known by this TestRunner (i.e., it has not been added via the - * {@link #addControllerService(String, ControllerService)} or - * {@link #addControllerService(String, ControllerService, Map)} method or - * if the Controller Service has been removed via the - * {@link #removeControllerService(ControllerService)} method. + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. * */ void assertNotValid(ControllerService service); @@ -748,12 +748,12 @@ public interface TestRunner { * @param identifier identifier of service * @param serviceType type of service * @return the {@link ControllerService} that is registered with the given - * identifier, cast as the provided service type, or <code>null</code> if no - * Controller Service exists with the given identifier + * identifier, cast as the provided service type, or <code>null</code> if no + * Controller Service exists with the given identifier * * @throws ClassCastException if the identifier given is registered for a - * Controller Service but that Controller Service is not of the type - * specified + * Controller Service but that Controller Service is not of the type + * specified */ <T extends ControllerService> T getControllerService(String identifier, Class<T> serviceType); http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java new file mode 100644 index 0000000..609fe75 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.QueueSize;
+
+/**
+ * Mutable {@link DropFlowFileStatus} that records the progress of a single request to drop
+ * FlowFiles. The status getters are public; the mutators are package-private so that only
+ * code in this package can advance the request. A new request starts in the
+ * {@link DropFlowFileState#WAITING_FOR_LOCK} state.
+ */
+public class DropFlowFileRequest implements DropFlowFileStatus {
+    private final String identifier;
+    private final long submissionTime = System.currentTimeMillis();
+
+    private volatile QueueSize originalSize;
+    private volatile QueueSize currentSize;
+    private volatile long lastUpdated = System.currentTimeMillis();
+    private volatile Thread executionThread;
+
+    // volatile because getState() reads this field without holding the monitor that
+    // setState() and cancel() synchronize on; without it a polling thread has no
+    // guarantee of ever observing a state change made by another thread.
+    private volatile DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
+
+
+    /**
+     * @param identifier the identifier returned by {@link #getRequestIdentifier()}
+     */
+    public DropFlowFileRequest(final String identifier) {
+        this.identifier = identifier;
+    }
+
+    @Override
+    public String getRequestIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public long getRequestSubmissionTime() {
+        return submissionTime;
+    }
+
+    @Override
+    public QueueSize getOriginalSize() {
+        return originalSize;
+    }
+
+    void setOriginalSize(final QueueSize originalSize) {
+        this.originalSize = originalSize;
+    }
+
+    @Override
+    public QueueSize getCurrentSize() {
+        return currentSize;
+    }
+
+    /**
+     * Records the most recently observed size of the queue.
+     *
+     * @param queueSize the size to report from {@link #getCurrentSize()}
+     */
+    void setCurrentSize(final QueueSize queueSize) {
+        // Previously assigned the field to itself, so the supplied value was silently discarded.
+        this.currentSize = queueSize;
+    }
+
+    @Override
+    public DropFlowFileState getState() {
+        return state;
+    }
+
+    @Override
+    public long getLastUpdated() {
+        return lastUpdated;
+    }
+
+    /**
+     * Sets the state of the request and stamps {@link #getLastUpdated()} with the current time.
+     *
+     * @param state the new state
+     */
+    synchronized void setState(final DropFlowFileState state) {
+        this.state = state;
+        this.lastUpdated = System.currentTimeMillis();
+    }
+
+    /**
+     * @param thread the thread that {@link #cancel()} will interrupt
+     */
+    void setExecutionThread(final Thread thread) {
+        this.executionThread = thread;
+    }
+
+    /**
+     * Marks the request as canceled and interrupts the execution thread, if one has been set.
+     *
+     * @return <code>false</code> if the request was already COMPLETE or CANCELED (nothing is
+     *         changed in that case), <code>true</code> otherwise
+     */
+    synchronized boolean cancel() {
+        if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
+            return false;
+        }
+
+        this.state = DropFlowFileState.CANCELED;
+        if (executionThread != null) {
+            executionThread.interrupt();
+        }
+
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index df356fd..073e5fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -24,9 +25,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -35,25 +40,32 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; import 
org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.controller.repository.RepositoryRecordType; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.concurrency.TimedLock; -import org.apache.nifi.util.timebuffer.LongEntityAccess; -import org.apache.nifi.util.timebuffer.TimedBuffer; -import org.apache.nifi.util.timebuffer.TimestampedLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,15 +79,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000; public static final int SWAP_RECORD_POLL_SIZE = 10000; - // When we have very high contention on a FlowFile Queue, the writeLock quickly becomes the bottleneck. In order to avoid this, - // we keep track of how often we are obtaining the write lock. 
If we exceed some threshold, we start performing a Pre-fetch so that - // we can then poll many times without having to obtain the lock. - // If lock obtained an average of more than PREFETCH_POLL_THRESHOLD times per second in order to poll from queue for last 5 seconds, do a pre-fetch. - public static final int PREFETCH_POLL_THRESHOLD = 1000; - public static final int PRIORITIZED_PREFETCH_SIZE = 10; - public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000; - private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when we pre-fetch, how many should we pre-fetch? - private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private PriorityQueue<FlowFileRecord> activeQueue = null; @@ -97,9 +100,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final List<FlowFilePrioritizer> priorities; private final int swapThreshold; private final FlowFileSwapManager swapManager; + private final List<String> swapLocations = new ArrayList<>(); private final TimedLock readLock; private final TimedLock writeLock; private final String identifier; + private final FlowFileRepository flowFileRepository; + private final ProvenanceEventRepository provRepository; + private final ResourceClaimManager resourceClaimManager; private final AtomicBoolean queueFullRef = new AtomicBoolean(false); private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0); @@ -108,8 +115,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! 
private final ProcessScheduler scheduler; - public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, - final int swapThreshold) { + public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, + final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); priorities = new ArrayList<>(); maximumQueueObjectCount = 0L; @@ -120,6 +127,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { swapQueue = new ArrayList<>(); this.eventReporter = eventReporter; this.swapManager = swapManager; + this.flowFileRepository = flowFileRepo; + this.provRepository = provRepo; + this.resourceClaimManager = resourceClaimManager; this.identifier = identifier; this.swapThreshold = swapThreshold; @@ -141,11 +151,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override - public int getSwapThreshold() { - return swapThreshold; - } - - @Override public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { writeLock.lock(); try { @@ -154,12 +159,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { activeQueue = newQueue; priorities.clear(); priorities.addAll(newPriorities); - - if (newPriorities.isEmpty()) { - prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; - } else { - prefetchSize = PRIORITIZED_PREFETCH_SIZE; - } } finally { writeLock.unlock("setPriorities"); } @@ -225,33 +224,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue { */ private QueueSize getQueueSize() { final QueueSize unacknowledged = unacknowledgedSizeRef.get(); - final PreFetch 
preFetch = preFetchRef.get(); - final int preFetchCount; - final long preFetchSize; - if (preFetch == null) { - preFetchCount = 0; - preFetchSize = 0L; - } else { - final QueueSize preFetchQueueSize = preFetch.size(); - preFetchCount = preFetchQueueSize.getObjectCount(); - preFetchSize = preFetchQueueSize.getByteCount(); - } - - return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount, - activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); + return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount(), + activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount()); } @Override public boolean isEmpty() { readLock.lock(); try { - final PreFetch prefetch = preFetchRef.get(); - if (prefetch == null) { - return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0; - } else { - return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0 && prefetch.size().getObjectCount() == 0; - } + return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0; } finally { readLock.unlock("isEmpty"); } @@ -260,30 +242,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public boolean isActiveQueueEmpty() { final int activeQueueSize = activeQueueSizeRef.get(); - if (activeQueueSize == 0) { - final PreFetch preFetch = preFetchRef.get(); - if (preFetch == null) { - return true; - } - - final QueueSize queueSize = preFetch.size(); - return queueSize.getObjectCount() == 0; - } else { - return false; - } + return activeQueueSize == 0; } - @Override public QueueSize getActiveQueueSize() { readLock.lock(); try { - final PreFetch preFetch = preFetchRef.get(); - if (preFetch == null) { - return new QueueSize(activeQueue.size(), activeQueueContentSize); - } else { - final QueueSize 
preFetchSize = preFetch.size(); - return new QueueSize(activeQueue.size() + preFetchSize.getObjectCount(), activeQueueContentSize + preFetchSize.getByteCount()); - } + return new QueueSize(activeQueue.size(), activeQueueContentSize); } finally { readLock.unlock("getActiveQueueSize"); } @@ -374,6 +339,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { swappedContentSize += file.getSize(); swappedRecordCount++; swapMode = true; + writeSwapFilesIfNecessary(); } else { activeQueueContentSize += file.getSize(); activeQueue.add(file); @@ -405,6 +371,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { swappedContentSize += bytes; swappedRecordCount += numFiles; swapMode = true; + writeSwapFilesIfNecessary(); } else { activeQueueContentSize += bytes; activeQueue.addAll(files); @@ -421,116 +388,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - @Override - public List<FlowFileRecord> pollSwappableRecords() { - writeLock.lock(); - try { - if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { - return null; - } - - final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size())); - final Iterator<FlowFileRecord> itr = swapQueue.iterator(); - while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) { - final FlowFileRecord record = itr.next(); - swapRecords.add(record); - itr.remove(); - } - - swapQueue.trimToSize(); - return swapRecords; - } finally { - writeLock.unlock("pollSwappableRecords"); - } - } - - @Override - public void putSwappedRecords(final Collection<FlowFileRecord> records) { - writeLock.lock(); - try { - try { - for (final FlowFileRecord record : records) { - swappedContentSize -= record.getSize(); - swappedRecordCount--; - activeQueueContentSize += record.getSize(); - activeQueue.add(record); - } - - if (swappedRecordCount > swapQueue.size()) { - // we have more swap files to be swapped in. 
- return; - } - - // If a call to #pollSwappableRecords will not produce any, go ahead and roll those FlowFiles back into the mix - if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { - for (final FlowFileRecord record : swapQueue) { - activeQueue.add(record); - activeQueueContentSize += record.getSize(); - } - swapQueue.clear(); - swappedContentSize = 0L; - swappedRecordCount = 0; - swapMode = false; - } - } finally { - activeQueueSizeRef.set(activeQueue.size()); - } - } finally { - writeLock.unlock("putSwappedRecords"); - scheduler.registerEvent(connection.getDestination()); - } - } - - @Override - public void incrementSwapCount(final int numRecords, final long contentSize) { - writeLock.lock(); - try { - swappedContentSize += contentSize; - swappedRecordCount += numRecords; - } finally { - writeLock.unlock("incrementSwapCount"); - } - } - - @Override - public int unswappedSize() { - readLock.lock(); - try { - return activeQueue.size() + unacknowledgedSizeRef.get().getObjectCount(); - } finally { - readLock.unlock("unswappedSize"); - } - } - - @Override - public int getSwapRecordCount() { - readLock.lock(); - try { - return swappedRecordCount; - } finally { - readLock.unlock("getSwapRecordCount"); - } - } - - @Override - public int getSwapQueueSize() { - readLock.lock(); - try { - if (logger.isDebugEnabled()) { - final long byteToMbDivisor = 1024L * 1024L; - final QueueSize unacknowledged = unacknowledgedSizeRef.get(); - - logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB", - activeQueue.size(), activeQueueContentSize / byteToMbDivisor, - swappedRecordCount, swappedContentSize / byteToMbDivisor, - unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); - } - - return swapQueue.size(); - } finally { - readLock.unlock("getSwapQueueSize"); - } - } private boolean isLaterThan(final Long maxAge) { if (maxAge == null) { @@ -558,30 +415,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue { // First check if we have any records Pre-Fetched. final long expirationMillis = flowFileExpirationMillis.get(); - final PreFetch preFetch = preFetchRef.get(); - if (preFetch != null) { - if (preFetch.isExpired()) { - requeueExpiredPrefetch(preFetch); - } else { - while (true) { - final FlowFileRecord next = preFetch.nextRecord(); - if (next == null) { - break; - } - - if (isLaterThan(getExpirationDate(next, expirationMillis))) { - expiredRecords.add(next); - continue; - } - - updateUnacknowledgedSize(1, next.getSize()); - return next; - } - - preFetchRef.compareAndSet(preFetch, null); - } - } - writeLock.lock(); try { flowFile = doPoll(expiredRecords, expirationMillis); @@ -631,9 +464,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { queueFullRef.set(determineIfFull()); } - if (incrementPollCount()) { - prefetch(); - } return isExpired ? null : flowFile; } @@ -642,38 +472,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults)); // First check if we have any records Pre-Fetched. - final long expirationMillis = flowFileExpirationMillis.get(); - final PreFetch preFetch = preFetchRef.get(); - if (preFetch != null) { - if (preFetch.isExpired()) { - requeueExpiredPrefetch(preFetch); - } else { - long totalSize = 0L; - for (int i = 0; i < maxResults; i++) { - final FlowFileRecord next = preFetch.nextRecord(); - if (next == null) { - break; - } - - if (isLaterThan(getExpirationDate(next, expirationMillis))) { - expiredRecords.add(next); - continue; - } - - records.add(next); - totalSize += next.getSize(); - } - - // If anything was prefetched, use what we have. 
- if (!records.isEmpty()) { - updateUnacknowledgedSize(records.size(), totalSize); - return records; - } - - preFetchRef.compareAndSet(preFetch, null); - } - } - writeLock.lock(); try { doPoll(records, maxResults, expiredRecords); @@ -705,10 +503,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (queueFullAtStart && !expiredRecords.isEmpty()) { queueFullRef.set(determineIfFull()); } - - if (incrementPollCount()) { - prefetch(); - } } /** @@ -732,6 +526,46 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out // to disk, because we want them to be swapped back in in the same order that they were swapped out. + if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) { + return; + } + + // If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that + // were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the + // swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot + // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived + // first. 
+ if (!swapLocations.isEmpty()) { + final String swapLocation = swapLocations.remove(0); + try { + final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this); + swappedRecordCount -= swappedIn.size(); + long swapSize = 0L; + for (final FlowFileRecord flowFile : swappedIn) { + swapSize += flowFile.getSize(); + } + swappedContentSize -= swapSize; + activeQueueContentSize += swapSize; + activeQueueSizeRef.set(activeQueue.size()); + activeQueue.addAll(swappedIn); + return; + } catch (final FileNotFoundException fnfe) { + logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found"); + } + return; + } catch (final IOException ioe) { + logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation); + logger.error("", ioe); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + + swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information."); + } + return; + } + } + // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense // of other checks for 99.999% of the cases. 
if (swappedRecordCount == 0 && swapQueue.isEmpty()) { @@ -760,6 +594,69 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } + /** + * This method MUST be called with the write lock held + */ + private void writeSwapFilesIfNecessary() { + if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { + return; + } + + final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE; + + // Create a new Priority queue with the prioritizers that are set, but reverse the + // prioritizers because we want to pull the lowest-priority FlowFiles to swap out + final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities))); + tempQueue.addAll(activeQueue); + tempQueue.addAll(swapQueue); + + final List<String> swapLocations = new ArrayList<>(numSwapFiles); + for (int i = 0; i < numSwapFiles; i++) { + // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records + final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE); + for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) { + toSwap.add(tempQueue.poll()); + } + + try { + Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue. + final String swapLocation = swapManager.swapOut(toSwap, this); + swapLocations.add(swapLocation); + } catch (final IOException ioe) { + tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue. + logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. 
Attempted to spill FlowFile information over to disk in order to avoid exhausting " + + "the Java heap space but failed to write information to disk due to {}", getIdentifier(), getQueueSize().getObjectCount(), ioe.toString()); + logger.error("", ioe); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getIdentifier() + " has " + getQueueSize().getObjectCount() + + " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. " + + "See logs for more information."); + } + + break; + } + } + + // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the + // swap queue. Then add the records back to the active queue. + swapQueue.clear(); + while (tempQueue.size() > swapThreshold) { + final FlowFileRecord record = tempQueue.poll(); + swapQueue.add(record); + } + + Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue + + // replace the contents of the active queue, since we've merged it with the swap queue. + activeQueue.clear(); + FlowFileRecord toRequeue; + while ((toRequeue = tempQueue.poll()) != null) { + activeQueue.offer(toRequeue); + } + this.swapLocations.addAll(swapLocations); + } + + @Override public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) { long drainedSize = 0L; @@ -796,13 +693,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>(); final List<FlowFileRecord> unselected = new ArrayList<>(); - // the prefetch doesn't allow us to add records back. So when this method is used, - // if there are prefetched records, we have to requeue them into the active queue first. 
- final PreFetch prefetch = preFetchRef.get(); - if (prefetch != null) { - requeueExpiredPrefetch(prefetch); - } - while (true) { FlowFileRecord flowFile = this.activeQueue.poll(); if (flowFile == null) { @@ -856,6 +746,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } + + private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable { private static final long serialVersionUID = 1L; @@ -991,6 +883,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.swappedRecordCount = swapFlowFileCount; this.swappedContentSize = swapByteCount; + this.swapLocations.addAll(swapLocations); } finally { writeLock.unlock("Recover Swap Files"); } @@ -1004,173 +897,219 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return "FlowFileQueue[id=" + identifier + "]"; } - @Override - public DropFlowFileStatus dropFlowFiles() { - // TODO Auto-generated method stub - return null; - } + private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); @Override - public boolean cancelDropFlowFileRequest(String requestIdentifier) { - // TODO Auto-generated method stub - return false; - } + public DropFlowFileStatus dropFlowFiles() { + // purge any old requests from the map just to keep it clean. 
But if there are very few requests, which is usually the case, then don't bother + if (dropRequestMap.size() > 10) { + final List<String> toDrop = new ArrayList<>(); + for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) { + final DropFlowFileRequest request = entry.getValue(); + final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE; + + if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) { + toDrop.add(entry.getKey()); + } + } - @Override - public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) { - // TODO Auto-generated method stub - return null; - } + for (final String requestId : toDrop) { + dropRequestMap.remove(requestId); + } + } - /** - * Lock the queue so that other threads are unable to interact with the - * queue - */ - public void lock() { - writeLock.lock(); - } + // TODO: get user name! + final String userName = null; - /** - * Unlock the queue - */ - public void unlock() { - writeLock.unlock("external unlock"); - } + final String requestIdentifier = UUID.randomUUID().toString(); + final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier); + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + writeLock.lock(); + try { + dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES); + + try { + final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue); + drop(activeQueueRecords, userName); + activeQueue.clear(); + dropRequest.setCurrentSize(getQueueSize()); + + drop(swapQueue, userName); + swapQueue.clear(); + dropRequest.setCurrentSize(getQueueSize()); + + final Iterator<String> swapLocationItr = swapLocations.iterator(); + while (swapLocationItr.hasNext()) { + final String swapLocation = swapLocationItr.next(); + final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, 
StandardFlowFileQueue.this); + try { + drop(swappedIn, userName); + } catch (final Exception e) { + activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. + throw e; + } + + dropRequest.setCurrentSize(getQueueSize()); + swapLocationItr.remove(); + } - @Override - public QueueSize getUnacknowledgedQueueSize() { - return unacknowledgedSizeRef.get(); - } + dropRequest.setState(DropFlowFileState.COMPLETE); + } catch (final Exception e) { + // TODO: Handle adequately + dropRequest.setState(DropFlowFileState.FAILURE); + } + } finally { + writeLock.unlock("Drop FlowFiles"); + } + } + }, "Drop FlowFiles for Connection " + getIdentifier()); + t.setDaemon(true); + t.start(); - private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { - boolean updated = false; + dropRequest.setExecutionThread(t); + dropRequestMap.put(requestIdentifier, dropRequest); - do { - final QueueSize queueSize = unacknowledgedSizeRef.get(); - final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize); - updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); - } while (!updated); + return dropRequest; } - private void requeueExpiredPrefetch(final PreFetch prefetch) { - if (prefetch == null) { - return; + private void drop(final List<FlowFileRecord> flowFiles, final String user) throws IOException { + // Create a Provenance Event and a FlowFile Repository record for each FlowFile + final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size()); + final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size()); + for (final FlowFileRecord flowFile : flowFiles) { + provenanceEvents.add(createDropEvent(flowFile, user)); + flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile)); } - writeLock.lock(); - try { - final long contentSizeRequeued = prefetch.requeue(activeQueue); - this.activeQueueContentSize += 
contentSizeRequeued; - this.preFetchRef.compareAndSet(prefetch, null); - } finally { - writeLock.unlock("requeueExpiredPrefetch"); - } - } + for (final FlowFileRecord flowFile : flowFiles) { + final ContentClaim contentClaim = flowFile.getContentClaim(); + if (contentClaim == null) { + continue; + } - /** - * MUST be called with write lock held. - */ - private final AtomicReference<PreFetch> preFetchRef = new AtomicReference<>(); + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + if (resourceClaim == null) { + continue; + } - private void prefetch() { - if (activeQueue.isEmpty()) { - return; + resourceClaimManager.decrementClaimantCount(resourceClaim); } - final int numToFetch = Math.min(prefetchSize, activeQueue.size()); + provRepository.registerEvents(provenanceEvents); + flowFileRepository.updateRepository(flowFileRepoRecords); + } - final PreFetch curPreFetch = preFetchRef.get(); - if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) { - return; - } + private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String user) { + final ProvenanceEventBuilder builder = provRepository.eventBuilder(); + builder.fromFlowFile(flowFile); + builder.setEventType(ProvenanceEventType.DROP); + builder.setLineageStartDate(flowFile.getLineageStartDate()); + builder.setComponentId(getIdentifier()); + builder.setComponentType("Connection"); + builder.setDetails("FlowFile manually dropped by user " + user); + return builder.build(); + } - final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch); - long contentSize = 0L; - for (int i = 0; i < numToFetch; i++) { - final FlowFileRecord record = activeQueue.poll(); - if (record == null || record.isPenalized()) { - // not enough unpenalized records to pull. 
Put all records back and return - activeQueue.addAll(buffer); - if (record != null) { - activeQueue.add(record); - } - return; - } else { - buffer.add(record); - contentSize += record.getSize(); + private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) { + return new RepositoryRecord() { + @Override + public FlowFileQueue getDestination() { + return null; } - } - activeQueueContentSize -= contentSize; - preFetchRef.set(new PreFetch(buffer)); - } + @Override + public FlowFileQueue getOriginalQueue() { + return StandardFlowFileQueue.this; + } - private final TimedBuffer<TimestampedLong> pollCounts = new TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess()); + @Override + public RepositoryRecordType getType() { + return RepositoryRecordType.DELETE; + } - private boolean incrementPollCount() { - pollCounts.add(new TimestampedLong(1L)); - final long totalCount = pollCounts.getAggregateValue(System.currentTimeMillis() - 5000L).getValue(); - return totalCount > PREFETCH_POLL_THRESHOLD * 5; - } + @Override + public ContentClaim getCurrentClaim() { + return flowFile.getContentClaim(); + } - private static class PreFetch { + @Override + public ContentClaim getOriginalClaim() { + return flowFile.getContentClaim(); + } - private final List<FlowFileRecord> records; - private final AtomicInteger pointer = new AtomicInteger(0); - private final long expirationTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L); - private final AtomicLong contentSize = new AtomicLong(0L); + @Override + public long getCurrentClaimOffset() { + return flowFile.getContentClaimOffset(); + } - public PreFetch(final List<FlowFileRecord> records) { - this.records = records; + @Override + public FlowFileRecord getCurrent() { + return flowFile; + } - long totalSize = 0L; - for (final FlowFileRecord record : records) { - totalSize += record.getSize(); + @Override + public boolean isAttributesChanged() { + return false; } - contentSize.set(totalSize); - } - public 
FlowFileRecord nextRecord() { - final int nextValue = pointer.getAndIncrement(); - if (nextValue >= records.size()) { + @Override + public boolean isMarkedForAbort() { + return false; + } + + @Override + public String getSwapLocation() { return null; } + }; + } - final FlowFileRecord flowFile = records.get(nextValue); - contentSize.addAndGet(-flowFile.getSize()); - return flowFile; + + @Override + public boolean cancelDropFlowFileRequest(final String requestIdentifier) { + final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier); + if (request == null) { + return false; } - public QueueSize size() { - final int pointerIndex = pointer.get(); - final int count = records.size() - pointerIndex; - if (count < 0) { - return new QueueSize(0, 0L); - } + final boolean successful = request.cancel(); + return successful; + } - final long bytes = contentSize.get(); - return new QueueSize(count, bytes); - } + @Override + public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) { + return dropRequestMap.get(requestIdentifier); + } - public boolean isExpired() { - return System.nanoTime() > expirationTime; - } + /** + * Lock the queue so that other threads are unable to interact with the + * queue + */ + public void lock() { + writeLock.lock(); + } - private long requeue(final Queue<FlowFileRecord> queue) { - // get the current pointer and prevent any other thread from accessing the rest of the elements - final int curPointer = pointer.getAndAdd(records.size()); - if (curPointer < records.size() - 1) { - final List<FlowFileRecord> subList = records.subList(curPointer, records.size()); - long contentSize = 0L; - for (final FlowFileRecord record : subList) { - contentSize += record.getSize(); - } + /** + * Unlock the queue + */ + public void unlock() { + writeLock.unlock("external unlock"); + } - queue.addAll(subList); + @Override + public QueueSize getUnacknowledgedQueueSize() { + return unacknowledgedSizeRef.get(); + } - return 
contentSize; - } - return 0L; - } + private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { + boolean updated = false; + + do { + final QueueSize queueSize = unacknowledgedSizeRef.get(); + final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize); + updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); + } while (!updated); } }
