Repository: nifi Updated Branches: refs/heads/NIFI-108 f0e86e5a7 -> 817050960
NIFI-108: Fixed bug that caused the listFlowFiles operation to wait on a readLock before returning and performing work asynchronously; fixed bug in Write-Ahead FlowFile Repository that caused ContentClaims to be queued up for destruction instead of ResourceClaims - this caused millions of ContentClaims to be queued up instead of a single ResourceClaim in some tests Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c7700128 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c7700128 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c7700128 Branch: refs/heads/NIFI-108 Commit: c7700128b7dc5bd20f48677f4b5c57ddbc01ff26 Parents: 7f3980d Author: Mark Payne <[email protected]> Authored: Tue Jan 5 16:51:40 2016 -0500 Committer: Mark Payne <[email protected]> Committed: Tue Jan 5 16:51:40 2016 -0500 ---------------------------------------------------------------------- .../nifi/controller/StandardFlowFileQueue.java | 63 +++++++++----------- .../WriteAheadFlowFileRepository.java | 33 +++++----- 2 files changed, 43 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c7700128/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 57e84b1..d23cf95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -75,7 +75,6 @@ import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.concurrency.TimedLock; -import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +95,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // guarded by lock private ArrayList<FlowFileRecord> swapQueue = null; - private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L)); + private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L)); private boolean swapMode = false; @@ -279,7 +278,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { if (swapMode || activeQueue.size() >= swapThreshold) { swapQueue.add(file); - incrementSwapQueueSize(1, file.getSize()); + incrementSwapQueueSize(1, file.getSize(), 0); swapMode = true; writeSwapFilesIfNecessary(); } else { @@ -307,7 +306,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { if (swapMode || activeQueue.size() >= swapThreshold - numFiles) { swapQueue.addAll(files); - incrementSwapQueueSize(numFiles, bytes); + incrementSwapQueueSize(numFiles, bytes, 0); swapMode = true; writeSwapFilesIfNecessary(); } else { @@ -466,7 +465,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { for (final FlowFileRecord flowFile : swappedIn) { swapSize += flowFile.getSize(); } - incrementSwapQueueSize(-swappedIn.size(), -swapSize); + incrementSwapQueueSize(-swappedIn.size(), -swapSize, -1); incrementActiveQueueSize(swappedIn.size(), swapSize); activeQueue.addAll(swappedIn); return; @@ -512,7 +511,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (recordsMigrated > 0) { incrementActiveQueueSize(recordsMigrated, bytesMigrated); - incrementSwapQueueSize(-recordsMigrated, -bytesMigrated); + incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0); } if (size.get().swappedCount == 0) { @@ -603,7 +602,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes; final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes, - originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut, originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut, + originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut, + originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut, + originalSize.swapFiles + numSwapFiles, originalSize.unacknowledgedCount, originalSize.unacknowledgedBytes); updated = size.compareAndSet(originalSize, newSize); } @@ -833,7 +834,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - incrementSwapQueueSize(swapFlowFileCount, swapByteCount); + incrementSwapQueueSize(swapFlowFileCount, swapByteCount, swapLocations.size()); this.swapLocations.addAll(swapLocations); } finally { writeLock.unlock("Recover Swap Files"); @@ -894,15 +895,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - final int numSteps; - readLock.lock(); - try { - // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue. - numSteps = 2 + swapLocations.size(); - } finally { - readLock.unlock("listFlowFiles"); - } - + // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue. + final int numSteps = 2 + size.get().swapFiles; final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps); final Thread t = new Thread(new Runnable() { @@ -1190,16 +1184,15 @@ public final class StandardFlowFileQueue implements FlowFileQueue { dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - try { - final QueueSize swapSize = size.get().swapQueueSize(); - - logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", - requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); - if (dropRequest.getState() == DropFlowFileState.CANCELED) { - logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); - return; - } + final QueueSize swapSize = size.get().swapQueueSize(); + logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", + requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + try { droppedSize = drop(swapQueue, requestor); } catch (final IOException ioe) { logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); @@ -1213,7 +1206,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); swapMode = false; - incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); + incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), 0); logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize); final int swapFileCount = swapLocations.size(); @@ -1244,7 +1237,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); + incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), -1); dropRequest.setCurrentSize(getQueueSize()); swapLocationItr.remove(); @@ -1420,7 +1413,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { while (!updated) { final FlowFileQueueSize original = size.get(); final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes, - original.swappedCount, original.swappedBytes, original.unacknowledgedCount, original.unacknowledgedBytes); + original.swappedCount, original.swappedBytes, original.swapFiles, original.unacknowledgedCount, original.unacknowledgedBytes); updated = size.compareAndSet(original, newSize); if (updated) { @@ -1429,12 +1422,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - private void incrementSwapQueueSize(final int count, final long bytes) { + private void incrementSwapQueueSize(final int count, final long bytes, final int fileCount) { boolean updated = false; while (!updated) { final FlowFileQueueSize original = size.get(); final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, - original.swappedCount + count, original.swappedBytes + bytes, original.unacknowledgedCount, original.unacknowledgedBytes); + original.swappedCount + count, original.swappedBytes + bytes, original.swapFiles + fileCount, original.unacknowledgedCount, original.unacknowledgedBytes); updated = size.compareAndSet(original, newSize); if (updated) { @@ -1448,7 +1441,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { while (!updated) { final FlowFileQueueSize original = size.get(); final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, - original.swappedCount, original.swappedBytes, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes); + original.swappedCount, original.swappedBytes, original.swapFiles, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes); updated = size.compareAndSet(original, newSize); if (updated) { @@ -1472,15 +1465,17 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final long activeQueueBytes; private final int swappedCount; private final long swappedBytes; + private final int swapFiles; private final int unacknowledgedCount; private final long unacknowledgedBytes; - public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, + public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, final int swapFileCount, final int unacknowledgedCount, final long unacknowledgedBytes) { this.activeQueueCount = activeQueueCount; this.activeQueueBytes = activeQueueBytes; this.swappedCount = swappedCount; this.swappedBytes = swappedBytes; + this.swapFiles = swapFileCount; this.unacknowledgedCount = unacknowledgedCount; this.unacknowledgedBytes = unacknowledgedBytes; } @@ -1509,7 +1504,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public String toString() { return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + ", " + activeQueueBytes + " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes + - " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; + " Bytes], Swap Files=[" + swapFiles + "], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c7700128/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 639a4c8..211baa7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -110,7 +110,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // We can then destroy the data. If we end up syncing the FlowFile Repository to the backing storage mechanism and then restart // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed // on restart. - private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>(); + private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>(); public WriteAheadFlowFileRepository() { final NiFiProperties properties = NiFiProperties.getInstance(); @@ -169,12 +169,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis updateRepository(records, alwaysSync); } - private void markDestructable(final ContentClaim contentClaim) { - if (contentClaim == null) { - return; - } - - final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + private void markDestructable(final ResourceClaim resourceClaim) { if (resourceClaim == null) { return; } @@ -213,22 +208,22 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times. // This does not, however, cause problems, as ContentRepository should handle this // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface. - final Set<ContentClaim> claimsToAdd = new HashSet<>(); + final Set<ResourceClaim> claimsToAdd = new HashSet<>(); for (final RepositoryRecord record : records) { if (record.getType() == RepositoryRecordType.DELETE) { // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) { - claimsToAdd.add(record.getCurrentClaim()); + claimsToAdd.add(record.getCurrentClaim().getResourceClaim()); } // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable. if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) { - claimsToAdd.add(record.getOriginalClaim()); + claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); } } else if (record.getType() == RepositoryRecordType.UPDATE) { // if we have an update, and the original is no longer needed, mark original as destructable if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) { - claimsToAdd.add(record.getOriginalClaim()); + claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); } } } @@ -236,10 +231,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis if (!claimsToAdd.isEmpty()) { // Get / Register a Set<ContentClaim> for the given Partiton Index final Integer partitionKey = Integer.valueOf(partitionIndex); - BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey); + BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey); if (claimQueue == null) { claimQueue = new LinkedBlockingQueue<>(); - final BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); + final BlockingQueue<ResourceClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); if (existingClaimQueue != null) { claimQueue = existingClaimQueue; } @@ -252,26 +247,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public void onSync(final int partitionIndex) { - final BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex)); + final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex)); if (claimQueue == null) { return; } - final Set<ContentClaim> claimsToDestroy = new HashSet<>(); + final Set<ResourceClaim> claimsToDestroy = new HashSet<>(); claimQueue.drainTo(claimsToDestroy); - for (final ContentClaim claim : claimsToDestroy) { + for (final ResourceClaim claim : claimsToDestroy) { markDestructable(claim); } } @Override public void onGlobalSync() { - for (final BlockingQueue<ContentClaim> claimQueue : claimsAwaitingDestruction.values()) { - final Set<ContentClaim> claimsToDestroy = new HashSet<>(); + for (final BlockingQueue<ResourceClaim> claimQueue : claimsAwaitingDestruction.values()) { + final Set<ResourceClaim> claimsToDestroy = new HashSet<>(); claimQueue.drainTo(claimsToDestroy); - for (final ContentClaim claim : claimsToDestroy) { + for (final ResourceClaim claim : claimsToDestroy) { markDestructable(claim); } }
