NIFI-5585: Addressed bug in calculating swap size of a queue partition when rebalancing

This closes #3010.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97afa4e7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97afa4e7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97afa4e7

Branch: refs/heads/master
Commit: 97afa4e7bab53f707c8299adaab9cb30a8777dd5
Parents: a1a4c99
Author: Mark Payne <[email protected]>
Authored: Tue Oct 9 14:54:21 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Oct 11 09:52:16 2018 -0400

----------------------------------------------------------------------
 .../queue/SwappablePriorityQueue.java           | 24 ++++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/97afa4e7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 66b594d..6dfa77d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -878,7 +878,7 @@ public class SwappablePriorityQueue {
                 original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(),
                 original.getUnacknowledgedCount(), original.getUnacknowledgedBytes());
 
-            updated = size.compareAndSet(original, newSize);
+            updated = updateSize(original, newSize);
 
             if (updated) {
                 logIfNegative(original, newSize, "active");
@@ -908,7 +908,8 @@ public class SwappablePriorityQueue {
             final FlowFileQueueSize newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(),
                 original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(),
                 original.getUnacknowledgedCount() + count, original.getUnacknowledgedBytes() + bytes);
-            updated = size.compareAndSet(original, newSize);
+
+            updated = updateSize(original, newSize);
 
             if (updated) {
                 logIfNegative(original, newSize, "Unacknowledged");
@@ -949,7 +950,6 @@ public class SwappablePriorityQueue {
         writeLock.lock();
         try {
             final List<FlowFileRecord> activeRecords = new ArrayList<>(this.activeQueue);
-            activeRecords.addAll(this.swapQueue);
 
             final List<String> updatedSwapLocations = new ArrayList<>(swapLocations.size());
             for (final String swapLocation : swapLocations) {
@@ -963,23 +963,27 @@ public class SwappablePriorityQueue {
 
             this.swapLocations.clear();
             this.activeQueue.clear();
-            this.swapQueue.clear();
+
+            final int swapQueueCount = swapQueue.size();
+            final long swapQueueBytes = swapQueue.stream().mapToLong(FlowFileRecord::getSize).sum();
+            activeRecords.addAll(swapQueue);
+            swapQueue.clear();
 
             this.swapMode = false;
 
-            QueueSize swapSize = new QueueSize(0, 0L);
-            boolean updated = false;
-            while (!updated) {
+            QueueSize swapSize;
+            boolean updated;
+            do {
                 final FlowFileQueueSize currentSize = getFlowFileQueueSize();
-                swapSize = new QueueSize(currentSize.getSwappedCount(), currentSize.getSwappedBytes());
+                swapSize = new QueueSize(currentSize.getSwappedCount() - swapQueueCount, currentSize.getSwappedBytes() - swapQueueBytes);
 
                 final FlowFileQueueSize updatedSize = new FlowFileQueueSize(0, 0, 0, 0, 0, currentSize.getUnacknowledgedCount(), currentSize.getUnacknowledgedBytes());
                 updated = updateSize(currentSize, updatedSize);
-            }
+            } while (!updated);
 
             return new FlowFileQueueContents(activeRecords, updatedSwapLocations, swapSize);
         } finally {
-            writeLock.unlock("transfer(SwappablePriorityQueue)");
+            writeLock.unlock("packageForRebalance(SwappablePriorityQueue)");
         }
     }
 

Reply via email to