NIFI-5585: Addressed bug in calculating swap size of a queue partition when rebalancing
This closes #3010. Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97afa4e7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97afa4e7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97afa4e7 Branch: refs/heads/master Commit: 97afa4e7bab53f707c8299adaab9cb30a8777dd5 Parents: a1a4c99 Author: Mark Payne <[email protected]> Authored: Tue Oct 9 14:54:21 2018 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Oct 11 09:52:16 2018 -0400 ---------------------------------------------------------------------- .../queue/SwappablePriorityQueue.java | 24 ++++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/97afa4e7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index 66b594d..6dfa77d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -878,7 +878,7 @@ public class SwappablePriorityQueue { original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(), original.getUnacknowledgedCount(), original.getUnacknowledgedBytes()); - updated = size.compareAndSet(original, newSize); + updated = updateSize(original, newSize); if (updated) { logIfNegative(original, newSize, "active"); @@ -908,7 +908,8 @@ public class SwappablePriorityQueue { final FlowFileQueueSize newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(), original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(), original.getUnacknowledgedCount() + count, original.getUnacknowledgedBytes() + bytes); - updated = size.compareAndSet(original, newSize); + + updated = updateSize(original, newSize); if (updated) { logIfNegative(original, newSize, "Unacknowledged"); @@ -949,7 +950,6 @@ public class SwappablePriorityQueue { writeLock.lock(); try { final List<FlowFileRecord> activeRecords = new ArrayList<>(this.activeQueue); - activeRecords.addAll(this.swapQueue); final List<String> updatedSwapLocations = new ArrayList<>(swapLocations.size()); for (final String swapLocation : swapLocations) { @@ -963,23 +963,27 @@ public class SwappablePriorityQueue { this.swapLocations.clear(); this.activeQueue.clear(); - this.swapQueue.clear(); + + final int swapQueueCount = swapQueue.size(); + final long swapQueueBytes = swapQueue.stream().mapToLong(FlowFileRecord::getSize).sum(); + activeRecords.addAll(swapQueue); + swapQueue.clear(); this.swapMode = false; - QueueSize swapSize = new QueueSize(0, 0L); - boolean updated = false; - while (!updated) { + QueueSize swapSize; + boolean updated; + do { final FlowFileQueueSize currentSize = getFlowFileQueueSize(); - swapSize = new QueueSize(currentSize.getSwappedCount(), currentSize.getSwappedBytes()); + swapSize = new QueueSize(currentSize.getSwappedCount() - swapQueueCount, currentSize.getSwappedBytes() - swapQueueBytes); final FlowFileQueueSize updatedSize = new FlowFileQueueSize(0, 0, 0, 0, 0, currentSize.getUnacknowledgedCount(), currentSize.getUnacknowledgedBytes()); updated = updateSize(currentSize, updatedSize); - } + } while (!updated); return new FlowFileQueueContents(activeRecords, updatedSwapLocations, swapSize); } finally { - writeLock.unlock("transfer(SwappablePriorityQueue)"); + writeLock.unlock("packageForRebalance(SwappablePriorityQueue)"); } }
