This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7e1d9be9cab NIFI-15732 - Revert NIFI-15570 - Partial defragmentation
of Content Repository via tail-claim truncation (#11024)
7e1d9be9cab is described below
commit 7e1d9be9cab0421d143016bb5f714cfa11ef4536
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Mar 19 16:15:33 2026 +0100
NIFI-15732 - Revert NIFI-15570 - Partial defragmentation of Content
Repository via tail-claim truncation (#11024)
---
.../controller/repository/claim/ContentClaim.java | 8 -
.../repository/claim/ResourceClaimManager.java | 21 --
.../repository/FileSystemRepository.java | 227 +---------------
.../repository/WriteAheadFlowFileRepository.java | 137 +++-------
.../nifi/controller/TestFileSystemSwapManager.java | 9 -
.../repository/TestFileSystemRepository.java | 287 ++-------------------
.../TestWriteAheadFlowFileRepository.java | 283 --------------------
.../repository/claim/StandardContentClaim.java | 10 -
.../claim/StandardResourceClaimManager.java | 31 ---
.../claim/TestStandardResourceClaimManager.java | 51 ----
.../repository/ByteArrayContentRepository.java | 5 -
.../tests/system/GenerateTruncatableFlowFiles.java | 115 ---------
.../services/org.apache.nifi.processor.Processor | 1 -
.../ContentClaimTruncationAfterRestartIT.java | 163 ------------
.../repositories/ContentClaimTruncationIT.java | 153 -----------
15 files changed, 59 insertions(+), 1442 deletions(-)
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 54745f9d28f..5c1d76bebbe 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -44,12 +44,4 @@ public interface ContentClaim extends
Comparable<ContentClaim> {
* @return the length of this ContentClaim
*/
long getLength();
-
- /**
- * Indicates whether or not this ContentClaim is a candidate for
truncation.
- * @return true if this ContentClaim is a candidate for truncation, false
otherwise
- */
- default boolean isTruncationCandidate() {
- return false;
- }
}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index 4a54d371a1f..4c68383d86b 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -112,17 +112,6 @@ public interface ResourceClaimManager {
*/
void markDestructable(ResourceClaim claim);
- /**
- * Indicates that the Resource Claim associated with the given Content
Claim can now be
- * truncated to the start of the ContentClaim. This should only ever be
called after it is
- * guaranteed that the FlowFile Repository has been synchronized with its
underlying
- * storage component for the same reason as described in the {@link
#markDestructable(ResourceClaim)}
- * method.
- *
- * @param claim the ContentClaim that should be used for truncation
- */
- void markTruncatable(ContentClaim claim);
-
/**
* Drains up to {@code maxElements} Content Claims from the internal queue
* of destructable content claims to the given {@code destination} so that
@@ -149,16 +138,6 @@ public interface ResourceClaimManager {
*/
void drainDestructableClaims(Collection<ResourceClaim> destination, int
maxElements, long timeout, TimeUnit unit);
- /**
- * Drains up to {@code maxElements} Content Claims from the internal queue
- * of truncatable content claims to the given {@code destination} so that
- * they can be truncated.
- *
- * @param destination to drain to
- * @param maxElements max items to drain
- */
- void drainTruncatableClaims(Collection<ContentClaim> destination, int
maxElements);
-
/**
* Clears the manager's memory of any and all ResourceClaims that it knows
* about
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 8987b6464f2..15ec9e78cdd 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -47,10 +47,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.channels.FileChannel;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
@@ -101,12 +99,9 @@ public class FileSystemRepository implements
ContentRepository {
private final List<String> containerNames;
private final AtomicLong index;
- // Executor handles: BinDestructableClaims, one
ArchiveOrDestroyDestructableClaims per content repository container,
- // TruncateClaims, and archive directory scanning tasks submitted during
initialization.
- private final ScheduledExecutorService executor = new FlowEngine(6,
"FileSystemRepository Workers", true);
+ private final ScheduledExecutorService executor = new FlowEngine(4,
"FileSystemRepository Workers", true);
private final ConcurrentMap<String, BlockingQueue<ResourceClaim>>
reclaimable = new ConcurrentHashMap<>();
private final Map<String, ContainerState> containerStateMap = new
HashMap<>();
- private final TruncationClaimManager truncationClaimManager = new
TruncationClaimManager();
// Queue for claims that are kept open for writing. Ideally, this will be
at
// least as large as the number of threads that will be updating the
repository simultaneously but we don't want
@@ -175,13 +170,12 @@ public class FileSystemRepository implements
ContentRepository {
archiveData = true;
if (maxArchiveSize == null) {
- throw new RuntimeException("No value specified for property
'%s' but archiving is enabled. You must configure the max disk usage in order
to enable archiving.".formatted(
- NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE));
+ throw new RuntimeException("No value specified for property '"
+ +
NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is
enabled. You must configure the max disk usage in order to enable archiving.");
}
if
(!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
- throw new RuntimeException("Invalid value specified for the
'%s' property. Value must be in format: <XX>%%".formatted(
- NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE));
+ throw new RuntimeException("Invalid value specified for the '"
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' property. Value must
be in format: <XX>%");
}
} else if ("false".equalsIgnoreCase(enableArchiving)) {
archiveData = false;
@@ -244,15 +238,14 @@ public class FileSystemRepository implements
ContentRepository {
this.resourceClaimManager = context.getResourceClaimManager();
this.eventReporter = context.getEventReporter();
- final Map<String, Path> fileRepositoryPaths =
nifiProperties.getContentRepositoryPaths();
+ final Map<String, Path> fileRespositoryPaths =
nifiProperties.getContentRepositoryPaths();
executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1,
TimeUnit.SECONDS);
- for (int i = 0; i < fileRepositoryPaths.size(); i++) {
+ for (int i = 0; i < fileRespositoryPaths.size(); i++) {
executor.scheduleWithFixedDelay(new
ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS);
}
final long cleanupMillis =
this.determineCleanupInterval(nifiProperties);
- executor.scheduleWithFixedDelay(new TruncateClaims(), cleanupMillis,
cleanupMillis, TimeUnit.MILLISECONDS);
for (final Map.Entry<String, Path> containerEntry :
containers.entrySet()) {
final String containerName = containerEntry.getKey();
@@ -696,16 +689,7 @@ public class FileSystemRepository implements
ContentRepository {
@Override
public int incrementClaimaintCount(final ContentClaim claim) {
- if (claim == null) {
- return 0;
- }
-
- if (claim.isTruncationCandidate() && claim instanceof final
StandardContentClaim scc) {
- LOG.debug("{} is a truncation candidate, but is being claimed
again. Setting truncation candidate to false", claim);
- scc.setTruncationCandidate(false);
- }
-
- return incrementClaimantCount(claim.getResourceClaim(), false);
+ return incrementClaimantCount(claim == null ? null :
claim.getResourceClaim(), false);
}
protected int incrementClaimantCount(final ResourceClaim resourceClaim,
final boolean newClaim) {
@@ -757,7 +741,6 @@ public class FileSystemRepository implements
ContentRepository {
}
}
- truncationClaimManager.removeTruncationClaims(claim);
return true;
}
@@ -1049,122 +1032,6 @@ public class FileSystemRepository implements
ContentRepository {
resourceClaimManager.purge();
}
- private class TruncateClaims implements Runnable {
-
- @Override
- public void run() {
- final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
-
- // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
- for (final String container : containerNames) {
- if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
- final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
- if (toTruncate.isEmpty()) {
- continue;
- }
-
- truncateClaims(toTruncate, truncationActivationCache);
- }
- }
-
- // Drain any Truncation Claims from the Resource Claim Manager.
- // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
- // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
- // but then we later start to run out of disk space and lost the
opportunity to truncate that big claim.
- // Loop to drain the entire queue in a single invocation rather
than waiting for the next scheduled run. Because the default
- // interval is 1 minute, waiting for the next run could delay
truncation on a disk that is already under pressure and increases
- // the risk of having too many claims that the queue overflows (in
which case we would lose some optimization).
- while (true) {
- final List<ContentClaim> toTruncate = new ArrayList<>();
- resourceClaimManager.drainTruncatableClaims(toTruncate,
10_000);
- if (toTruncate.isEmpty()) {
- return;
- }
-
- truncateClaims(toTruncate, truncationActivationCache);
- }
- }
-
- private void truncateClaims(final List<ContentClaim> toTruncate, final
Map<String, Boolean> truncationActivationCache) {
- final Map<String, List<ContentClaim>> claimsSkipped = new
HashMap<>();
-
- for (final ContentClaim claim : toTruncate) {
- final String container =
claim.getResourceClaim().getContainer();
- if (!isTruncationActiveForContainer(container,
truncationActivationCache)) {
- LOG.debug("Will not truncate {} because truncation is not
active for container {}; will save for later truncation", claim, container);
- claimsSkipped.computeIfAbsent(container, key -> new
ArrayList<>()).add(claim);
- continue;
- }
-
- if (claim.isTruncationCandidate()) {
- truncate(claim);
- }
- }
-
- claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
- }
-
- private boolean isTruncationActiveForContainer(final String container,
final Map<String, Boolean> activationCache) {
- // If not archiving data, we consider truncation always active.
- if (!archiveData) {
- return true;
- }
-
- final Boolean cachedValue = activationCache.get(container);
- if (cachedValue != null) {
- return cachedValue;
- }
-
- if (!isArchiveClearedOnLastRun(container)) {
- LOG.debug("Truncation is not active for container {} because
the archive was not cleared on the last run", container);
- activationCache.put(container, false);
- return false;
- }
-
- final long usableSpace;
- try {
- usableSpace = getContainerUsableSpace(container);
- } catch (final IOException ioe) {
- LOG.warn("Failed to determine usable space for container {}.
Will not truncate claims for this container", container, ioe);
- return false;
- }
-
- final Long minUsableSpace =
minUsableContainerBytesForArchive.get(container);
- if (minUsableSpace != null && usableSpace < minUsableSpace) {
- LOG.debug("Truncate is active for Container {} because usable
space of {} bytes is below the desired threshold of {} bytes.",
- container, usableSpace, minUsableSpace);
-
- activationCache.put(container, true);
- return true;
- }
-
- activationCache.put(container, false);
- return false;
- }
-
- private void truncate(final ContentClaim claim) {
- LOG.info("Truncating {} to {} bytes because the FlowFile occupying
the last {} bytes has been removed",
- claim.getResourceClaim(), claim.getOffset(),
claim.getLength());
-
- final Path path = getPath(claim);
- if (path == null) {
- LOG.warn("Cannot truncate {} because the file cannot be
found", claim);
- return;
- }
-
- try (final FileChannel fileChannel = FileChannel.open(path,
StandardOpenOption.WRITE)) {
- fileChannel.truncate(claim.getOffset());
- } catch (final NoSuchFileException nsfe) {
- // This is unlikely but can occur if the claim was truncatable
and the underlying Resource Claim becomes
- // destructable. In this case, we may archive or delete the
entire ResourceClaim. This is safe to ignore,
- // since it means the data is cleaned up anyway.
- LOG.debug("Failed to truncate {} because file [{}] does not
exist", claim, path, nsfe);
- } catch (final IOException e) {
- LOG.warn("Failed to truncate {} to {} bytes", claim,
claim.getOffset(), e);
- }
- }
- }
-
private class BinDestructableClaims implements Runnable {
@Override
@@ -1253,11 +1120,6 @@ public class FileSystemRepository implements
ContentRepository {
final boolean archived = archive(curPath);
LOG.debug("Successfully moved {} to archive", claim);
-
- if (archived) {
- truncationClaimManager.removeTruncationClaims(claim);
- }
-
return archived;
}
@@ -1530,7 +1392,7 @@ public class FileSystemRepository implements
ContentRepository {
if (notYetExceedingThreshold.isEmpty()) {
oldestContainerArchive = System.currentTimeMillis();
} else {
- oldestContainerArchive =
notYetExceedingThreshold.getFirst().getLastModTime();
+ oldestContainerArchive =
notYetExceedingThreshold.get(0).getLastModTime();
}
// Queue up the files in the order that they should be destroyed so
that we don't have to scan the directories for a while.
@@ -1538,11 +1400,10 @@ public class FileSystemRepository implements
ContentRepository {
fileQueue.offer(toEnqueue);
}
-
containerState.setArchiveClearedOnLastRun(notYetExceedingThreshold.isEmpty());
-
final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS)
- deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis;
LOG.debug("Oldest Archive Date for Container {} is {}; delete expired
= {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms",
containerName, new Date(oldestContainerArchive),
deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
+ return;
}
private class ArchiveOrDestroyDestructableClaims implements Runnable {
@@ -1682,7 +1543,6 @@ public class FileSystemRepository implements
ContentRepository {
private volatile long bytesUsed = 0L;
private volatile long checkUsedCutoffTimestamp = 0L;
- private volatile boolean archiveClearedOnLastRun = false;
public ContainerState(final String containerName, final boolean
archiveEnabled, final long backPressureBytes, final long capacity) {
this.containerName = containerName;
@@ -1801,24 +1661,6 @@ public class FileSystemRepository implements
ContentRepository {
public void decrementArchiveCount() {
archivedFileCount.decrementAndGet();
}
-
- public void setArchiveClearedOnLastRun(final boolean
archiveClearedOnLastRun) {
- this.archiveClearedOnLastRun = archiveClearedOnLastRun;
- }
-
- public boolean isArchiveClearedOnLastRun() {
- return archiveClearedOnLastRun;
- }
- }
-
- // Visible for testing
- protected boolean isArchiveClearedOnLastRun(final String containerName) {
- final ContainerState containerState =
containerStateMap.get(containerName);
- if (containerState == null) {
- return false;
- }
-
- return containerState.isArchiveClearedOnLastRun();
}
protected static class ClaimLengthPair {
@@ -2040,27 +1882,19 @@ public class FileSystemRepository implements
ContentRepository {
// Mark the claim as no longer being able to be written to
resourceClaimManager.freeze(scc.getResourceClaim());
- // If the content claim length is large (> 1 MB or the max
appendable claim length),
- // mark the claim as a truncation candidate
- final boolean largeClaim = scc.getLength() >
Math.min(1_000_000, maxAppendableClaimLength);
- final boolean nonStartClaim = scc.getOffset() > 0;
- if (largeClaim && nonStartClaim) {
- scc.setTruncationCandidate(true);
- }
-
// ensure that the claim is no longer on the queue
writableClaimQueue.remove(new
ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
- LOG.debug("Claim length >= max; Closing {}", this);
+ bcos.close();
+ LOG.debug("Claim lenth >= max; Closing {}", this);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace: ", new RuntimeException("Stack
Trace for closing " + this));
}
- bcos.close();
}
}
@Override
- public synchronized ContentClaim newContentClaim() {
+ public synchronized ContentClaim newContentClaim() throws IOException {
scc = new StandardContentClaim(scc.getResourceClaim(),
scc.getOffset() + Math.max(0, scc.getLength()));
initialLength = 0;
bytesWritten = 0L;
@@ -2069,41 +1903,4 @@ public class FileSystemRepository implements
ContentRepository {
}
}
- private static class TruncationClaimManager {
- private static final int MAX_THRESHOLD = 100_000;
- private final Map<String, List<ContentClaim>> truncationClaims = new
HashMap<>();
-
- synchronized void addTruncationClaims(final String container, final
List<ContentClaim> claim) {
- final List<ContentClaim> contentClaims =
truncationClaims.computeIfAbsent(container, c -> new ArrayList<>());
- contentClaims.addAll(claim);
-
- // If we have too many claims, remove the smallest ones so that we
only have the largest MAX_THRESHOLD claims.
- if (contentClaims.size() > MAX_THRESHOLD) {
-
contentClaims.sort(Comparator.comparingLong(ContentClaim::getLength).reversed());
- final List<ContentClaim> discardableClaims =
contentClaims.subList(MAX_THRESHOLD, contentClaims.size());
- LOG.debug("Truncation Claim Manager has more than {} claims
for container {}; discarding {} claims: {}",
- MAX_THRESHOLD, container, discardableClaims.size(),
discardableClaims);
- discardableClaims.clear();
- }
- }
-
- synchronized List<ContentClaim> removeTruncationClaims(final String
container) {
- final List<ContentClaim> removed =
truncationClaims.remove(container);
- return removed == null ? Collections.emptyList() : removed;
- }
-
- synchronized List<ContentClaim> removeTruncationClaims(final
ResourceClaim resourceClaim) {
- final List<ContentClaim> contentClaims =
truncationClaims.get(resourceClaim.getContainer());
- if (contentClaims == null) {
- return Collections.emptyList();
- }
-
- final List<ContentClaim> claimsToRemove = contentClaims.stream()
- .filter(cc -> cc.getResourceClaim().equals(resourceClaim))
- .toList();
-
- contentClaims.removeAll(claimsToRemove);
- return claimsToRemove;
- }
- }
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index b886c062b2e..a04b7527917 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -21,9 +21,7 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.repository.schema.FieldCache;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -100,7 +98,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
private final List<File> flowFileRepositoryPaths = new ArrayList<>();
private final ScheduledExecutorService checkpointExecutor;
private final int maxCharactersToCache;
- private final long truncationThreshold;
private volatile Collection<SerializedRepositoryRecord> recoveredRecords =
null;
private final Set<ResourceClaim> orphanedResourceClaims =
Collections.synchronizedSet(new HashSet<>());
@@ -135,7 +132,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// before the data is destroyed, it's okay because the data will be
unknown to the Content Repository, so it will be destroyed
// on restart.
private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>>
claimsAwaitingDestruction = new ConcurrentHashMap<>();
- private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>>
claimsAwaitingTruncation = new ConcurrentHashMap<>();
/**
* default no args constructor for service loading only.
@@ -147,7 +143,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
nifiProperties = null;
retainOrphanedFlowFiles = true;
maxCharactersToCache = 0;
- truncationThreshold = Long.MAX_VALUE;
}
public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
@@ -158,10 +153,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
retainOrphanedFlowFiles = orphanedFlowFileProperty == null ||
Boolean.parseBoolean(orphanedFlowFileProperty);
this.maxCharactersToCache =
nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
- final long maxAppendableClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
- // Cap the truncation threshold at 1 MB so that claims larger than 1
MB are always eligible
- // for truncation regardless of how large maxAppendableClaimSize is
configured.
- truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);
final String directoryName =
nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
flowFileRepositoryPaths.add(new File(directoryName));
@@ -454,13 +445,12 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// The below code is not entirely thread-safe, but we are OK with that
because the results aren't really harmful.
// Specifically, if two different threads call updateRepository with
DELETE records for the same Content Claim,
// it's quite possible for claimant count to be 0 below, which results
in two different threads adding the Content
- // Claim to the 'claimsAwaitingDestruction' map. As a result, we can
call #markDestructable with the same ContentClaim
+ // Claim to the 'claimsAwaitDestruction' map. As a result, we can call
#markDestructable with the same ContentClaim
// multiple times, and the #markDestructable method is not necessarily
idempotent.
// However, the result of this is that the FileSystem Repository may
end up trying to remove the content multiple times.
// This does not, however, cause problems, as ContentRepository should
handle this
// This does indicate that some refactoring should probably be
performed, though, as this is not a very clean interface.
- final Set<ResourceClaim> destructableClaims = new HashSet<>();
- final Set<ContentClaim> truncatableClaims = new HashSet<>();
+ final Set<ResourceClaim> claimsToAdd = new HashSet<>();
final Set<String> swapLocationsAdded = new HashSet<>();
final Set<String> swapLocationsRemoved = new HashSet<>();
@@ -468,34 +458,20 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
for (final RepositoryRecord record : repositoryRecords) {
updateClaimCounts(record);
- final ContentClaim contentClaim = record.getCurrentClaim();
- final boolean truncationCandidate = contentClaim != null &&
contentClaim.isTruncationCandidate();
- final boolean claimChanged =
!Objects.equals(record.getOriginalClaim(), contentClaim);
if (record.getType() == RepositoryRecordType.DELETE) {
- // For any DELETE record that we have, if claim is
destructible or truncatable, mark it so
- if (isDestructable(contentClaim)) {
- destructableClaims.add(contentClaim.getResourceClaim());
- } else if (truncationCandidate) {
- truncatableClaims.add(contentClaim);
+ // For any DELETE record that we have, if claim is
destructible, mark it so
+ if (record.getCurrentClaim() != null &&
isDestructable(record.getCurrentClaim())) {
+
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
}
- // If the original claim is different than the current claim
and the original claim is destructible
- // or truncatable, mark it so
- if (claimChanged) {
- if (isDestructable(record.getOriginalClaim())) {
-
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
- } else if (record.getOriginalClaim() != null &&
record.getOriginalClaim().isTruncationCandidate()) {
- truncatableClaims.add(record.getOriginalClaim());
- }
+ // If the original claim is different than the current claim
and the original claim is destructible, mark it so
+ if (record.getOriginalClaim() != null &&
!record.getOriginalClaim().equals(record.getCurrentClaim()) &&
isDestructable(record.getOriginalClaim())) {
+
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
}
} else if (record.getType() == RepositoryRecordType.UPDATE) {
// if we have an update, and the original is no longer needed,
mark original as destructible
- if (claimChanged) {
- if (isDestructable(record.getOriginalClaim())) {
-
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
- } else if (record.getOriginalClaim() != null &&
record.getOriginalClaim().isTruncationCandidate()) {
- truncatableClaims.add(record.getOriginalClaim());
- }
+ if (record.getOriginalClaim() != null &&
record.getCurrentClaim() != record.getOriginalClaim() &&
isDestructable(record.getOriginalClaim())) {
+
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
}
} else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
final String swapLocation = record.getSwapLocation();
@@ -508,16 +484,13 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
}
- // Once the content claim counts have been updated for all records,
collect any transient
- // claims that are eligible for destruction or truncation
+ // Once the content claim counts have been updated for all records,
collect any transient claims that are eligible for destruction
for (final RepositoryRecord record : repositoryRecords) {
final List<ContentClaim> transientClaims =
record.getTransientClaims();
if (transientClaims != null) {
for (final ContentClaim transientClaim : transientClaims) {
if (isDestructable(transientClaim)) {
-
destructableClaims.add(transientClaim.getResourceClaim());
- } else if (transientClaim.isTruncationCandidate()) {
- truncatableClaims.add(transientClaim);
+ claimsToAdd.add(transientClaim.getResourceClaim());
}
}
}
@@ -531,15 +504,19 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
}
- if (!destructableClaims.isEmpty()) {
- // Get / Register a Set<ResourceClaim> for the given Partition
Index
- final BlockingQueue<ResourceClaim> claimQueue =
claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new
LinkedBlockingQueue<>());
- claimQueue.addAll(destructableClaims);
- }
+ if (!claimsToAdd.isEmpty()) {
+ // Get / Register a Set<ContentClaim> for the given Partition Index
+ final Integer partitionKey = Integer.valueOf(partitionIndex);
+ BlockingQueue<ResourceClaim> claimQueue =
claimsAwaitingDestruction.get(partitionKey);
+ if (claimQueue == null) {
+ claimQueue = new LinkedBlockingQueue<>();
+ final BlockingQueue<ResourceClaim> existingClaimQueue =
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
+ if (existingClaimQueue != null) {
+ claimQueue = existingClaimQueue;
+ }
+ }
- if (!truncatableClaims.isEmpty()) {
- final BlockingQueue<ContentClaim> claimQueue =
claimsAwaitingTruncation.computeIfAbsent(partitionIndex, key -> new
LinkedBlockingQueue<>());
- claimQueue.addAll(truncatableClaims);
+ claimQueue.addAll(claimsToAdd);
}
}
@@ -589,24 +566,16 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
@Override
public void onSync(final int partitionIndex) {
- final BlockingQueue<ResourceClaim> destructionClaimQueue =
claimsAwaitingDestruction.get(partitionIndex);
- if (destructionClaimQueue != null) {
- final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
- destructionClaimQueue.drainTo(claimsToDestroy);
-
- for (final ResourceClaim claim : claimsToDestroy) {
- markDestructable(claim);
- }
+ final BlockingQueue<ResourceClaim> claimQueue =
claimsAwaitingDestruction.get(partitionIndex);
+ if (claimQueue == null) {
+ return;
}
- final BlockingQueue<ContentClaim> truncationClaimQueue =
claimsAwaitingTruncation.get(partitionIndex);
- if (truncationClaimQueue != null) {
- final Set<ContentClaim> claimsToTruncate = new HashSet<>();
- truncationClaimQueue.drainTo(claimsToTruncate);
+ final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
+ claimQueue.drainTo(claimsToDestroy);
- for (final ContentClaim claim : claimsToTruncate) {
- claimManager.markTruncatable(claim);
- }
+ for (final ResourceClaim claim : claimsToDestroy) {
+ markDestructable(claim);
}
}
@@ -620,15 +589,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
markDestructable(claim);
}
}
-
- for (final BlockingQueue<ContentClaim> claimQueue :
claimsAwaitingTruncation.values()) {
- final Set<ContentClaim> claimsToTruncate = new HashSet<>();
- claimQueue.drainTo(claimsToTruncate);
-
- for (final ContentClaim claim : claimsToTruncate) {
- claimManager.markTruncatable(claim);
- }
- }
}
/**
@@ -763,10 +723,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
queueMap.put(queue.getIdentifier(), queue);
}
- final Set<StandardContentClaim> truncationEligibleClaims = new
HashSet<>();
- final Set<ContentClaim> forbiddenTruncationClaims = new HashSet<>();
- final Map<ResourceClaim, ContentClaim>
latestContentClaimByResourceClaim = new HashMap<>();
-
final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
int numFlowFilesMissingQueue = 0;
long maxId = 0;
@@ -792,15 +748,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
final ContentClaim claim = record.getContentClaim();
-
- // Track the latest Content Claim for each Resource Claim so that
we can determine which claims are eligible for truncation.
- if (claim != null) {
- final ContentClaim latestContentClaim =
latestContentClaimByResourceClaim.get(claim.getResourceClaim());
- if (latestContentClaim == null || claim.getOffset() >
latestContentClaim.getOffset()) {
-
latestContentClaimByResourceClaim.put(claim.getResourceClaim(), claim);
- }
- }
-
final FlowFileQueue flowFileQueue = queueMap.get(queueId);
final boolean orphaned = flowFileQueue == null;
if (orphaned) {
@@ -830,18 +777,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
continue;
} else if (claim != null) {
- // If the claim exceeds the max appendable claim length on its
own and doesn't start the Resource Claim,
- // we will consider it to be eligible for truncation. However,
if there are multiple FlowFiles sharing the
- // same claim, we cannot truncate it because doing so would
affect the other FlowFiles.
- if (claim.getOffset() > 0 && claim.getLength() >
truncationThreshold && claim instanceof final StandardContentClaim scc) {
- if (forbiddenTruncationClaims.contains(claim) ||
truncationEligibleClaims.contains(scc)) {
- truncationEligibleClaims.remove(scc);
- forbiddenTruncationClaims.add(scc);
- } else {
- truncationEligibleClaims.add(scc);
- }
- }
-
claimManager.incrementClaimantCount(claim.getResourceClaim());
}
@@ -851,14 +786,6 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// If recoveredRecords has been populated it need to be nulled out now
because it is no longer useful and can be garbage collected.
recoveredRecords = null;
- // If any Content Claim was determined to be truncatable, mark it as
such now.
- for (final StandardContentClaim eligible : truncationEligibleClaims) {
- final ContentClaim latestForResource =
latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
- if (Objects.equals(eligible, latestForResource)) {
- eligible.setTruncationCandidate(true);
- }
- }
-
// Set the AtomicLong to 1 more than the max ID so that calls to
#getNextFlowFileSequence() will
// return the appropriate number.
flowFileSequenceGenerator.set(maxId + 1);
@@ -925,7 +852,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
@Override
- public long getMaxFlowFileIdentifier() {
+ public long getMaxFlowFileIdentifier() throws IOException {
// flowFileSequenceGenerator is 1 more than the MAX so that we can
call #getAndIncrement on the AtomicLong
return flowFileSequenceGenerator.get() - 1;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 5a6b9d89bf6..9e0a1324a48 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -21,7 +21,6 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
@@ -224,10 +223,6 @@ public class TestFileSystemSwapManager {
public void markDestructable(ResourceClaim claim) {
}
- @Override
- public void markTruncatable(final ContentClaim claim) {
- }
-
@Override
public void drainDestructableClaims(Collection<ResourceClaim>
destination, int maxElements) {
}
@@ -236,10 +231,6 @@ public class TestFileSystemSwapManager {
public void drainDestructableClaims(Collection<ResourceClaim>
destination, int maxElements, long timeout, TimeUnit unit) {
}
- @Override
- public void drainTruncatableClaims(final Collection<ContentClaim>
destination, final int maxElements) {
- }
-
@Override
public void purge() {
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 102b927af3e..7f0e2a7a9d7 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -59,7 +59,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -82,19 +81,14 @@ public class TestFileSystemRepository {
private Path originalNifiPropertiesFile;
private Path rootFile;
private NiFiProperties nifiProperties;
- private long maxClaimLength;
@BeforeEach
public void setup() throws IOException {
originalNifiPropertiesFile =
Paths.get("src/test/resources/conf/nifi.properties");
rootFile = tempDir.resolve("content_repository");
final String contentRepositoryDirectory =
NiFiProperties.REPOSITORY_CONTENT_PREFIX.concat("default");
- final Map<String, String> additionalProperties = Map.of(
- contentRepositoryDirectory, rootFile.toString(),
- NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 sec"
- );
+ final Map<String, String> additionalProperties =
Map.of(contentRepositoryDirectory, rootFile.toString());
nifiProperties =
NiFiProperties.createBasicNiFiProperties(originalNifiPropertiesFile.toString(),
additionalProperties);
- maxClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
repository = new FileSystemRepository(nifiProperties);
claimManager = new StandardResourceClaimManager();
repository.initialize(new
StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
@@ -151,6 +145,7 @@ public class TestFileSystemRepository {
@Timeout(30)
public void testClaimsArchivedWhenMarkedDestructable() throws IOException,
InterruptedException {
final ContentClaim contentClaim = repository.create(false);
+ final long configuredAppendableClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
final Map<String, Path> containerPaths =
nifiProperties.getContentRepositoryPaths();
assertEquals(1, containerPaths.size());
final String containerName = containerPaths.keySet().iterator().next();
@@ -159,7 +154,7 @@ public class TestFileSystemRepository {
long bytesWritten = 0L;
final byte[] bytes = "Hello
World".getBytes(StandardCharsets.UTF_8);
- while (bytesWritten <= maxClaimLength) {
+ while (bytesWritten <= configuredAppendableClaimLength) {
out.write(bytes);
bytesWritten += bytes.length;
}
@@ -485,9 +480,12 @@ public class TestFileSystemRepository {
repository.incrementClaimaintCount(claim);
final Path claimPath = getPath(claim);
+ final String maxAppendableClaimLength =
nifiProperties.getMaxAppendableClaimSize();
+ final int maxClaimLength =
DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
+
// Create the file.
try (final OutputStream out = repository.write(claim)) {
- out.write(new byte[(int) maxClaimLength]);
+ out.write(new byte[maxClaimLength]);
}
int count = repository.decrementClaimantCount(claim);
@@ -504,14 +502,10 @@ public class TestFileSystemRepository {
}
private Path getPath(final ContentClaim claim) {
- return getPath(repository, claim);
- }
-
- private Path getPath(final FileSystemRepository repo, final ContentClaim
claim) {
try {
- final Method m =
FileSystemRepository.class.getDeclaredMethod("getPath", ContentClaim.class);
+ final Method m =
repository.getClass().getDeclaredMethod("getPath", ContentClaim.class);
m.setAccessible(true);
- return (Path) m.invoke(repo, claim);
+ return (Path) m.invoke(repository, claim);
} catch (final Exception e) {
throw new RuntimeException("Could not invoke #getPath on
FileSystemRepository due to " + e);
}
@@ -700,7 +694,9 @@ public class TestFileSystemRepository {
// write at least 1 MB to the output stream so that when we close the
output stream
// the repo won't keep the stream open.
- final byte[] buff = new byte[(int) maxClaimLength];
+ final String maxAppendableClaimLength =
nifiProperties.getMaxAppendableClaimSize();
+ final int maxClaimLength =
DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
+ final byte[] buff = new byte[maxClaimLength];
out.write(buff);
out.write(buff);
@@ -901,267 +897,14 @@ public class TestFileSystemRepository {
}
}
- @Test
- public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
- // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
- // is recycled back to the writable queue.
- final ContentClaim smallClaim = repository.create(false);
- final byte[] smallData = new byte[100];
- try (final OutputStream out = repository.write(smallClaim)) {
- out.write(smallData);
- }
- assertFalse(smallClaim.isTruncationCandidate());
-
- // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
- // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
- // offset on the same ResourceClaim.
- final ContentClaim largeClaim = repository.create(false);
- final byte[] largeData = new byte[(int) maxClaimLength + 1024];
- try (final OutputStream out = repository.write(largeClaim)) {
- out.write(largeData);
- }
- assertTrue(largeClaim.isTruncationCandidate());
-
- // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
- // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
- // then create a new claim that starts at offset 0.
- // The simplest approach: create claims until we get one at offset 0.
- ContentClaim offsetZeroClaim = null;
- for (int i = 0; i < 20; i++) {
- final ContentClaim candidate = repository.create(false);
- if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
- // Write large data that exceeds maxAppendableClaimLength
- try (final OutputStream out = repository.write(candidate)) {
- out.write(new byte[(int) maxClaimLength + 1024]);
- }
- offsetZeroClaim = candidate;
- break;
- } else {
- // Write large data to exhaust this claim's ResourceClaim
- try (final OutputStream out = repository.write(candidate)) {
- out.write(new byte[(int) maxClaimLength + 1024]);
- }
- }
- }
-
- assertNotNull(offsetZeroClaim);
- assertFalse(offsetZeroClaim.isTruncationCandidate());
- }
-
- @Test
- public void testIncrementClaimantCountClearsTruncationCandidate() throws
IOException {
- // Create a small claim to start a ResourceClaim, then a large claim
to freeze it
- final ContentClaim smallClaim = repository.create(false);
- try (final OutputStream out = repository.write(smallClaim)) {
- out.write(new byte[100]);
- }
-
- final ContentClaim largeClaim = repository.create(false);
- try (final OutputStream out = repository.write(largeClaim)) {
- out.write(new byte[(int) maxClaimLength + 1024]);
- }
-
- assertTrue(largeClaim.isTruncationCandidate());
-
- // Simulate a clone by incrementing claimant count
- repository.incrementClaimaintCount(largeClaim);
-
- assertFalse(largeClaim.isTruncationCandidate());
- }
-
- @Test
- @Timeout(60)
- public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
- // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
- shutdown();
-
- final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
- @Override
- public long getContainerUsableSpace(final String containerName) {
- return 0; // Extreme disk pressure
- }
-
- @Override
- protected boolean isArchiveClearedOnLastRun(final String
containerName) {
- return true;
- }
- };
-
- try {
- final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
- localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
- localRepository.purge();
-
- // Create a small claim then a large claim on the same
ResourceClaim
- final ContentClaim smallClaim = localRepository.create(false);
- final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
- try (final OutputStream out = localRepository.write(smallClaim)) {
- out.write(smallData);
- }
-
- final ContentClaim largeClaim = localRepository.create(false);
- final byte[] largeData = new byte[(int) maxClaimLength + 4096];
- new Random().nextBytes(largeData);
- try (final OutputStream out = localRepository.write(largeClaim)) {
- out.write(largeData);
- }
-
- assertTrue(largeClaim.isTruncationCandidate());
-
- // Both claims should share the same resource claim
- assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
-
- // Get the file path
- final Path filePath = getPath(localRepository, smallClaim);
- assertNotNull(filePath);
- final long originalSize = Files.size(filePath);
- assertTrue(originalSize > maxClaimLength);
-
- // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
-
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
-
- // Mark the large claim as truncatable
- localClaimManager.markTruncatable(largeClaim);
-
- // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
- final long expectedTruncatedSize = largeClaim.getOffset();
- while (Files.size(filePath) != expectedTruncatedSize) {
- Thread.sleep(100L);
- }
-
- // Verify the small claim's data is still fully readable
- try (final InputStream in = localRepository.read(smallClaim)) {
- final byte[] readData = readFully(in, smallData.length);
- assertArrayEquals(smallData, readData);
- }
- } finally {
- localRepository.shutdown();
- }
- }
-
- @Test
- @Timeout(60)
- public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
- // Create repository with ample disk space
- shutdown();
-
- final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
- @Override
- public long getContainerUsableSpace(final String containerName) {
- return Long.MAX_VALUE; // Plenty of space
- }
-
- @Override
- protected boolean isArchiveClearedOnLastRun(final String
containerName) {
- return true;
- }
- };
-
- try {
- final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
- localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
- localRepository.purge();
-
- final ContentClaim smallClaim = localRepository.create(false);
- try (final OutputStream out = localRepository.write(smallClaim)) {
- out.write(new byte[100]);
- }
-
- final ContentClaim largeClaim = localRepository.create(false);
- try (final OutputStream out = localRepository.write(largeClaim)) {
- out.write(new byte[(int) maxClaimLength + 4096]);
- }
-
- assertTrue(largeClaim.isTruncationCandidate());
-
- final Path filePath = getPath(localRepository, smallClaim);
- final long originalSize = Files.size(filePath);
-
-
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
- localClaimManager.markTruncatable(largeClaim);
-
- Thread.sleep(3000L);
- assertEquals(originalSize, Files.size(filePath));
- } finally {
- localRepository.shutdown();
- }
- }
-
- @Test
- @Timeout(90)
- public void testTruncateClaimDeferredThenExecutedWhenPressureStarts()
throws IOException, InterruptedException {
- // Create a repository where disk pressure can be toggled
- shutdown();
-
- final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
- final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
- @Override
- public long getContainerUsableSpace(final String containerName) {
- return usableSpace.get();
- }
-
- @Override
- protected boolean isArchiveClearedOnLastRun(final String
containerName) {
- return true;
- }
- };
-
- try {
- final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
- localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
- localRepository.purge();
-
- // Create a small claim then a large claim on the same
ResourceClaim
- final ContentClaim smallClaim = localRepository.create(false);
- try (final OutputStream out = localRepository.write(smallClaim)) {
- out.write(new byte[100]);
- }
-
- final ContentClaim largeClaim = localRepository.create(false);
- try (final OutputStream out = localRepository.write(largeClaim)) {
- out.write(new byte[(int) maxClaimLength + 4096]);
- }
-
- assertTrue(largeClaim.isTruncationCandidate());
- assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
-
- final Path filePath = getPath(localRepository, smallClaim);
- final long originalSize = Files.size(filePath);
-
-
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
- localClaimManager.markTruncatable(largeClaim);
-
- // Wait for at least one run of the background task with NO
pressure.
- // File should NOT be truncated.
- Thread.sleep(3_000);
- assertEquals(originalSize, Files.size(filePath));
-
- // Now turn on disk pressure
- usableSpace.set(0);
-
- // Wait for the next background task run to truncate the file
- final long expectedTruncatedSize = largeClaim.getOffset();
- while (Files.size(filePath) != expectedTruncatedSize) {
- Thread.sleep(100L);
- }
-
- // Verify the small claim's data is still readable
- try (final InputStream in = localRepository.read(smallClaim)) {
- assertNotNull(in);
- }
- } finally {
- localRepository.shutdown();
- }
- }
-
private byte[] readFully(final InputStream inStream, final int size)
throws IOException {
- final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(size);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
int len;
final byte[] buffer = new byte[size];
while ((len = inStream.read(buffer)) >= 0) {
- outputStream.write(buffer, 0, len);
+ baos.write(buffer, 0, len);
}
- return outputStream.toByteArray();
+ return baos.toByteArray();
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 3a6453e3124..9d6118e8ebe 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -810,287 +810,4 @@ public class TestWriteAheadFlowFileRepository {
return swapLocation;
}
}
-
- //
=========================================================================
- // Truncation Feature: Helpers
- //
=========================================================================
-
- /**
- * Creates a mock queue + connection + queueProvider wired together,
suitable for runtime truncation tests.
- * Returns [claimManager, queueProvider, queue].
- */
- private record RuntimeRepoContext(StandardResourceClaimManager
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
- }
-
- private RuntimeRepoContext createRuntimeRepoContext() {
- final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
- final TestQueueProvider queueProvider = new TestQueueProvider();
- final Connection connection = Mockito.mock(Connection.class);
- when(connection.getIdentifier()).thenReturn("1234");
-
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
- final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
- when(queue.getIdentifier()).thenReturn("1234");
- when(connection.getFlowFileQueue()).thenReturn(queue);
- queueProvider.addConnection(connection);
- return new RuntimeRepoContext(claimManager, queueProvider, queue);
- }
-
- private StandardContentClaim createClaim(final ResourceClaim rc, final
long offset, final long length, final boolean truncationCandidate) {
- final StandardContentClaim claim = new StandardContentClaim(rc,
offset);
- claim.setLength(length);
- if (truncationCandidate) {
- claim.setTruncationCandidate(true);
- }
- return claim;
- }
-
- private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository
repo, final FlowFileQueue queue,
- final ContentClaim claim) throws
IOException {
- final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", UUID.randomUUID().toString())
- .contentClaim(claim)
- .build();
-
- final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(queue);
- createRecord.setWorking(flowFile, false);
- createRecord.setDestination(queue);
- repo.updateRepository(List.of(createRecord));
-
- final StandardRepositoryRecord deleteRecord = new
StandardRepositoryRecord(queue, flowFile);
- deleteRecord.markForDelete();
- repo.updateRepository(List.of(deleteRecord));
- }
-
- /**
- * Writes FlowFiles (one per claim) to a new repo, closes it, then
recovers into a fresh repo
- * and returns the recovered FlowFileRecords.
- */
- private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims)
throws IOException {
- final ResourceClaimManager writeClaimManager = new
StandardResourceClaimManager();
- final TestQueueProvider writeQueueProvider = new TestQueueProvider();
- final Connection writeConnection = Mockito.mock(Connection.class);
- when(writeConnection.getIdentifier()).thenReturn("1234");
-
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
- final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
- final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234",
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
- when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
- writeQueueProvider.addConnection(writeConnection);
-
- try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
- repo.initialize(writeClaimManager);
- repo.loadFlowFiles(writeQueueProvider);
-
- final List<RepositoryRecord> records = new ArrayList<>();
- for (int i = 0; i < claims.length; i++) {
- final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
- .id(i + 1L)
- .addAttribute("uuid", "11111111-1111-1111-1111-" +
String.format("%012d", i + 1))
- .contentClaim(claims[i])
- .build();
- final StandardRepositoryRecord rec = new
StandardRepositoryRecord(writeQueue);
- rec.setWorking(ff, false);
- rec.setDestination(writeQueue);
- records.add(rec);
- }
- repo.updateRepository(records);
- }
-
- // Recover
- final List<FlowFileRecord> recovered = new ArrayList<>();
- final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
- when(recoveryQueue.getIdentifier()).thenReturn("1234");
- doAnswer(invocation -> {
- recovered.add((FlowFileRecord) invocation.getArguments()[0]);
- return null;
- }).when(recoveryQueue).put(any(FlowFileRecord.class));
-
- final Connection recoveryConnection = Mockito.mock(Connection.class);
- when(recoveryConnection.getIdentifier()).thenReturn("1234");
- when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
- final TestQueueProvider recoveryQueueProvider = new
TestQueueProvider();
- recoveryQueueProvider.addConnection(recoveryConnection);
-
- try (final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(niFiProperties)) {
- repo2.initialize(new StandardResourceClaimManager());
- repo2.loadFlowFiles(recoveryQueueProvider);
- }
-
- return recovered;
- }
-
- private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord>
recovered, final long offset) {
- return recovered.stream()
- .filter(ff -> ff.getContentClaim() != null &&
ff.getContentClaim().getOffset() == offset)
- .findFirst()
- .orElse(null);
- }
-
- //
=========================================================================
- // Truncation Feature: Runtime Tests
- //
=========================================================================
-
- @Test
- public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue()
throws IOException {
- final RuntimeRepoContext context = createRuntimeRepoContext();
- final ResourceClaim resourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
- context.claimManager().incrementClaimantCount(resourceClaim);
- context.claimManager().incrementClaimantCount(resourceClaim); // count
= 2 so that after delete decrement it stays > 0 (not destructable)
- final StandardContentClaim contentClaim = createClaim(resourceClaim,
1024L, 5_000_000L, true);
-
- try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
- repo.initialize(context.claimManager());
- repo.loadFlowFiles(context.queueProvider());
- createAndDeleteFlowFile(repo, context.queue(), contentClaim);
- repo.checkpoint();
- }
-
- final List<ContentClaim> truncated = new ArrayList<>();
- context.claimManager().drainTruncatableClaims(truncated, 100);
- assertTrue(truncated.contains(contentClaim));
- }
-
- @Test
- public void testDestructableClaimTakesPriorityOverTruncatable() throws
IOException {
- final RuntimeRepoContext context = createRuntimeRepoContext();
- final ResourceClaim resourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
- context.claimManager().incrementClaimantCount(resourceClaim); // count
= 1 -- will reach 0 after delete
- final StandardContentClaim contentClaim = createClaim(resourceClaim,
1024L, 5_000_000L, true);
-
- try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
- repo.initialize(context.claimManager());
- repo.loadFlowFiles(context.queueProvider());
- createAndDeleteFlowFile(repo, context.queue(), contentClaim);
- repo.checkpoint();
- }
-
- final List<ResourceClaim> destructed = new ArrayList<>();
- context.claimManager().drainDestructableClaims(destructed, 100);
- assertTrue(destructed.contains(resourceClaim));
-
- final List<ContentClaim> truncated = new ArrayList<>();
- context.claimManager().drainTruncatableClaims(truncated, 100);
- assertFalse(truncated.contains(contentClaim));
- }
-
- @Test
- public void testUpdateRecordOriginalClaimQueuedForTruncation() throws
IOException {
- final RuntimeRepoContext context = createRuntimeRepoContext();
-
- final ResourceClaim originalResourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
- context.claimManager().incrementClaimantCount(originalResourceClaim);
- context.claimManager().incrementClaimantCount(originalResourceClaim);
// count = 2 so it stays > 0 after decrement
- final StandardContentClaim originalClaim =
createClaim(originalResourceClaim, 2048L, 5_000_000L, true);
-
- final ResourceClaim newResourceClaim =
context.claimManager().newResourceClaim("container", "section", "2", false,
false);
- context.claimManager().incrementClaimantCount(newResourceClaim);
- final StandardContentClaim newClaim = createClaim(newResourceClaim,
0L, 100L, false);
-
- final FlowFileRecord originalFlowFile = new
StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", UUID.randomUUID().toString())
- .contentClaim(originalClaim)
- .build();
-
- try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
- repo.initialize(context.claimManager());
- repo.loadFlowFiles(context.queueProvider());
-
- final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(context.queue());
- createRecord.setWorking(originalFlowFile, false);
- createRecord.setDestination(context.queue());
- repo.updateRepository(List.of(createRecord));
-
- final FlowFileRecord updatedFlowFile = new
StandardFlowFileRecord.Builder()
- .fromFlowFile(originalFlowFile)
- .contentClaim(newClaim)
- .build();
- final StandardRepositoryRecord updateRecord = new
StandardRepositoryRecord(context.queue(), originalFlowFile);
- updateRecord.setWorking(updatedFlowFile, true);
- updateRecord.setDestination(context.queue());
- repo.updateRepository(List.of(updateRecord));
- repo.checkpoint();
- }
-
- final List<ContentClaim> truncated = new ArrayList<>();
- context.claimManager().drainTruncatableClaims(truncated, 100);
- assertTrue(truncated.contains(originalClaim));
- }
-
- //
=========================================================================
- // Truncation Feature: Recovery Tests
- //
=========================================================================
-
- @Test
- public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws
IOException {
- final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
- final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
- final StandardContentClaim smallClaim = createClaim(resourceClaim, 0L,
100L, false);
- final StandardContentClaim largeClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
-
- final List<FlowFileRecord> recovered = writeAndRecover(smallClaim,
largeClaim);
-
- final FlowFileRecord recoveredLargeFlowFile =
findRecoveredByOffset(recovered, 100L);
- assertNotNull(recoveredLargeFlowFile);
-
assertTrue(recoveredLargeFlowFile.getContentClaim().isTruncationCandidate());
-
- final FlowFileRecord recoveredSmallFlowFile =
findRecoveredByOffset(recovered, 0L);
- assertNotNull(recoveredSmallFlowFile);
-
assertFalse(recoveredSmallFlowFile.getContentClaim().isTruncationCandidate());
- }
-
- @Test
- public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
- final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
- final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
- final StandardContentClaim sharedClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
-
- // Two FlowFiles sharing the same claim (clone scenario)
- final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim,
sharedClaim);
-
- for (final FlowFileRecord flowFile : recovered) {
- if (flowFile.getContentClaim() != null) {
-
assertFalse(flowFile.getContentClaim().isTruncationCandidate());
- }
- }
- }
-
- @Test
- public void testRecoveryOnlyMarksTailClaim() throws IOException {
- final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
- final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
- final StandardContentClaim nonTailClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
- final StandardContentClaim tailClaim = createClaim(resourceClaim,
2_000_100L, 3_000_000L, false);
-
- final List<FlowFileRecord> recovered = writeAndRecover(nonTailClaim,
tailClaim);
-
- final FlowFileRecord tailFlowFile = findRecoveredByOffset(recovered,
2_000_100L);
- assertNotNull(tailFlowFile);
- assertTrue(tailFlowFile.getContentClaim().isTruncationCandidate());
-
- final FlowFileRecord nonTailFlowFile =
findRecoveredByOffset(recovered, 100L);
- assertNotNull(nonTailFlowFile);
- assertFalse(nonTailFlowFile.getContentClaim().isTruncationCandidate());
- }
-
- @Test
- public void testRecoverySmallClaimAfterLargeDoesNotMarkLarge() throws
IOException {
- final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
- final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
- final StandardContentClaim firstSmallClaim =
createClaim(resourceClaim, 0L, 100L, false);
- final StandardContentClaim largeClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
- final StandardContentClaim secondSmallClaim =
createClaim(resourceClaim, 2_000_100L, 50L, false);
-
- final List<FlowFileRecord> recovered =
writeAndRecover(firstSmallClaim, largeClaim, secondSmallClaim);
-
- final List<FlowFileRecord> flowFilesWithClaims = recovered.stream()
- .filter(flowFile -> flowFile.getContentClaim() != null)
- .toList();
-
- assertFalse(flowFilesWithClaims.isEmpty());
- for (final FlowFileRecord flowFile : flowFilesWithClaims) {
- assertFalse(flowFile.getContentClaim().isTruncationCandidate(),
- "No claim should be a truncation candidate because the
large claim is not the tail; claim offset=" +
flowFile.getContentClaim().getOffset());
- }
- }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index 780a44f3699..814d3e81642 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -29,7 +29,6 @@ public final class StandardContentClaim implements
ContentClaim, Comparable<Cont
private final ResourceClaim resourceClaim;
private final long offset;
private volatile long length;
- private volatile boolean truncationCandidate = false;
public StandardContentClaim(final ResourceClaim resourceClaim, final long
offset) {
this.resourceClaim = resourceClaim;
@@ -41,15 +40,6 @@ public final class StandardContentClaim implements
ContentClaim, Comparable<Cont
this.length = length;
}
- public void setTruncationCandidate(final boolean candidate) {
- this.truncationCandidate = candidate;
- }
-
- @Override
- public boolean isTruncationCandidate() {
- return truncationCandidate;
- }
-
@Override
public int hashCode() {
final int prime = 31;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 1e483fc25f3..95687747487 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -32,7 +32,6 @@ public class StandardResourceClaimManager implements
ResourceClaimManager {
private static final Logger logger =
LoggerFactory.getLogger(StandardResourceClaimManager.class);
private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts =
new ConcurrentHashMap<>();
private final BlockingQueue<ResourceClaim> destructableClaims = new
LinkedBlockingQueue<>(50000);
- private final BlockingQueue<ContentClaim> truncatableClaims = new
LinkedBlockingQueue<>(100000);
@Override
public ResourceClaim newResourceClaim(final String container, final String
section, final String id, final boolean lossTolerant, final boolean writable) {
@@ -162,30 +161,6 @@ public class StandardResourceClaimManager implements
ResourceClaimManager {
}
}
- @Override
- public void markTruncatable(final ContentClaim contentClaim) {
- if (contentClaim == null) {
- return;
- }
-
- final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
- synchronized (resourceClaim) {
- if (isDestructable(resourceClaim)) {
- return;
- }
-
- logger.debug("Marking {} as truncatable", contentClaim);
- try {
- if (!truncatableClaims.offer(contentClaim, 1,
TimeUnit.MINUTES)) {
- logger.info("Unable to mark {} as truncatable because
maximum queue size [{}] reached", truncatableClaims.size(), contentClaim);
- }
- } catch (final InterruptedException ie) {
- Thread.currentThread().interrupt();
- logger.debug("Interrupted while marking {} as truncatable",
contentClaim, ie);
- }
- }
- }
-
@Override
public void drainDestructableClaims(final Collection<ResourceClaim>
destination, final int maxElements) {
final int drainedCount = destructableClaims.drainTo(destination,
maxElements);
@@ -204,12 +179,6 @@ public class StandardResourceClaimManager implements
ResourceClaimManager {
}
}
- @Override
- public void drainTruncatableClaims(final Collection<ContentClaim>
destination, final int maxElements) {
- final int drainedCount = truncatableClaims.drainTo(destination,
maxElements);
- logger.debug("Drained {} truncatable claims to {}", drainedCount,
destination);
- }
-
@Override
public void purge() {
claimantCounts.clear();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
index dd36bdd230a..7fb77b2739c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
@@ -21,14 +21,12 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestStandardResourceClaimManager {
@@ -58,53 +56,4 @@ public class TestStandardResourceClaimManager {
manager.drainDestructableClaims(new ArrayList<>(), 1);
assertSame(completedObject, future.get());
}
-
- @Test
- public void testMarkTruncatableSkipsDestructableResourceClaim() {
- final StandardResourceClaimManager manager = new
StandardResourceClaimManager();
-
- // Create a resource claim with claimant count 0 and mark it
destructable
- final ResourceClaim rc = manager.newResourceClaim("container",
"section", "id1", false, false);
- manager.markDestructable(rc);
-
- // Create a content claim on that resource claim
- final StandardContentClaim contentClaim = new StandardContentClaim(rc,
0);
- contentClaim.setLength(1024);
- contentClaim.setTruncationCandidate(true);
-
- // markTruncatable should skip this because the resource claim is
already destructable
- manager.markTruncatable(contentClaim);
-
- // Drain truncatable claims - should be empty
- final List<ContentClaim> truncated = new ArrayList<>();
- manager.drainTruncatableClaims(truncated, 10);
- assertTrue(truncated.isEmpty(), "Truncatable claims should be empty
because the resource claim is destructable");
- }
-
- @Test
- public void testMarkTruncatableAndDrainRespectsMaxElements() {
- final StandardResourceClaimManager manager = new
StandardResourceClaimManager();
-
- // Create 5 truncatable claims, each on a distinct resource claim with
a positive claimant count
- for (int i = 0; i < 5; i++) {
- final ResourceClaim rc = manager.newResourceClaim("container",
"section", "id-" + i, false, false);
- // Give each resource claim a positive claimant count so it's not
destructable
- manager.incrementClaimantCount(rc);
-
- final StandardContentClaim cc = new StandardContentClaim(rc, 0);
- cc.setLength(1024);
- cc.setTruncationCandidate(true);
- manager.markTruncatable(cc);
- }
-
- // Drain with maxElements=3
- final List<ContentClaim> batch1 = new ArrayList<>();
- manager.drainTruncatableClaims(batch1, 3);
- assertEquals(3, batch1.size(), "First drain should return exactly 3
claims");
-
- // Drain again - should get remaining 2
- final List<ContentClaim> batch2 = new ArrayList<>();
- manager.drainTruncatableClaims(batch2, 10);
- assertEquals(2, batch2.size(), "Second drain should return the
remaining 2 claims");
- }
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
index 4b24f3af045..08c9fd2eca7 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
@@ -260,11 +260,6 @@ public class ByteArrayContentRepository implements
ContentRepository {
return resourceClaim.getLength();
}
- @Override
- public boolean isTruncationCandidate() {
- return false;
- }
-
@Override
public int compareTo(final ContentClaim o) {
return resourceClaim.compareTo(o.getResourceClaim());
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
deleted file mode 100644
index c00ba93c6ce..00000000000
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.tests.system;
-
-import org.apache.nifi.annotation.configuration.DefaultSchedule;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-@DefaultSchedule(period = "10 mins")
-public class GenerateTruncatableFlowFiles extends AbstractProcessor {
-
- static final PropertyDescriptor BATCH_COUNT = new
PropertyDescriptor.Builder()
- .name("Batch Count")
- .description("""
- The maximum number of batches to generate. Each batch produces
10 FlowFiles (9 small + 1 large). \
- Once this many batches have been generated, no more FlowFiles
will be produced until the processor is stopped and restarted.""")
- .required(true)
- .defaultValue("10")
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
-
- static final PropertyDescriptor SMALL_FILE_SIZE = new
PropertyDescriptor.Builder()
- .name("Small File Size")
- .description("Size of each small FlowFile in bytes")
- .required(true)
- .defaultValue("1 KB")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
-
- static final PropertyDescriptor LARGE_FILE_SIZE = new
PropertyDescriptor.Builder()
- .name("Large File Size")
- .description("Size of each large FlowFile in bytes")
- .required(true)
- .defaultValue("10 MB")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
-
- static final PropertyDescriptor SMALL_FILES_PER_BATCH = new
PropertyDescriptor.Builder()
- .name("Small Files Per Batch")
- .description("Number of small FlowFiles to generate per batch")
- .required(true)
- .defaultValue("9")
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .build();
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return List.of(BATCH_COUNT,
- SMALL_FILE_SIZE,
- LARGE_FILE_SIZE,
- SMALL_FILES_PER_BATCH);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return Set.of(REL_SUCCESS);
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final int batchCount = context.getProperty(BATCH_COUNT).asInteger();
- final Random random = new Random();
- final int smallFileSize =
context.getProperty(SMALL_FILE_SIZE).asDataSize(DataUnit.B).intValue();
- final int largeFileSize =
context.getProperty(LARGE_FILE_SIZE).asDataSize(DataUnit.B).intValue();
- final int smallFilesPerBatch =
context.getProperty(SMALL_FILES_PER_BATCH).asInteger();
-
- for (int batch = 0; batch < batchCount; batch++) {
- // Generate small FlowFiles with priority = 10 (low priority,
processed last by PriorityAttributePrioritizer)
- for (int i = 0; i < smallFilesPerBatch; i++) {
- createFlowFile(session, random, smallFileSize, "10");
- }
-
- // Generate one large FlowFile with priority = 1 (high priority,
processed first by PriorityAttributePrioritizer)
- createFlowFile(session, random, largeFileSize, "1");
- }
- }
-
- private void createFlowFile(final ProcessSession session, final Random
random, final int fileSize, final String priority) {
- FlowFile flowFile = session.create();
- flowFile = session.putAttribute(flowFile, "priority", priority);
- final byte[] data = new byte[fileSize];
- random.nextBytes(data);
- flowFile = session.write(flowFile, out -> out.write(data));
- session.transfer(flowFile, REL_SUCCESS);
- }
-}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d4a9b4c81cf..a12f954cb84 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -29,7 +29,6 @@ org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
org.apache.nifi.processors.tests.system.GenerateAndCountCallbacks
org.apache.nifi.processors.tests.system.GenerateFlowFile
-org.apache.nifi.processors.tests.system.GenerateTruncatableFlowFiles
org.apache.nifi.processors.tests.system.HoldInput
org.apache.nifi.processors.tests.system.IngestFile
org.apache.nifi.processors.tests.system.LoopFlowFile
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
deleted file mode 100644
index 012739ca872..00000000000
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.tests.system.repositories;
-
-import org.apache.nifi.tests.system.NiFiInstance;
-import org.apache.nifi.tests.system.NiFiSystemIT;
-import org.apache.nifi.toolkit.client.NiFiClientException;
-import org.apache.nifi.web.api.entity.ConnectionEntity;
-import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * System test that verifies the truncation feature works correctly after a
NiFi restart.
- * <p>
- * During the first run, NiFi is configured with very conservative truncation
settings (99% archive
- * usage threshold), so truncation never activates. FlowFiles are generated
but not deleted.
- * </p>
- * <p>
- * NiFi is then stopped, reconfigured with aggressive truncation settings (1%
archive usage threshold),
- * and restarted. On recovery, {@code
WriteAheadFlowFileRepository.restoreFlowFiles()} re-derives
- * truncation candidates by analyzing the recovered FlowFiles' ContentClaims.
After restart, the large
- * FlowFiles are deleted, and the test verifies that the content repository
files are truncated on disk.
- * </p>
- */
-public class ContentClaimTruncationAfterRestartIT extends NiFiSystemIT {
-
- @Override
- protected Map<String, String> getNifiPropertiesOverrides() {
- // Phase 1: Conservative settings — truncation should NOT occur
- final Map<String, String> overrides = new HashMap<>();
- overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec");
- overrides.put("nifi.content.claim.max.appendable.size", "50 KB");
- // Very high archive threshold means no disk pressure, so truncation
never activates
- overrides.put("nifi.content.repository.archive.max.usage.percentage",
"99%");
- overrides.put("nifi.content.repository.archive.cleanup.frequency", "1
sec");
- return overrides;
- }
-
- @Override
- protected boolean isAllowFactoryReuse() {
- return false;
- }
-
- @Test
- public void testTruncationOccursAfterRestartWithRecoveredCandidates()
throws NiFiClientException, IOException, InterruptedException {
- // === Phase 1: Generate FlowFiles with conservative settings (no
truncation) ===
-
- final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateTruncatableFlowFiles");
- final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
-
- final Map<String, String> generateProps = Map.of(
- "Batch Count", "10",
- "Small File Size", "1 KB",
- "Large File Size", "10 MB",
- "Small Files Per Batch", "9"
- );
- getClientUtil().updateProcessorProperties(generate, generateProps);
-
- ConnectionEntity connection =
getClientUtil().createConnection(generate, terminate, "success");
- connection = getClientUtil().updateConnectionPrioritizer(connection,
"PriorityAttributePrioritizer");
- connection = getClientUtil().updateConnectionBackpressure(connection,
10000, 100L * 1024 * 1024);
-
- // Generate all 100 FlowFiles (90 small @ 1 KB + 10 large @ 10 MB)
- getClientUtil().startProcessor(generate);
- waitForQueueCount(connection.getId(), 100);
- getClientUtil().stopProcessor(generate);
- getClientUtil().waitForStoppedProcessor(generate.getId());
-
- // Verify the content repository is large — the 10 MB FlowFiles are on
disk
- final File contentRepoDir = new
File(getNiFiInstance().getInstanceDirectory(), "content_repository");
- final long thresholdBytes = 1024 * 1024; // 1 MB
- final long sizeBeforeRestart = getContentRepoSize(contentRepoDir);
- assertTrue(sizeBeforeRestart > thresholdBytes,
- "Content repository should be large before restart, but was " +
sizeBeforeRestart + " bytes");
-
- // === Phase 2: Stop NiFi, reconfigure for aggressive truncation, and
restart ===
-
- final NiFiInstance nifiInstance = getNiFiInstance();
- nifiInstance.stop();
-
- // Switch archive threshold to 1% so truncation activates under disk
pressure
- nifiInstance.setProperties(Map.of(
- "nifi.content.repository.archive.max.usage.percentage", "1%"
- ));
-
- nifiInstance.start(true);
-
- // After restart, WriteAheadFlowFileRepository.restoreFlowFiles()
should have re-derived
- // that the 10 large tail claims are truncation candidates.
-
- // Run TerminateFlowFile 10 times. Due to
PriorityAttributePrioritizer, the 10 large
- // FlowFiles (priority=1) are dequeued first.
- for (int i = 0; i < 10; i++) {
- final ProcessorEntity terminateAfterRestart =
getNifiClient().getProcessorClient().getProcessor(terminate.getId());
-
getNifiClient().getProcessorClient().runProcessorOnce(terminateAfterRestart);
-
getClientUtil().waitForStoppedProcessor(terminateAfterRestart.getId());
- }
-
- waitForQueueCount(connection.getId(), 90);
-
- // Wait for the content repository files to be truncated.
- // Before truncation: ~10 files of ~10 MB each = ~100 MB total.
- // After truncation: ~10 files of ~9 KB each = ~90 KB total.
- waitFor(() -> {
- try {
- return getContentRepoSize(contentRepoDir) < thresholdBytes;
- } catch (final Exception e) {
- return false;
- }
- });
-
- final long finalSize = getContentRepoSize(contentRepoDir);
- assertTrue(finalSize < thresholdBytes,
- "Content repository total size should be below " + thresholdBytes
+ " bytes after truncation, but was " + finalSize);
- }
-
- private long getContentRepoSize(final File dir) {
- if (dir == null || !dir.exists()) {
- return 0;
- }
-
- final File[] children = dir.listFiles();
- if (children == null) {
- return 0L;
- }
-
- long total = 0;
- for (final File child : children) {
- if (child.isDirectory()) {
- if (child.getName().equals("archive")) {
- continue; // Skip archive directories
- }
-
- total += getContentRepoSize(child);
- } else {
- total += child.length();
- }
- }
-
- return total;
- }
-}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
deleted file mode 100644
index c8ae97087ff..00000000000
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.tests.system.repositories;
-
-import org.apache.nifi.tests.system.NiFiSystemIT;
-import org.apache.nifi.toolkit.client.NiFiClientException;
-import org.apache.nifi.web.api.entity.ConnectionEntity;
-import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * System test that verifies the end-to-end truncation feature. It generates
FlowFiles with a pattern
- * of 9 small (1 KB) + 1 large (10 MB) per batch, removes only the large
FlowFiles via priority-based
- * ordering, and then verifies that the content repository files are truncated
on disk.
- */
-public class ContentClaimTruncationIT extends NiFiSystemIT {
-
- @Override
- protected Map<String, String> getNifiPropertiesOverrides() {
- final Map<String, String> overrides = new HashMap<>();
- // Use a short checkpoint interval so truncatable claims are flushed
to the ResourceClaimManager promptly
- overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec");
- overrides.put("nifi.content.repository.archive.cleanup.frequency", "1
sec");
- // Explicitly set the max appendable claim size (same as system test
default, but explicit for clarity)
- overrides.put("nifi.content.claim.max.appendable.size", "50 KB");
- // Set archive threshold extremely low so that truncation occurs
quickly
- overrides.put("nifi.content.repository.archive.max.usage.percentage",
"1%");
- return overrides;
- }
-
- @Override
- protected boolean isAllowFactoryReuse() {
- // Don't reuse the NiFi instance since we override checkpoint interval
- return false;
- }
-
- @Test
- public void testLargeFlowFileTruncation() throws NiFiClientException,
IOException, InterruptedException {
- // Create the processors
- final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateTruncatableFlowFiles");
- final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
-
- // Configure GenerateTruncatableFlowFiles with 10 batches (100
FlowFiles total)
- final Map<String, String> generateProps = Map.of(
- "Batch Count", "10",
- "Small File Size", "1 KB",
- "Large File Size", "10 MB",
- "Small Files Per Batch", "9"
- );
- getClientUtil().updateProcessorProperties(generate, generateProps);
-
- // Create connection with PriorityAttributePrioritizer and 100 MB
backpressure
- ConnectionEntity connection =
getClientUtil().createConnection(generate, terminate, "success");
- connection = getClientUtil().updateConnectionPrioritizer(connection,
"PriorityAttributePrioritizer");
- connection = getClientUtil().updateConnectionBackpressure(connection,
10000, 100L * 1024 * 1024);
-
- // Start the generator and wait for 100 FlowFiles to be queued
- getClientUtil().startProcessor(generate);
- waitForQueueCount(connection.getId(), 100);
-
- // Stop the generator
- getClientUtil().stopProcessor(generate);
- getClientUtil().waitForStoppedProcessor(generate.getId());
-
- // Run TerminateFlowFile 10 times. Due to PriorityAttributePrioritizer,
- // the 10 large FlowFiles (priority=1) will be dequeued first.
- for (int i = 0; i < 10; i++) {
- getNifiClient().getProcessorClient().runProcessorOnce(terminate);
- getClientUtil().waitForStoppedProcessor(terminate.getId());
- }
-
- // Wait for 90 FlowFiles remaining (the 10 large ones have been
removed)
- waitForQueueCount(connection.getId(), 90);
-
- // Wait for the content repository files to be truncated.
- // Before truncation: ~10 files of ~10 MB each = ~100 MB total.
- // After truncation: ~10 files of ~9 KB each = ~90 KB total.
- // We set a generous threshold of 1 MB.
- final File contentRepoDir = new
File(getNiFiInstance().getInstanceDirectory(), "content_repository");
- final long thresholdBytes = 1024 * 1024; // 1 MB
-
- waitFor(() -> {
- try {
- final long totalSize =
getContentRepoSize(contentRepoDir.toPath());
- return totalSize < thresholdBytes;
- } catch (final IOException e) {
- return false;
- }
- });
-
- // Final assertion
- final long finalSize = getContentRepoSize(contentRepoDir.toPath());
- assertTrue(finalSize < thresholdBytes,
- "Content repository total size should be below " +
thresholdBytes + " bytes after truncation, but was " + finalSize);
- }
-
- /**
- * Walks the content repository directory (excluding any "archive"
subdirectories)
- * and returns the total size of all regular files.
- */
- private long getContentRepoSize(final Path contentRepoPath) throws
IOException {
- if (!Files.exists(contentRepoPath)) {
- return 0;
- }
-
- final AtomicLong totalSize = new AtomicLong(0);
- Files.walkFileTree(contentRepoPath, new SimpleFileVisitor<>() {
- @Override
- public FileVisitResult preVisitDirectory(final Path dir, final
BasicFileAttributes attrs) {
- // Skip archive directories
- if (dir.getFileName() != null &&
"archive".equals(dir.getFileName().toString())) {
- return FileVisitResult.SKIP_SUBTREE;
- }
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult visitFile(final Path file, final
BasicFileAttributes attrs) {
- totalSize.addAndGet(attrs.size());
- return FileVisitResult.CONTINUE;
- }
- });
-
- return totalSize.get();
- }
-}