Repository: nifi Updated Branches: refs/heads/master 7fd2c42b1 -> e1f0ba5a4
NIFI-2925: When swapping in FlowFiles, do not assume that its Resource Claim is 'in use' but instead look up the canonical representation of the resource claim This closes #1150. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e1f0ba5a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e1f0ba5a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e1f0ba5a Branch: refs/heads/master Commit: e1f0ba5a43811561d2dce09914ca87db4a3e1738 Parents: 7fd2c42 Author: Mark Payne <[email protected]> Authored: Wed Oct 19 16:48:55 2016 -0400 Committer: Bryan Bende <[email protected]> Committed: Mon Oct 31 09:59:06 2016 -0400 ---------------------------------------------------------------------- .../repository/claim/ResourceClaimManager.java | 14 ++++- .../nifi/controller/FileSystemSwapManager.java | 9 +++- .../apache/nifi/controller/FlowController.java | 10 ++-- .../repository/FileSystemRepository.java | 5 +- .../repository/VolatileContentRepository.java | 2 +- .../WriteAheadFlowFileRepository.java | 2 +- .../claim/StandardResourceClaimManager.java | 54 +++++++++++++++----- .../controller/TestFileSystemSwapManager.java | 7 ++- .../repository/TestStandardProcessSession.java | 20 ++++---- .../TestVolatileContentRepository.java | 2 +- .../TestWriteAheadFlowFileRepository.java | 4 +- .../claim/TestStandardResourceClaimManager.java | 2 +- 12 files changed, 93 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java index 4fe523e..a85ddc4 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java @@ -32,9 +32,21 @@ public interface ResourceClaimManager { * @param container of claim * @param section of claim * @param lossTolerant of claim + * @param writable whether or not the claim should be made writable * @return new claim */ - ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant); + ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable); + + /** + * Returns the Resource Claim with the given id, container, and section, if one exists, <code>null</code> otherwise + * + * @param id of claim + * @param container of claim + * @param section of claim + * @return the existing resource claim or <code>null</code> if none exists + */ + ResourceClaim getResourceClaim(String container, String section, String id); + /** * @param claim to obtain reference count for http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 3c4610f..350cceb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -504,7 +504,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager { lossTolerant = false; } - resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant); + resourceClaim = claimManager.getResourceClaim(container, section, claimId); + if (resourceClaim == null) { + logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, " + + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's " + + "ability to properly clean up this resource", container, section, claimId); + resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true); + } + final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset); claim.setLength(resourceLength); http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b42f3ae..89a4379 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3553,7 +3553,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return null; } - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false, false); return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue()); } @@ -3579,7 +3579,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), - provEvent.getPreviousContentClaimIdentifier(), false); + provEvent.getPreviousContentClaimIdentifier(), false, false); claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset()); offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset(); size = provEvent.getPreviousFileSize(); @@ -3589,7 +3589,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), - provEvent.getContentClaimIdentifier(), false); + provEvent.getContentClaimIdentifier(), false, false); claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset()); offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset(); @@ -3682,7 +3682,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset()); if (!contentRepository.isAccessible(contentClaim)) { @@ -3763,7 +3763,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Create the ContentClaim final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), - event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); + event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false); // Increment Claimant Count, since we will now be referencing the Content Claim resourceClaimManager.incrementClaimantCount(resourceClaim); http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 2960091..e45852a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -439,7 +439,7 @@ public class FileSystemRepository implements ContentRepository { final String id = idPath.toFile().getName(); final String sectionName = sectionPath.toFile().getName(); - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false, false); if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) { removeIncompleteContent(fileToRemove); } @@ -537,7 +537,7 @@ public class FileSystemRepository implements ContentRepository { final String section = String.valueOf(modulatedSectionIndex); final String claimId = System.currentTimeMillis() + "-" + currentIndex; - resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant); + resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant, true); resourceOffset = 0L; LOG.debug("Creating new Resource Claim {}", resourceClaim); @@ -949,6 +949,7 @@ public class FileSystemRepository implements ContentRepository { LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this); } else { writableClaimStreams.remove(scc.getResourceClaim()); + resourceClaimManager.freeze(scc.getResourceClaim()); bcos.close(); http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 95b503b..f7ff7b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -217,7 +217,7 @@ public class VolatileContentRepository implements ContentRepository { private ContentClaim createLossTolerant() { final long id = idGenerator.getAndIncrement(); - final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true); + final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true, false); final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L); final ContentBlock contentBlock = new ContentBlock(claim, repoSize); claimManager.incrementClaimantCount(resourceClaim, true); http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 36d592c..9c2a7d8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -826,7 +826,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis lossTolerant = false; } - final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant); + final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false); final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); contentClaim.setLength(resourceLength); http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java index 9cb0fa1..be0e8b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -29,14 +29,25 @@ import org.slf4j.LoggerFactory; public class StandardResourceClaimManager implements ResourceClaimManager { - private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); + private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class); private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000); @Override - public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { - return new StandardResourceClaim(this, container, section, id, lossTolerant); + public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) { + final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant); + if (!writable) { + claim.freeze(); + } + return claim; + } + + @Override + public ResourceClaim getResourceClaim(final String container, final String section, final String id) { + final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false); + final ClaimCount count = claimantCounts.get(tempClaim); + return (count == null) ? null : count.getClaim(); } private static AtomicInteger getCounter(final ResourceClaim claim) { @@ -44,14 +55,14 @@ public class StandardResourceClaimManager implements ResourceClaimManager { return null; } - AtomicInteger counter = claimantCounts.get(claim); + ClaimCount counter = claimantCounts.get(claim); if (counter != null) { - return counter; + return counter.getCount(); } - counter = new AtomicInteger(0); - final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter); - return existingCounter == null ? counter : existingCounter; + counter = new ClaimCount(claim, new AtomicInteger(0)); + final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter); + return existingCounter == null ? counter.getCount() : existingCounter.getCount(); } @Override @@ -61,8 +72,8 @@ public class StandardResourceClaimManager implements ResourceClaimManager { } synchronized (claim) { - final AtomicInteger counter = claimantCounts.get(claim); - return counter == null ? 0 : counter.get(); + final ClaimCount counter = claimantCounts.get(claim); + return counter == null ? 0 : counter.getCount().get(); } } @@ -73,13 +84,13 @@ public class StandardResourceClaimManager implements ResourceClaimManager { } synchronized (claim) { - final AtomicInteger counter = claimantCounts.get(claim); + final ClaimCount counter = claimantCounts.get(claim); if (counter == null) { logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); return -1; } - final int newClaimantCount = counter.decrementAndGet(); + final int newClaimantCount = counter.getCount().decrementAndGet(); if (newClaimantCount < 0) { logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount); } else { @@ -178,4 +189,23 @@ public class StandardResourceClaimManager implements ResourceClaimManager { ((StandardResourceClaim) claim).freeze(); } + + + private static final class ClaimCount { + private final ResourceClaim claim; + private final AtomicInteger count; + + public ClaimCount(final ResourceClaim claim, final AtomicInteger count) { + this.claim = claim; + this.count = count; + } + + public AtomicInteger getCount() { + return count; + } + + public ResourceClaim getClaim() { + return claim; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 6c52def..97226b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -115,7 +115,12 @@ public class TestFileSystemSwapManager { public class NopResourceClaimManager implements ResourceClaimManager { @Override - public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) { + public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) { + return null; + } + + @Override + public ResourceClaim getResourceClaim(String container, String section, String id) { return null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 8d398aa..a286024 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -817,7 +817,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .size(1L) .build(); flowFileQueue.put(flowFileRecord); @@ -964,7 +964,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .size(1L) .build(); flowFileQueue.put(flowFileRecord); @@ -988,7 +988,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .build(); flowFileQueue.put(flowFileRecord); @@ -1004,7 +1004,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(1000L) .size(1000L) .build(); @@ -1029,7 +1029,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .build(); flowFileQueue.put(flowFileRecord); @@ -1046,7 +1046,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); @@ -1113,7 +1113,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(0L).size(0L).build(); flowFileQueue.put(flowFileRecord); @@ -1150,7 +1150,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(0L).size(0L).build(); flowFileQueue.put(flowFileRecord); @@ -1477,7 +1477,7 @@ public class TestStandardProcessSession { final Set<ContentClaim> claims = new HashSet<>(); for (long i = 0; i < idGenerator.get(); i++) { - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); if (getClaimantCount(contentClaim) > 0) { claims.add(contentClaim); @@ -1489,7 +1489,7 @@ public class TestStandardProcessSession { @Override public ContentClaim create(boolean lossTolerant) throws IOException { - final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); claimantCounts.put(contentClaim, new AtomicInteger(1)); http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java index feed31a..cebe91b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java @@ -83,7 +83,7 @@ public class TestVolatileContentRepository { final ContentRepository mockRepo = Mockito.mock(ContentRepository.class); contentRepo.setBackupRepository(mockRepo); - final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim); http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 674f78f..b2ea0b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -87,10 +87,10 @@ public class TestWriteAheadFlowFileRepository { when(connection.getFlowFileQueue()).thenReturn(queue); queueProvider.addConnection(connection); - final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false); + final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false); final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L); - final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false); + final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false); final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L); // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then, http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java index d29105a..867810e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java @@ -52,7 +52,7 @@ public class TestStandardResourceClaimManager { } }; - final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false); + final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false, false); assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1. assertEquals(1, manager.getClaimantCount(resourceClaim));
