Repository: nifi
Updated Branches:
  refs/heads/NIFI-108 f0e86e5a7 -> 817050960


NIFI-108: Fixed bug that caused the listFlowFiles operation to wait on a 
readLock before returning and performing work asynchronously; fixed bug in 
Write-Ahead FlowFile Repository that caused ContentClaims to be queued up for 
destruction instead of ResourceClaims - this caused millions of ContentClaims 
to be queued up instead of a single ResourceClaim in some tests


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c7700128
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c7700128
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c7700128

Branch: refs/heads/NIFI-108
Commit: c7700128b7dc5bd20f48677f4b5c57ddbc01ff26
Parents: 7f3980d
Author: Mark Payne <[email protected]>
Authored: Tue Jan 5 16:51:40 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Tue Jan 5 16:51:40 2016 -0500

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  | 63 +++++++++-----------
 .../WriteAheadFlowFileRepository.java           | 33 +++++-----
 2 files changed, 43 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c7700128/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 57e84b1..d23cf95 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -75,7 +75,6 @@ import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
-import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,7 +95,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     // guarded by lock
     private ArrayList<FlowFileRecord> swapQueue = null;
 
-    private final AtomicReference<FlowFileQueueSize> size = new 
AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
+    private final AtomicReference<FlowFileQueueSize> size = new 
AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L));
 
     private boolean swapMode = false;
 
@@ -279,7 +278,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         try {
             if (swapMode || activeQueue.size() >= swapThreshold) {
                 swapQueue.add(file);
-                incrementSwapQueueSize(1, file.getSize());
+                incrementSwapQueueSize(1, file.getSize(), 0);
                 swapMode = true;
                 writeSwapFilesIfNecessary();
             } else {
@@ -307,7 +306,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         try {
             if (swapMode || activeQueue.size() >= swapThreshold - numFiles) {
                 swapQueue.addAll(files);
-                incrementSwapQueueSize(numFiles, bytes);
+                incrementSwapQueueSize(numFiles, bytes, 0);
                 swapMode = true;
                 writeSwapFilesIfNecessary();
             } else {
@@ -466,7 +465,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 for (final FlowFileRecord flowFile : swappedIn) {
                     swapSize += flowFile.getSize();
                 }
-                incrementSwapQueueSize(-swappedIn.size(), -swapSize);
+                incrementSwapQueueSize(-swappedIn.size(), -swapSize, -1);
                 incrementActiveQueueSize(swappedIn.size(), swapSize);
                 activeQueue.addAll(swappedIn);
                 return;
@@ -512,7 +511,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
 
         if (recordsMigrated > 0) {
             incrementActiveQueueSize(recordsMigrated, bytesMigrated);
-            incrementSwapQueueSize(-recordsMigrated, -bytesMigrated);
+            incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
         }
 
         if (size.get().swappedCount == 0) {
@@ -603,7 +602,9 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
             final long addedSwapBytes = updatedSwapQueueBytes - 
originalSwapQueueBytes;
 
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
-                originalSize.swappedCount + addedSwapRecords + 
flowFilesSwappedOut, originalSize.swappedBytes + addedSwapBytes + 
bytesSwappedOut,
+                originalSize.swappedCount + addedSwapRecords + 
flowFilesSwappedOut,
+                originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut,
+                originalSize.swapFiles + numSwapFiles,
                 originalSize.unacknowledgedCount, 
originalSize.unacknowledgedBytes);
             updated = size.compareAndSet(originalSize, newSize);
         }
@@ -833,7 +834,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 }
             }
 
-            incrementSwapQueueSize(swapFlowFileCount, swapByteCount);
+            incrementSwapQueueSize(swapFlowFileCount, swapByteCount, 
swapLocations.size());
             this.swapLocations.addAll(swapLocations);
         } finally {
             writeLock.unlock("Recover Swap Files");
@@ -894,15 +895,8 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
             }
         }
 
