http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java index fe34fe0..a85b23b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java @@ -22,7 +22,9 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.controller.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; /** * <p> @@ -32,10 +34,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager; public class VolatileFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L); - private ContentClaimManager claimManager; // effectively final + private ResourceClaimManager claimManager; // effectively final @Override - public void initialize(final ContentClaimManager claimManager) { + public void initialize(final ResourceClaimManager claimManager) { this.claimManager = claimManager; } @@ -58,23 +60,49 @@ public class VolatileFlowFileRepository implements FlowFileRepository { public void close() throws IOException { } + private void markDestructable(final ContentClaim contentClaim) { + if (contentClaim == null) { + return; + } + + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + if (resourceClaim == null) { + return; + } + + claimManager.markDestructable(resourceClaim); + } + + private int getClaimantCount(final ContentClaim claim) { + if (claim == null) { + return 0; + } + + final ResourceClaim resourceClaim = claim.getResourceClaim(); + if (resourceClaim == null) { + return 0; + } + + return claimManager.getClaimantCount(resourceClaim); + } + @Override public void updateRepository(final Collection<RepositoryRecord> records) throws IOException { for (final RepositoryRecord record : records) { if (record.getType() == RepositoryRecordType.DELETE) { // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable - if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) { - claimManager.markDestructable(record.getCurrentClaim()); + if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) { + markDestructable(record.getCurrentClaim()); } // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable. - if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) { - claimManager.markDestructable(record.getOriginalClaim()); + if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) { + markDestructable(record.getOriginalClaim()); } } else if (record.getType() == RepositoryRecordType.UPDATE) { // if we have an update, and the original is no longer needed, mark original as destructable - if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) { - claimManager.markDestructable(record.getOriginalClaim()); + if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) { + markDestructable(record.getOriginalClaim()); } } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index f2df821..5ee5fb5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -43,11 +43,12 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.controller.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.wali.MinimalLockingWriteAheadLog; @@ -86,7 +87,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // effectively final private WriteAheadRepository<RepositoryRecord> wal; private WriteAheadRecordSerde serde; - private ContentClaimManager claimManager; + private ResourceClaimManager claimManager; // WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk. // We keep track of this because we need to ensure that the ContentClaims are destroyed only after the FlowFile Repository has been @@ -125,7 +126,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } @Override - public void initialize(final ContentClaimManager claimManager) throws IOException { + public void initialize(final ResourceClaimManager claimManager) throws IOException { this.claimManager = claimManager; Files.createDirectories(flowFileRepositoryPath); @@ -168,6 +169,32 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis updateRepository(records, alwaysSync); } + private void markDestructable(final ContentClaim contentClaim) { + if (contentClaim == null) { + return; + } + + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + if (resourceClaim == null) { + return; + } + + claimManager.markDestructable(resourceClaim); + } + + private int getClaimantCount(final ContentClaim claim) { + if (claim == null) { + return 0; + } + + final ResourceClaim resourceClaim = claim.getResourceClaim(); + if (resourceClaim == null) { + return 0; + } + + return claimManager.getClaimantCount(resourceClaim); + } + private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException { for (final RepositoryRecord record : records) { if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING && record.getDestination() == null) { @@ -190,17 +217,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis for (final RepositoryRecord record : records) { if (record.getType() == RepositoryRecordType.DELETE) { // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable - if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) { + if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) { claimsToAdd.add(record.getCurrentClaim()); } // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable. - if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) { + if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) { claimsToAdd.add(record.getOriginalClaim()); } } else if (record.getType() == RepositoryRecordType.UPDATE) { // if we have an update, and the original is no longer needed, mark original as destructable - if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) { + if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) { claimsToAdd.add(record.getOriginalClaim()); } } @@ -212,7 +239,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey); if (claimQueue == null) { claimQueue = new LinkedBlockingQueue<>(); - BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); + final BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); if (existingClaimQueue != null) { claimQueue = existingClaimQueue; } @@ -222,9 +249,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } } - private void markDestructable(final ContentClaim claim) { - claimManager.markDestructable(claim); - } @Override public void onSync(final int partitionIndex) { @@ -307,7 +331,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis for (final RepositoryRecord record : recordList) { final ContentClaim claim = record.getCurrentClaim(); if (claim != null) { - claimManager.incrementClaimantCount(claim); + claimManager.incrementClaimantCount(claim.getResourceClaim()); } } @@ -339,7 +363,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis final long start = System.nanoTime(); final int numRecordsCheckpointed = checkpoint(); final long end = System.nanoTime(); - final long millis = TimeUnit.MILLISECONDS.convert((end - start), TimeUnit.NANOSECONDS); + final long millis = TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS); logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds", new Object[]{numRecordsCheckpointed, millis}); } catch (final IOException e) { @@ -378,9 +402,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private Map<String, FlowFileQueue> flowFileQueueMap = null; private long recordsRestored = 0L; - private final ContentClaimManager claimManager; + private final ResourceClaimManager claimManager; - public WriteAheadRecordSerde(final ContentClaimManager claimManager) { + public WriteAheadRecordSerde(final ResourceClaimManager claimManager) { this.claimManager = claimManager; } @@ -518,7 +542,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - RepositoryRecord record = currentRecordStates.get(recordId); + final RepositoryRecord record = currentRecordStates.get(recordId); ffBuilder.id(recordId); if (record != null) { ffBuilder.fromFlowFile(record.getCurrent()); @@ -705,11 +729,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis out.write(0); } else { out.write(1); - writeString(claim.getId(), out); - writeString(claim.getContainer(), out); - writeString(claim.getSection(), out); + + final ResourceClaim resourceClaim = claim.getResourceClaim(); + writeString(resourceClaim.getId(), out); + writeString(resourceClaim.getContainer(), out); + writeString(resourceClaim.getSection(), out); + out.writeLong(claim.getOffset()); + out.writeLong(claim.getLength()); + out.writeLong(offset); - out.writeBoolean(claim.isLossTolerant()); + out.writeBoolean(resourceClaim.isLossTolerant()); } } @@ -726,6 +755,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis final String container = readString(in); final String section = readString(in); + + final long resourceOffset; + final long resourceLength; + if (serializationVersion < 7) { + resourceOffset = 0L; + resourceLength = -1L; + } else { + resourceOffset = in.readLong(); + resourceLength = in.readLong(); + } + final long claimOffset = in.readLong(); final boolean lossTolerant; @@ -735,8 +775,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis lossTolerant = false; } - final ContentClaim existingClaim = claimManager.newContentClaim(container, section, claimId, lossTolerant); - ffBuilder.contentClaim(existingClaim); + final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant); + final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); + contentClaim.setLength(resourceLength); + + ffBuilder.contentClaim(contentClaim); ffBuilder.contentClaimOffset(claimOffset); } else if (claimExists == -1) { throw new EOFException(); @@ -785,16 +828,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis throw new EOFException(); } if (firstValue == 0xff && secondValue == 0xff) { - int ch1 = in.read(); - int ch2 = in.read(); - int ch3 = in.read(); - int ch4 = in.read(); + final int ch1 = in.read(); + final int ch2 = in.read(); + final int ch3 = in.read(); + final int ch4 = in.read(); if ((ch1 | ch2 | ch3 | ch4) < 0) { throw new EOFException(); } - return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4)); + return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4; } else { - return ((firstValue << 8) + (secondValue)); + return (firstValue << 8) + secondValue; } } @@ -834,7 +877,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public int getVersion() { - return 6; + return 7; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java index a8a6963..ea047c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.controller.repository.claim; -import java.util.concurrent.atomic.AtomicInteger; /** * <p> @@ -28,115 +27,79 @@ import java.util.concurrent.atomic.AtomicInteger; */ public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> { - private final String id; - private final String container; - private final String section; - private final boolean lossTolerant; - private final AtomicInteger claimantCount = new AtomicInteger(0); - private final int hashCode; + private final ResourceClaim resourceClaim; + private final long offset; + private volatile long length; - StandardContentClaim(final String container, final String section, final String id, final boolean lossTolerant) { - this.container = container.intern(); - this.section = section.intern(); - this.id = id; - this.lossTolerant = lossTolerant; - - hashCode = (int) (17 + 19 * (id.hashCode()) + 19 * container.hashCode() + 19 * section.hashCode()); - } - - @Override - public boolean isLossTolerant() { - return lossTolerant; + public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) { + this.resourceClaim = resourceClaim; + this.offset = offset; + this.length = -1L; } - /** - * @return the unique identifier for this claim - */ - @Override - public String getId() { - return id; + public void setLength(final long length) { + this.length = length; } - /** - * @return the container identifier in which this claim is held - */ @Override - public String getContainer() { - return container; + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result; + result = prime * result + (int) (length ^ length >>> 32); + result = prime * result + (int) (offset ^ offset >>> 32); + result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode()); + return result; } - /** - * @return the section within a given container the claim is held - */ @Override - public String getSection() { - return section; - } - - int getClaimantCount() { - return claimantCount.get(); - } + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } - int decrementClaimantCount() { - return claimantCount.decrementAndGet(); - } + if (obj == null) { + return false; + } - int incrementClaimantCount() { - return claimantCount.incrementAndGet(); - } + if (!(obj instanceof ContentClaim)) { + return false; + } - /** - * Provides the natural ordering for ContentClaim objects. By default they are sorted by their id, then container, then section - * - * @param other other claim - * @return x such that x <=1 if this is less than other; - * x=0 if this.equals(other); - * x >= 1 if this is greater than other - */ - @Override - public int compareTo(final ContentClaim other) { - final int idComparison = id.compareTo(other.getId()); - if (idComparison != 0) { - return idComparison; + final ContentClaim other = (ContentClaim) obj; + if (length != other.getLength()) { + return false; } - final int containerComparison = container.compareTo(other.getContainer()); - if (containerComparison != 0) { - return containerComparison; + if (offset != other.getOffset()) { + return false; } - return section.compareTo(other.getSection()); + return resourceClaim.equals(other.getResourceClaim()); } @Override - public boolean equals(final Object other) { - if (this == other) { - return true; + public int compareTo(final ContentClaim o) { + final int resourceComp = resourceClaim.compareTo(o.getResourceClaim()); + if (resourceComp != 0) { + return resourceComp; } - if (other == null) { - return false; - } - if (hashCode != other.hashCode()) { - // We check hash code before instanceof because instanceof is fairly expensive and for - // StandardContentClaim, calling hashCode() simply returns a pre-calculated value. - return false; - } + return Long.compare(offset, o.getOffset()); + } - if (!(other instanceof ContentClaim)) { - return false; - } - final ContentClaim otherClaim = (ContentClaim) other; - return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection()); + @Override + public ResourceClaim getResourceClaim() { + return resourceClaim; } @Override - public int hashCode() { - return hashCode; + public long getOffset() { + return offset; } @Override - public String toString() { - return "ContentClaim[id=" + id + ", container=" + container + ", section=" + section + "]"; + public long getLength() { + return length; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java deleted file mode 100644 index b68f95e..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository.claim; - -import java.util.Collection; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class StandardContentClaimManager implements ContentClaimManager { - - private static final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); - private static final Logger logger = LoggerFactory.getLogger(StandardContentClaimManager.class); - - private static final BlockingQueue<ContentClaim> destructableClaims = new LinkedBlockingQueue<>(50000); - - @Override - public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) { - return new StandardContentClaim(container, section, id, lossTolerant); - } - - private static AtomicInteger getCounter(final ContentClaim claim) { - if (claim == null) { - return null; - } - - AtomicInteger counter = claimantCounts.get(claim); - if (counter != null) { - return counter; - } - - counter = new AtomicInteger(0); - AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter); - return (existingCounter == null) ? counter : existingCounter; - } - - @Override - public int getClaimantCount(final ContentClaim claim) { - if (claim == null) { - return 0; - } - final AtomicInteger counter = claimantCounts.get(claim); - return (counter == null) ? 0 : counter.get(); - } - - @Override - public int decrementClaimantCount(final ContentClaim claim) { - if (claim == null) { - return 0; - } - - final AtomicInteger counter = claimantCounts.get(claim); - if (counter == null) { - logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); - return -1; - } - - final int newClaimantCount = counter.decrementAndGet(); - logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount); - if (newClaimantCount == 0) { - claimantCounts.remove(claim); - } - return newClaimantCount; - } - - @Override - public int incrementClaimantCount(final ContentClaim claim) { - return incrementClaimantCount(claim, false); - } - - @Override - public int incrementClaimantCount(final ContentClaim claim, final boolean newClaim) { - final AtomicInteger counter = getCounter(claim); - - final int newClaimantCount = counter.incrementAndGet(); - logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount); - // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims. - if (!newClaim && newClaimantCount == 1) { - destructableClaims.remove(claim); - } - return newClaimantCount; - } - - @Override - public void markDestructable(final ContentClaim claim) { - if (claim == null) { - return; - } - - if (getClaimantCount(claim) > 0) { - return; - } - - logger.debug("Marking claim {} as destructable", claim); - try { - while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) { - } - } catch (final InterruptedException ie) { - } - } - - @Override - public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements) { - final int drainedCount = destructableClaims.drainTo(destination, maxElements); - logger.debug("Drained {} destructable claims to {}", drainedCount, destination); - } - - @Override - public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) { - try { - final ContentClaim firstClaim = destructableClaims.poll(timeout, unit); - if (firstClaim != null) { - destination.add(firstClaim); - destructableClaims.drainTo(destination, maxElements - 1); - } - } catch (final InterruptedException e) { - } - } - - @Override - public void purge() { - claimantCounts.clear(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java new file mode 100644 index 0000000..bd3ed5a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.claim; + +import java.util.concurrent.atomic.AtomicInteger; + +public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> { + private final String id; + private final String container; + private final String section; + private final boolean lossTolerant; + private final AtomicInteger claimantCount = new AtomicInteger(0); + private final int hashCode; + + public StandardResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { + this.container = container.intern(); + this.section = section.intern(); + this.id = id; + this.lossTolerant = lossTolerant; + + hashCode = 17 + 19 * id.hashCode() + 19 * container.hashCode() + 19 * section.hashCode(); + } + + @Override + public boolean isLossTolerant() { + return lossTolerant; + } + + /** + * @return the unique identifier for this claim + */ + @Override + public String getId() { + return id; + } + + /** + * @return the container identifier in which this claim is held + */ + @Override + public String getContainer() { + return container; + } + + /** + * @return the section within a given container the claim is held + */ + @Override + public String getSection() { + return section; + } + + int getClaimantCount() { + return claimantCount.get(); + } + + int decrementClaimantCount() { + return claimantCount.decrementAndGet(); + } + + int incrementClaimantCount() { + return claimantCount.incrementAndGet(); + } + + /** + * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section + * + * @param other other claim + * @return x such that x <=1 if this is less than other; + * x=0 if this.equals(other); + * x >= 1 if this is greater than other + */ + @Override + public int compareTo(final ResourceClaim other) { + final int idComparison = id.compareTo(other.getId()); + if (idComparison != 0) { + return idComparison; + } + + final int containerComparison = container.compareTo(other.getContainer()); + if (containerComparison != 0) { + return containerComparison; + } + + return section.compareTo(other.getSection()); + } + + @Override + public boolean equals(final Object other) { + if (this == other) { + return true; + } + + if (other == null) { + return false; + } + if (hashCode != other.hashCode()) { + // We check hash code before instanceof because instanceof is fairly expensive and for + // StandardResourceClaim, calling hashCode() simply returns a pre-calculated value. + return false; + } + + if (!(other instanceof ResourceClaim)) { + return false; + } + final ResourceClaim otherClaim = (ResourceClaim) other; + return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection()); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return "StandardResourceClaim[id=" + id + ", container=" + container + ", section=" + section + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java new file mode 100644 index 0000000..4826ac3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.claim; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StandardResourceClaimManager implements ResourceClaimManager { + + private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class); + + private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000); + + @Override + public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { + return new StandardResourceClaim(container, section, id, lossTolerant); + } + + private static AtomicInteger getCounter(final ResourceClaim claim) { + if (claim == null) { + return null; + } + + AtomicInteger counter = claimantCounts.get(claim); + if (counter != null) { + return counter; + } + + counter = new AtomicInteger(0); + final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter); + return existingCounter == null ? counter : existingCounter; + } + + @Override + public int getClaimantCount(final ResourceClaim claim) { + if (claim == null) { + return 0; + } + final AtomicInteger counter = claimantCounts.get(claim); + return counter == null ? 0 : counter.get(); + } + + @Override + public int decrementClaimantCount(final ResourceClaim claim) { + if (claim == null) { + return 0; + } + + final AtomicInteger counter = claimantCounts.get(claim); + if (counter == null) { + logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); + return -1; + } + + final int newClaimantCount = counter.decrementAndGet(); + logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount); + if (newClaimantCount == 0) { + claimantCounts.remove(claim); + } + return newClaimantCount; + } + + @Override + public int incrementClaimantCount(final ResourceClaim claim) { + return incrementClaimantCount(claim, false); + } + + @Override + public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) { + final AtomicInteger counter = getCounter(claim); + + final int newClaimantCount = counter.incrementAndGet(); + logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount); + // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims. + if (!newClaim && newClaimantCount == 1) { + destructableClaims.remove(claim); + } + return newClaimantCount; + } + + @Override + public void markDestructable(final ResourceClaim claim) { + if (claim == null) { + return; + } + + if (getClaimantCount(claim) > 0) { + return; + } + + logger.debug("Marking claim {} as destructable", claim); + try { + while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) { + } + } catch (final InterruptedException ie) { + } + } + + @Override + public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements) { + final int drainedCount = destructableClaims.drainTo(destination, maxElements); + logger.debug("Drained {} destructable claims to {}", drainedCount, destination); + } + + @Override + public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) { + try { + final ResourceClaim firstClaim = destructableClaims.poll(timeout, unit); + if (firstClaim != null) { + destination.add(firstClaim); + destructableClaims.drainTo(destination, maxElements - 1); + } + } catch (final InterruptedException e) { + } + } + + @Override + public void purge() { + claimantCounts.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 364dcad..a17bd40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -30,8 +30,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.junit.Test; import org.mockito.Mockito; @@ -48,7 +48,7 @@ public class TestFileSystemSwapManager { final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); - final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new NopContentClaimManager()); + final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new NopResourceClaimManager()); assertEquals(10000, records.size()); for (final FlowFileRecord record : records) { @@ -58,43 +58,43 @@ public class TestFileSystemSwapManager { } } - public class NopContentClaimManager implements ContentClaimManager { + public class NopResourceClaimManager implements ResourceClaimManager { @Override - public ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant) { + public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) { return null; } @Override - public int getClaimantCount(ContentClaim claim) { + public int getClaimantCount(ResourceClaim claim) { return 0; } @Override - public int decrementClaimantCount(ContentClaim claim) { + public int decrementClaimantCount(ResourceClaim claim) { return 0; } @Override - public int incrementClaimantCount(ContentClaim claim) { + public int incrementClaimantCount(ResourceClaim claim) { return 0; } @Override - public int incrementClaimantCount(ContentClaim claim, boolean newClaim) { + public int incrementClaimantCount(ResourceClaim claim, boolean newClaim) { return 0; } @Override - public void markDestructable(ContentClaim claim) { + public void markDestructable(ResourceClaim claim) { } @Override - public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements) { + public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements) { } @Override - public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit) { + public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit) { } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index ada0775..5ffcb3d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -16,11 +16,10 @@ */ package org.apache.nifi.controller.repository; -import org.apache.nifi.controller.repository.FileSystemRepository; -import org.apache.nifi.controller.repository.ContentNotFoundException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; @@ -39,10 +38,11 @@ import java.util.Arrays; import java.util.List; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.util.DiskUtils; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; - +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -53,20 +53,25 @@ public class TestFileSystemRepository { public static final File helloWorldFile = new File("src/test/resources/hello.txt"); private FileSystemRepository repository = null; + private final File rootFile = new File("target/content_repository"); @Before public void setup() throws IOException { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); - final File repo = new File("target/content_repository"); - if (repo.exists()) { - DiskUtils.deleteRecursively(repo); + if (rootFile.exists()) { + DiskUtils.deleteRecursively(rootFile); } repository = new FileSystemRepository(); - repository.initialize(new StandardContentClaimManager()); + repository.initialize(new StandardResourceClaimManager()); repository.purge(); } + @After + public void shutdown() throws IOException { + repository.shutdown(); + } + @Test public void testCreateContentClaim() throws IOException { // value passed to #create is irrelevant because the FileSystemRepository does not currently support loss tolerance. @@ -98,6 +103,79 @@ public class TestFileSystemRepository { } @Test + public void testResourceClaimReused() throws IOException { + final ContentClaim claim1 = repository.create(false); + final ContentClaim claim2 = repository.create(false); + + // should not be equal because claim1 may still be in use + assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim()); + + try (final OutputStream out = repository.write(claim1)) { + } + + final ContentClaim claim3 = repository.create(false); + assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim()); + } + + @Test + public void testResourceClaimNotReusedAfterRestart() throws IOException, InterruptedException { + final ContentClaim claim1 = repository.create(false); + try (final OutputStream out = repository.write(claim1)) { + } + + repository.shutdown(); + Thread.sleep(1000L); + + repository = new FileSystemRepository(); + repository.initialize(new StandardResourceClaimManager()); + repository.purge(); + + final ContentClaim claim2 = repository.create(false); + assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim()); + } + + + @Test + public void testWriteWithNoContent() throws IOException { + final ContentClaim claim1 = repository.create(false); + try (final OutputStream out = repository.write(claim1)) { + out.write("Hello".getBytes()); + } + + final ContentClaim claim2 = repository.create(false); + assertEquals(claim1.getResourceClaim(), claim2.getResourceClaim()); + try (final OutputStream out = repository.write(claim2)) { + + } + + final ContentClaim claim3 = repository.create(false); + assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim()); + try (final OutputStream out = repository.write(claim3)) { + out.write(" World".getBytes()); + } + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final InputStream in = repository.read(claim1)) { + StreamUtils.copy(in, baos); + } + + assertEquals("Hello", baos.toString()); + + baos.reset(); + try (final InputStream in = repository.read(claim2)) { + StreamUtils.copy(in, baos); + } + assertEquals("", baos.toString()); + assertEquals(0, baos.size()); + + baos.reset(); + try (final InputStream in = repository.read(claim3)) { + StreamUtils.copy(in, baos); + } + assertEquals(" World", baos.toString()); + } + + @Test public void testRemoveDeletesFileIfNoClaimants() throws IOException { final ContentClaim claim = repository.create(true); assertNotNull(claim); @@ -155,8 +233,16 @@ public class TestFileSystemRepository { final byte[] data = Files.readAllBytes(path); final byte[] expected = Files.readAllBytes(testFile.toPath()); assertTrue(Arrays.equals(expected, data)); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final InputStream in = repository.read(claim)) { + StreamUtils.copy(in, baos); + } + + assertTrue(Arrays.equals(expected, baos.toByteArray())); } + @Test public void testImportFromStream() throws IOException { final ContentClaim claim = repository.create(false); @@ -314,9 +400,9 @@ public class TestFileSystemRepository { } final ContentClaim destination = repository.create(true); - final byte[] headerBytes = (header == null) ? null : header.getBytes(); - final byte[] footerBytes = (footer == null) ? null : footer.getBytes(); - final byte[] demarcatorBytes = (demarcator == null) ? null : demarcator.getBytes(); + final byte[] headerBytes = header == null ? null : header.getBytes(); + final byte[] footerBytes = footer == null ? null : footer.getBytes(); + final byte[] demarcatorBytes = demarcator == null ? null : demarcator.getBytes(); repository.merge(claims, destination, headerBytes, footerBytes, demarcatorBytes); final StringBuilder sb = new StringBuilder(); @@ -334,8 +420,12 @@ public class TestFileSystemRepository { } final String expectedText = sb.toString(); final byte[] expected = expectedText.getBytes(); - final Path path = getPath(destination); - final byte[] actual = Files.readAllBytes(path); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) destination.getLength()); + try (final InputStream in = repository.read(destination)) { + StreamUtils.copy(in, baos); + } + final byte[] actual = baos.toByteArray(); assertTrue(Arrays.equals(expected, actual)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 3486875..b1fd4c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -51,8 +51,11 @@ import org.apache.nifi.controller.FlowFileQueue; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; -import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.Relationship; @@ -179,7 +182,7 @@ public class TestStandardProcessSession { when(connectable.getConnections()).thenReturn(new HashSet<>(connList)); contentRepo = new MockContentRepository(); - contentRepo.initialize(new StandardContentClaimManager()); + contentRepo.initialize(new StandardResourceClaimManager()); flowFileRepo = new MockFlowFileRepository(); final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo); @@ -194,10 +197,10 @@ public class TestStandardProcessSession { assertEquals(1, contentRepo.getExistingClaims().size()); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .contentClaim(claim) - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); FlowFile flowFile = session.get(); @@ -295,10 +298,10 @@ public class TestStandardProcessSession { public void testAppendAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .contentClaim(claim) - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); FlowFile flowFile = session.get(); assertNotNull(flowFile); @@ -316,12 +319,12 @@ public class TestStandardProcessSession { public void testReadAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .contentClaim(claim) - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); - FlowFile flowFile = session.get(); + final FlowFile flowFile = session.get(); assertNotNull(flowFile); final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null); session.read(flowFile, new InputStreamCallback() { @@ -337,10 +340,10 @@ public class TestStandardProcessSession { public void testStreamAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .contentClaim(claim) - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); FlowFile flowFile = session.get(); assertNotNull(flowFile); @@ -361,10 +364,10 @@ public class TestStandardProcessSession { public void testWriteAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .contentClaim(claim) - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); FlowFile flowFile = session.get(); assertNotNull(flowFile); @@ -382,9 +385,9 @@ public class TestStandardProcessSession { public void testCreateThenRollbackRemovesContent() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); final StreamCallback nop = new StreamCallback() { @@ -402,7 +405,7 @@ public class TestStandardProcessSession { session.write(flowFile2, nop); - FlowFile flowFile3 = session.create(); + final FlowFile flowFile3 = session.create(); session.write(flowFile3, nop); session.rollback(); @@ -412,14 +415,14 @@ public class TestStandardProcessSession { @Test public void testForksNotEmittedIfFilesDeleted() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); - FlowFile orig = session.get(); - FlowFile newFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); session.remove(newFlowFile); session.commit(); @@ -429,14 +432,14 @@ public class TestStandardProcessSession { @Test public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); - FlowFile orig = session.get(); - FlowFile newFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); @@ -446,15 +449,15 @@ public class TestStandardProcessSession { @Test public void testProvenanceEventsEmittedForRemove() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); - FlowFile orig = session.get(); - FlowFile newFlowFile = session.create(orig); - FlowFile secondNewFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); + final FlowFile secondNewFlowFile = session.create(orig); session.remove(newFlowFile); session.transfer(secondNewFlowFile, new Relationship.Builder().name("A").build()); session.commit(); @@ -465,16 +468,16 @@ public class TestStandardProcessSession { @Test public void testUpdateAttributesThenJoin() throws IOException { final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") - .entryDate(System.currentTimeMillis()) - .build(); + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .entryDate(System.currentTimeMillis()) + .build(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .id(2L) - .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") - .entryDate(System.currentTimeMillis()) - .build(); + .id(2L) + .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord1); flowFileQueue.put(flowFileRecord2); @@ -538,17 +541,17 @@ public class TestStandardProcessSession { @Test public void testForkOneToOneReported() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); flowFileQueue.put(flowFileRecord); // we have to increment the ID generator because we are creating a FlowFile without the FlowFile Repository's knowledge flowFileRepo.idGenerator.getAndIncrement(); - FlowFile orig = session.get(); - FlowFile newFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.getProvenanceReporter().fork(newFlowFile, Collections.singleton(orig)); session.remove(orig); @@ -566,7 +569,7 @@ public class TestStandardProcessSession { @Test public void testProcessExceptionThrownIfCallbackThrowsInOutputStreamCallback() { - FlowFile ff1 = session.create(); + final FlowFile ff1 = session.create(); final RuntimeException runtime = new RuntimeException(); try { @@ -610,7 +613,7 @@ public class TestStandardProcessSession { @Test public void testProcessExceptionThrownIfCallbackThrowsInStreamCallback() { - FlowFile ff1 = session.create(); + final FlowFile ff1 = session.create(); final RuntimeException runtime = new RuntimeException(); try { @@ -655,41 +658,16 @@ public class TestStandardProcessSession { @Test public void testMissingFlowFileExceptionThrownWhenUnableToReadData() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "x"; - } - - @Override - public String getSection() { - return "x"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .size(1L) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .size(1L) + .build(); flowFileQueue.put(flowFileRecord); // attempt to read the data. try { - FlowFile ff1 = session.get(); + final FlowFile ff1 = session.get(); session.read(ff1, new InputStreamCallback() { @Override @@ -704,41 +682,16 @@ public class TestStandardProcessSession { @Test public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "x"; - } - - @Override - public String getSection() { - return "x"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .size(1L) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .size(1L) + .build(); flowFileQueue.put(flowFileRecord); // attempt to read the data. try { - FlowFile ff1 = session.get(); + final FlowFile ff1 = session.get(); session.write(ff1, new StreamCallback() { @Override @@ -753,35 +706,10 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -794,43 +722,18 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .contentClaimOffset(1000L) - .size(1000L) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaimOffset(1000L) + .size(1000L) + .build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. try { session.get(); - FlowFile ff2 = session.get(); + final FlowFile ff2 = session.get(); session.write(ff2, new StreamCallback() { @Override public void process(InputStream in, OutputStream out) throws IOException { @@ -844,34 +747,11 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .build(); - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }).build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -884,41 +764,17 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) - @Override - public boolean isLossTolerant() { - return true; - } - }) - .contentClaimOffset(1000L).size(1L).build(); + .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. try { session.get(); - FlowFile ff2 = session.get(); + final FlowFile ff2 = session.get(); session.read(ff2, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { @@ -931,7 +787,7 @@ public class TestStandardProcessSession { @Test public void testProcessExceptionThrownIfCallbackThrowsInInputStreamCallback() { - FlowFile ff1 = session.create(); + final FlowFile ff1 = session.create(); final RuntimeException runtime = new RuntimeException(); try { @@ -975,7 +831,7 @@ public class TestStandardProcessSession { @Test public void testCreateEmitted() throws IOException { - FlowFile newFlowFile = session.create(); + final FlowFile newFlowFile = session.create(); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); @@ -1009,9 +865,9 @@ public class TestStandardProcessSession { @Test public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); FlowFile existingFlowFile = session.get(); @@ -1035,9 +891,9 @@ public class TestStandardProcessSession { @Test public void testAttributesModifiedEmitted() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); FlowFile existingFlowFile = session.get(); @@ -1104,7 +960,7 @@ public class TestStandardProcessSession { } @Override - public void initialize(ContentClaimManager claimManager) throws IOException { + public void initialize(ResourceClaimManager claimManager) throws IOException { } } @@ -1112,9 +968,9 @@ public class TestStandardProcessSession { private final AtomicLong idGenerator = new AtomicLong(0L); private final AtomicLong claimsRemoved = new AtomicLong(0L); - private ContentClaimManager claimManager; + private ResourceClaimManager claimManager; - private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); + private final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); @Override public void shutdown() { @@ -1124,9 +980,10 @@ public class TestStandardProcessSession { final Set<ContentClaim> claims = new HashSet<>(); for (long i = 0; i < idGenerator.get(); i++) { - final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(i), false); - if (getClaimantCount(claim) > 0) { - claims.add(claim); + final ResourceClaim resourceClaim = new StandardResourceClaim("container", "section", String.valueOf(i), false); + final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); + if (getClaimantCount(contentClaim) > 0) { + claims.add(contentClaim); } } @@ -1135,15 +992,17 @@ public class TestStandardProcessSession { @Override public ContentClaim create(boolean lossTolerant) throws IOException { - final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false); - claimantCounts.put(claim, new AtomicInteger(1)); - final Path path = getPath(claim); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false); + final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); + + claimantCounts.put(contentClaim, new AtomicInteger(1)); + final Path path = getPath(contentClaim); final Path parent = path.getParent(); if (Files.exists(parent) == false) { Files.createDirectories(parent); } - Files.createFile(getPath(claim)); - return claim; + Files.createFile(getPath(contentClaim)); + return contentClaim; } @Override @@ -1219,7 +1078,8 @@ public class TestStandardProcessSession { return 0; } - private Path getPath(final ContentClaim claim) { + private Path getPath(final ContentClaim contentClaim) { + final ResourceClaim claim = contentClaim.getResourceClaim(); return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId()); } @@ -1315,7 +1175,7 @@ public class TestStandardProcessSession { } @Override - public void initialize(ContentClaimManager claimManager) throws IOException { + public void initialize(ResourceClaimManager claimManager) throws IOException { this.claimManager = claimManager; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java index a32f321..5733164 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.VolatileContentRepository; + import static org.junit.Assert.assertEquals; import java.io.ByteArrayOutputStream; @@ -29,10 +30,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; -import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.util.NiFiProperties; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,11 +43,11 @@ import org.mockito.Mockito; public class TestVolatileContentRepository { - private ContentClaimManager claimManager; + private ResourceClaimManager claimManager; @Before public void setup() { - claimManager = new StandardContentClaimManager(); + claimManager = new StandardResourceClaimManager(); } @Test @@ -81,8 +83,9 @@ public class TestVolatileContentRepository { final ContentRepository mockRepo = Mockito.mock(ContentRepository.class); contentRepo.setBackupRepository(mockRepo); - final ContentClaim newClaim = claimManager.newContentClaim("container", "section", "1000", true); - Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(newClaim); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true); + final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); + Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim); final ByteArrayOutputStream overflowStream = new ByteArrayOutputStream(); Mockito.when(mockRepo.write(Matchers.any(ContentClaim.class))).thenReturn(overflowStream); http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 054ef5e..2138928 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -34,7 +34,7 @@ import java.util.List; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.util.file.FileUtils; import org.junit.Test; @@ -53,7 +53,7 @@ public class TestWriteAheadFlowFileRepository { } final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(); - repo.initialize(new StandardContentClaimManager()); + repo.initialize(new StandardResourceClaimManager()); final List<Connection> connectionList = new ArrayList<>(); final QueueProvider queueProvider = new QueueProvider() { @@ -119,7 +119,7 @@ public class TestWriteAheadFlowFileRepository { // restore final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(); - repo2.initialize(new StandardContentClaimManager()); + repo2.initialize(new StandardResourceClaimManager()); repo2.loadFlowFiles(queueProvider, 0L); assertEquals(1, flowFileCollection.size());
