pvillard31 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2781573813
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1035,6 +1050,120 @@ public void purge() {
resourceClaimManager.purge();
}
+
+ private class TruncateClaims implements Runnable {
+
+ /**
+ * Performs one truncation pass.
+ * <p>
+ * First, for every container in which truncation is currently active, claims previously saved in the
+ * Truncation Claim Manager are removed from it and processed. Then truncatable claims are drained from the
+ * Resource Claim Manager in batches, and each batch is processed, until a drain leaves the list empty.
+ * Claims that cannot be truncated yet are handed back to the Truncation Claim Manager by truncateClaims.
+ */
+ @Override
+ public void run() {
+ // Memoizes isTruncationActiveForContainer decisions for the duration of this pass so that the
+ // per-container archive / usable-space checks are normally not repeated for every claim.
+ final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
+
+ // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
+ for (final String container : containerNames) {
+ if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
+ if (toTruncate.isEmpty()) {
+ continue;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ // Drain any Truncation Claims from the Resource Claim Manager.
+ // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
+ // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
+ // but then we later start to run out of disk space and have lost the
opportunity to truncate that big claim.
+ while (true) {
+ final List<ContentClaim> toTruncate = new ArrayList<>();
+ // NOTE(review): 10_000 presumably caps how many claims are drained per call — confirm against drainTruncatableClaims.
+ resourceClaimManager.drainTruncatableClaims(toTruncate,
10_000);
+ // Nothing left to drain: this pass is complete.
+ if (toTruncate.isEmpty()) {
+ return;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ /**
+ * Truncates each claim in {@code toTruncate} whose container currently has truncation active and that is
+ * still flagged as a truncation candidate.
+ * <p>
+ * Claims whose container does not have truncation active are grouped by container and passed to the
+ * Truncation Claim Manager so that a later pass can retry them. Claims in an active container that are no
+ * longer truncation candidates are dropped without being re-queued.
+ *
+ * @param toTruncate claims to consider for truncation
+ * @param truncationActivationCache per-pass memo of container activation decisions, shared with the caller
+ */
+ private void truncateClaims(final List<ContentClaim> toTruncate, final
Map<String, Boolean> truncationActivationCache) {
+ // Claims deferred because truncation is not active for their container, keyed by container name.
+ final Map<String, List<ContentClaim>> claimsSkipped = new
HashMap<>();
+
+ for (final ContentClaim claim : toTruncate) {
+ final String container =
claim.getResourceClaim().getContainer();
+ if (!isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ LOG.debug("Will not truncate {} because truncation is not
active for container {}; will save for later truncation.", claim, container);
+ claimsSkipped.computeIfAbsent(container, key -> new
ArrayList<>()).add(claim);
+ continue;
+ }
+
+ // NOTE(review): this candidate check and the truncate() call below are not atomic with respect to anything
+ // visible here. If another thread can clear the truncation-candidate flag in between (e.g. a clone that
+ // increments the claimant count), the claim would still be truncated. Confirm that truncate() re-validates
+ // the claimant count / candidate flag atomically before modifying the file.
+ if (claim.isTruncationCandidate()) {
+ truncate(claim);
+ }
+ }
+
+ // Hand deferred claims back so a later pass can truncate them once truncation becomes active.
+ claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+ }
+
+ /**
+ * Determines whether claims in the given container should be truncated now.
+ * <p>
+ * When data is not being archived, truncation is always active. Otherwise, truncation is active only if
+ * isArchiveClearedOnLastRun(container) is true and the container's usable space is below the threshold in
+ * minUsableContainerBytesForArchive (if one is set for the container). Every decision, including the
+ * fallback taken when usable space cannot be determined, is stored in {@code activationCache}, so each
+ * container is evaluated at most once per cache instance.
+ *
+ * @param container the name of the content repository container
+ * @param activationCache memo of previous decisions, keyed by container name
+ * @return {@code true} if claims in the container may be truncated now
+ */
+ private boolean isTruncationActiveForContainer(final String container, final Map<String, Boolean> activationCache) {
+ // If not archiving data, we consider truncation always active.
+ if (!archiveData) {
+ return true;
+ }
+
+ final Boolean cachedValue = activationCache.get(container);
+ if (cachedValue != null) {
+ return cachedValue;
+ }
+
+ if (!isArchiveClearedOnLastRun(container)) {
+ LOG.debug("Truncation is not active for container {} because the archive was not cleared on the last run.", container);
+ activationCache.put(container, false);
+ return false;
+ }
+
+ final long usableSpace;
+ try {
+ usableSpace = getContainerUsableSpace(container);
+ } catch (final IOException ioe) {
+ LOG.warn("Failed to determine usable space for container {}. Will not truncate claims for this container.", container, ioe);
+ // Cache the negative result like every other 'false' outcome. Otherwise each remaining claim for this container
+ // re-queries the filesystem and logs another WARN with a stack trace. A fresh cache (e.g. the next pass) retries.
+ activationCache.put(container, false);
+ return false;
+ }
+
+ final Long minUsableSpace = minUsableContainerBytesForArchive.get(container);
+ if (minUsableSpace != null && usableSpace < minUsableSpace) {
+ LOG.debug("Truncation is active for container {} because usable space of {} bytes is below the desired threshold of {} bytes.",
+ container, usableSpace, minUsableSpace);
+
+ activationCache.put(container, true);
+ return true;
+ }
+
+ activationCache.put(container, false);
+ return false;
+ }
+
+ private void truncate(final ContentClaim claim) {
Review Comment:
The `truncate` method doesn't verify that the claimant count is still 0 before
truncating. If a clone operation increments the claimant count while the
truncation task is mid-flight, we could truncate content that is still
referenced. Isn't that a concern?
I'm wondering whether there could be a race condition:
1. `TruncateClaims.truncateClaims()` checks `claim.isTruncationCandidate()`
and sees `true`
2. A clone operation calls `incrementClaimantCount()`, which sets
`truncationCandidate = false` and increments the claimant count
3. `TruncateClaims.truncate()` proceeds to truncate the file anyway,
corrupting the data for the newly cloned FlowFile
Or is this scenario impossible for some reason that I missed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]