-        final int numSteps;
-        readLock.lock();
-        try {
-            // numSteps = 1 for each swap location + 1 for active queue + 1 
for swap queue.
-            numSteps = 2 + swapLocations.size();
-        } finally {
-            readLock.unlock("listFlowFiles");
-        }
-
+        // numSteps = 1 for each swap location + 1 for active queue + 1 for 
swap queue.
+        final int numSteps = 2 + size.get().swapFiles;
         final ListFlowFileRequest listRequest = new 
ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, 
size(), numSteps);
 
         final Thread t = new Thread(new Runnable() {
@@ -1190,16 +1184,15 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                         dropRequest.setCurrentSize(getQueueSize());
                         
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
-                        try {
-                            final QueueSize swapSize = 
size.get().swapQueueSize();
-
-                            logger.debug("For DropFlowFileRequest {}, Swap 
Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
-                                requestIdentifier, swapQueue.size(), 
swapSize.getObjectCount(), swapSize.getByteCount());
-                            if (dropRequest.getState() == 
DropFlowFileState.CANCELED) {
-                                logger.info("Cancel requested for 
DropFlowFileRequest {}", requestIdentifier);
-                                return;
-                            }
+                        final QueueSize swapSize = size.get().swapQueueSize();
+                        logger.debug("For DropFlowFileRequest {}, Swap Queue 
has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
+                            requestIdentifier, swapQueue.size(), 
swapSize.getObjectCount(), swapSize.getByteCount());
+                        if (dropRequest.getState() == 
DropFlowFileState.CANCELED) {
+                            logger.info("Cancel requested for 
DropFlowFileRequest {}", requestIdentifier);
+                            return;
+                        }
 
+                        try {
                             droppedSize = drop(swapQueue, requestor);
                         } catch (final IOException ioe) {
                             logger.error("Failed to drop the FlowFiles from 
queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), 
ioe.toString());
@@ -1213,7 +1206,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                         dropRequest.setCurrentSize(getQueueSize());
                         
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
                         swapMode = false;
-                        incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount());
+                        incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount(), 0);
                         logger.debug("For DropFlowFileRequest {}, dropped {} 
from Swap Queue", requestIdentifier, droppedSize);
 
                         final int swapFileCount = swapLocations.size();
@@ -1244,7 +1237,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                             }
 
                             
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
-                            
incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount());
+                            
incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount(), -1);
 
                             dropRequest.setCurrentSize(getQueueSize());
                             swapLocationItr.remove();
@@ -1420,7 +1413,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         while (!updated) {
             final FlowFileQueueSize original = size.get();
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes 
+ bytes,
-                original.swappedCount, original.swappedBytes, 
original.unacknowledgedCount, original.unacknowledgedBytes);
+                original.swappedCount, original.swappedBytes, 
original.swapFiles, original.unacknowledgedCount, original.unacknowledgedBytes);
             updated = size.compareAndSet(original, newSize);
 
             if (updated) {
@@ -1429,12 +1422,12 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
     }
 
-    private void incrementSwapQueueSize(final int count, final long bytes) {
+    private void incrementSwapQueueSize(final int count, final long bytes, 
final int fileCount) {
         boolean updated = false;
         while (!updated) {
             final FlowFileQueueSize original = size.get();
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
-                original.swappedCount + count, original.swappedBytes + bytes, 
original.unacknowledgedCount, original.unacknowledgedBytes);
+                original.swappedCount + count, original.swappedBytes + bytes, 
original.swapFiles + fileCount, original.unacknowledgedCount, 
original.unacknowledgedBytes);
             updated = size.compareAndSet(original, newSize);
 
             if (updated) {
@@ -1448,7 +1441,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         while (!updated) {
             final FlowFileQueueSize original = size.get();
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
-                original.swappedCount, original.swappedBytes, 
original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes);
+                original.swappedCount, original.swappedBytes, 
original.swapFiles, original.unacknowledgedCount + count, 
original.unacknowledgedBytes + bytes);
             updated = size.compareAndSet(original, newSize);
 
             if (updated) {
@@ -1472,15 +1465,17 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         private final long activeQueueBytes;
         private final int swappedCount;
         private final long swappedBytes;
+        private final int swapFiles;
         private final int unacknowledgedCount;
         private final long unacknowledgedBytes;
 
-        public FlowFileQueueSize(final int activeQueueCount, final long 
activeQueueBytes, final int swappedCount, final long swappedBytes,
+        public FlowFileQueueSize(final int activeQueueCount, final long 
activeQueueBytes, final int swappedCount, final long swappedBytes, final int 
swapFileCount,
             final int unacknowledgedCount, final long unacknowledgedBytes) {
             this.activeQueueCount = activeQueueCount;
             this.activeQueueBytes = activeQueueBytes;
             this.swappedCount = swappedCount;
             this.swappedBytes = swappedBytes;
+            this.swapFiles = swapFileCount;
             this.unacknowledgedCount = unacknowledgedCount;
             this.unacknowledgedBytes = unacknowledgedBytes;
         }
@@ -1509,7 +1504,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         public String toString() {
             return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + 
", " + activeQueueBytes +
                 " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes +
-                " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + 
unacknowledgedBytes + " Bytes] ]";
+                " Bytes], Swap Files=[" + swapFiles + "], Unacknowledged=[" + 
unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7700128/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 639a4c8..211baa7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -110,7 +110,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     // We can then destroy the data. If we end up syncing the FlowFile 
Repository to the backing storage mechanism and then restart
     // before the data is destroyed, it's okay because the data will be 
unknown to the Content Repository, so it will be destroyed
     // on restart.
-    private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> 
claimsAwaitingDestruction = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> 
claimsAwaitingDestruction = new ConcurrentHashMap<>();
 
     public WriteAheadFlowFileRepository() {
         final NiFiProperties properties = NiFiProperties.getInstance();
@@ -169,12 +169,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         updateRepository(records, alwaysSync);
     }
 
-    private void markDestructable(final ContentClaim contentClaim) {
-        if (contentClaim == null) {
-            return;
-        }
-
-        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+    private void markDestructable(final ResourceClaim resourceClaim) {
         if (resourceClaim == null) {
             return;
         }
@@ -213,22 +208,22 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // However, the result of this is that the FileSystem Repository may 
end up trying to remove the content multiple times.
         // This does not, however, cause problems, as ContentRepository should 
handle this
         // This does indicate that some refactoring should probably be 
performed, though, as this is not a very clean interface.
-        final Set<ContentClaim> claimsToAdd = new HashSet<>();
+        final Set<ResourceClaim> claimsToAdd = new HashSet<>();
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's 
claimant count <= 0, mark it as destructable
                 if (record.getCurrentClaim() != null && 
getClaimantCount(record.getCurrentClaim()) <= 0) {
-                    claimsToAdd.add(record.getCurrentClaim());
+                    
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
                 }
 
                 // If the original claim is different than the current claim 
and the original claim has a claimant count <= 0, mark it as destructable.
                 if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimsToAdd.add(record.getOriginalClaim());
+                    
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, 
mark original as destructable
                 if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimsToAdd.add(record.getOriginalClaim());
+                    
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
             }
         }
@@ -236,10 +231,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         if (!claimsToAdd.isEmpty()) {
             // Get / Register a Set<ContentClaim> for the given Partiton Index
             final Integer partitionKey = Integer.valueOf(partitionIndex);
-            BlockingQueue<ContentClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionKey);
+            BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionKey);
             if (claimQueue == null) {
                 claimQueue = new LinkedBlockingQueue<>();
-                final BlockingQueue<ContentClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
+                final BlockingQueue<ResourceClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
                 if (existingClaimQueue != null) {
                     claimQueue = existingClaimQueue;
                 }
@@ -252,26 +247,26 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
     @Override
     public void onSync(final int partitionIndex) {
-        final BlockingQueue<ContentClaim> claimQueue = 
claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
+        final BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
         if (claimQueue == null) {
             return;
         }
 
-        final Set<ContentClaim> claimsToDestroy = new HashSet<>();
+        final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
         claimQueue.drainTo(claimsToDestroy);
 
-        for (final ContentClaim claim : claimsToDestroy) {
+        for (final ResourceClaim claim : claimsToDestroy) {
             markDestructable(claim);
         }
     }
 
     @Override
     public void onGlobalSync() {
-        for (final BlockingQueue<ContentClaim> claimQueue : 
claimsAwaitingDestruction.values()) {
-            final Set<ContentClaim> claimsToDestroy = new HashSet<>();
+        for (final BlockingQueue<ResourceClaim> claimQueue : 
claimsAwaitingDestruction.values()) {
+            final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
             claimQueue.drainTo(claimsToDestroy);
 
-            for (final ContentClaim claim : claimsToDestroy) {
+            for (final ResourceClaim claim : claimsToDestroy) {
                 markDestructable(claim);
             }
         }

Reply via email to