This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1fc25db  NIFI-7469: Updated RepositoryRecord to include flag 
indicating whether or not the content of a FlowFile was modified. This allows 
us to explicitly keep track of this state rather than implying it (potentially 
incorrectly).
1fc25db is described below

commit 1fc25db47cdecef404548ac011de2c2564574932
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jul 9 12:29:49 2020 -0400

    NIFI-7469: Updated RepositoryRecord to include flag indicating whether or 
not the content of a FlowFile was modified. This allows us to explicitly keep 
track of this state rather than implying it (potentially incorrectly).
    
    This closes #4399.
---
 .../controller/repository/RepositoryRecord.java    |  5 ++
 .../org/apache/nifi/controller/FlowController.java |  2 +-
 .../queue/DropFlowFileRepositoryRecord.java        |  5 ++
 .../repository/StandardProcessSession.java         | 61 ++++++++++++----------
 .../repository/TransientClaimRepositoryRecord.java |  5 ++
 .../repository/WriteAheadFlowFileRepository.java   |  3 +-
 ...cryptedSequentialAccessWriteAheadLogTest.groovy |  2 +-
 .../SchemaRepositoryRecordSerdeTest.java           |  2 +-
 .../repository/TestRocksDBFlowFileRepository.java  | 12 ++---
 .../TestWriteAheadFlowFileRepository.java          | 12 ++---
 .../repository/StandardRepositoryRecord.java       | 22 ++++----
 .../repository/TestStandardRepositoryRecord.java   | 11 +++-
 12 files changed, 86 insertions(+), 56 deletions(-)

diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
index 3b27d95..82bb3eb 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
@@ -87,4 +87,9 @@ public interface RepositoryRecord {
      *         life of the Process Session in which they were created and 
should not be persisted.
      */
     List<ContentClaim> getTransientClaims();
+
+    /**
+     * @return true if the content of the FlowFile has been modified
+     */
+    boolean isContentModified();
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 9a68658..97b4c13 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2784,7 +2784,7 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
 
         // Update the FlowFile Repository to indicate that we have added the 
FlowFile to the flow
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(queue);
-        record.setWorking(flowFileRecord);
+        record.setWorking(flowFileRecord, false);
         record.setDestination(queue);
         flowFileRepository.updateRepository(Collections.singleton(record));
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
index f47b4eb..64ed827 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
@@ -88,4 +88,9 @@ public class DropFlowFileRepositoryRecord implements 
RepositoryRecord {
     public List<ContentClaim> getTransientClaims() {
         return Collections.emptyList();
     }
+
+    @Override
+    public boolean isContentModified() {
+        return false;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 089ac90..ba49b27 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -321,7 +321,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                     if (claim != null) {
                         
context.getContentRepository().incrementClaimaintCount(claim);
                     }
-                    newRecord.setWorking(clone, Collections.<String, String> 
emptyMap());
+                    newRecord.setWorking(clone, Collections.<String, String> 
emptyMap(), false);
 
                     newRecord.setDestination(destination.getFlowFileQueue());
                     
newRecord.setTransferRelationship(record.getTransferRelationship());
@@ -1691,7 +1691,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             .addAttributes(attrs)
             .build();
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
-        record.setWorking(fFile, attrs);
+        record.setWorking(fFile, attrs, false);
         records.put(fFile.getId(), record);
         createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
         return fFile;
@@ -1730,7 +1730,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         final FlowFileRecord fFile = fFileBuilder.build();
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
-        record.setWorking(fFile, newAttributes);
+        record.setWorking(fFile, newAttributes, false);
         records.put(fFile.getId(), record);
         createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
 
@@ -1779,7 +1779,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             .build();
 
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
-        record.setWorking(fFile, newAttributes);
+        record.setWorking(fFile, newAttributes, false);
         records.put(fFile.getId(), record);
         createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
 
@@ -1820,7 +1820,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             context.getContentRepository().incrementClaimaintCount(claim);
         }
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
-        record.setWorking(clone, clone.getAttributes());
+        record.setWorking(clone, clone.getAttributes(), false);
         records.put(clone.getId(), record);
 
         if (offset == 0L && size == example.getSize()) {
@@ -1870,7 +1870,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final StandardRepositoryRecord record = getRecord(flowFile);
         final long expirationEpochMillis = System.currentTimeMillis() + 
context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
         final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
-        record.setWorking(newFile);
+        record.setWorking(newFile, false);
         return newFile;
     }
 
@@ -1885,7 +1885,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         final StandardRepositoryRecord record = getRecord(flowFile);
         final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key,
 value).build();
-        record.setWorking(newFile, key, value);
+        record.setWorking(newFile, key, value, false);
 
         return newFile;
     }
@@ -1908,7 +1908,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(updatedAttributes);
         final FlowFileRecord newFile = ffBuilder.build();
 
-        record.setWorking(newFile, updatedAttributes);
+        record.setWorking(newFile, updatedAttributes, false);
 
         return newFile;
     }
@@ -1924,7 +1924,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         final StandardRepositoryRecord record = getRecord(flowFile);
         final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build();
-        record.setWorking(newFile, key, null);
+        record.setWorking(newFile, key, null, false);
         return newFile;
     }
 
@@ -1949,7 +1949,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             updatedAttrs.put(key, null);
         }
 
-        record.setWorking(newFile, updatedAttrs);
+        record.setWorking(newFile, updatedAttrs, false);
         return newFile;
     }
 
@@ -1962,7 +1962,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
 
         if (keyPattern == null) {
-            record.setWorking(newFile);
+            record.setWorking(newFile, false);
         } else {
             final Map<String, String> curAttrs = 
record.getCurrent().getAttributes();
 
@@ -1977,7 +1977,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 }
             }
 
-            record.setWorking(newFile, removed);
+            record.setWorking(newFile, removed, false);
         }
 
         return newFile;
@@ -1986,7 +1986,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     private void updateLastQueuedDate(final StandardRepositoryRecord record, 
final Long lastQueueDate) {
         final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
                 .lastQueued(lastQueueDate, 
enqueuedIndex.getAndIncrement()).build();
-        record.setWorking(newFile);
+        record.setWorking(newFile, false);
     }
 
     private void updateLastQueuedDate(final StandardRepositoryRecord record) {
@@ -2582,8 +2582,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }
 
         removeTemporaryClaim(destinationRecord);
-        final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build();
-        destinationRecord.setWorking(newFile);
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(destinationRecord.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(0L)
+            .size(writtenCount)
+            .build();
+        destinationRecord.setWorking(newFile, true);
         return newFile;
     }
 
@@ -2697,7 +2702,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                         .size(bytesWritten)
                         .build();
 
-                    record.setWorking(newFile);
+                    record.setWorking(newFile, true);
                 }
             };
 
@@ -2777,7 +2782,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             .size(writtenToFlowFile)
             .build();
 
-        record.setWorking(newFile);
+        record.setWorking(newFile, true);
         return newFile;
     }
 
@@ -2888,8 +2893,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             removeTemporaryClaim(record);
         }
 
-        final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(0).size(newSize).build();
-        record.setWorking(newFile);
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(record.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(0)
+            .size(newSize)
+            .build();
+        record.setWorking(newFile, true);
         return newFile;
     }
 
@@ -2905,10 +2915,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
      * @param record record
      */
     private void removeTemporaryClaim(final StandardRepositoryRecord record) {
-        final boolean contentModified = record.getWorkingClaim() != null && 
record.getWorkingClaim() != record.getOriginalClaim();
-
-        // If the working claim is not the same as the original claim, we have 
modified the content of
-        // the FlowFile, and we need to remove the newly created content (the 
working claim). However, if
+        // If the content of the FlowFile has already been modified, we need 
to remove the newly created content (the working claim). However, if
         // they are the same, we cannot just remove the claim because 
record.getWorkingClaim() will return
         // the original claim if the record is "working" but the content has 
not been modified
         // (e.g., in the case of attributes only were updated)
@@ -2919,7 +2926,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         // that may decrement the original claim (because the 2 claims are the 
same), and that's NOT what we want to do
         // because we will do that later, in the session.commit() and that 
would result in decrementing the count for
         // the original claim twice.
-        if (contentModified) {
+        if (record.isContentModified()) {
             // In this case, it's ok to decrement the claimant count for the 
content because we know that the working claim is going to be
             // updated and the given working claim is referenced only by 
FlowFiles in this session (because it's the Working Claim).
             // Therefore, we need to decrement the claimant count, and since 
the Working Claim is being changed, that means that
@@ -3041,7 +3048,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             .size(writtenToFlowFile)
             .build();
 
-        record.setWorking(newFile);
+        record.setWorking(newFile, true);
 
         return newFile;
     }
@@ -3089,7 +3096,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             .size(newSize)
             .addAttribute(CoreAttributes.FILENAME.key(), 
source.toFile().getName())
             .build();
-        record.setWorking(newFile, CoreAttributes.FILENAME.key(), 
source.toFile().getName());
+        record.setWorking(newFile, CoreAttributes.FILENAME.key(), 
source.toFile().getName(), true);
 
         if (!keepSourceFile) {
             deleteOnCommit.put(newFile, source);
@@ -3133,7 +3140,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             .contentClaimOffset(claimOffset)
             .size(newSize)
             .build();
-        record.setWorking(newFile);
+        record.setWorking(newFile, true);
         return newFile;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
index 8cf6952..937874e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
@@ -86,4 +86,9 @@ public class TransientClaimRepositoryRecord implements 
RepositoryRecord {
     public List<ContentClaim> getTransientClaims() {
         return claimsToCleanUp;
     }
+
+    @Override
+    public boolean isContentModified() {
+        return false;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 4b095ed..be1ada7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -565,13 +565,12 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     private void updateClaimCounts(final RepositoryRecord record) {
         final ContentClaim currentClaim = record.getCurrentClaim();
         final ContentClaim originalClaim = record.getOriginalClaim();
-        final boolean claimChanged = !Objects.equals(currentClaim, 
originalClaim);
 
         if (record.getType() == RepositoryRecordType.DELETE || 
record.getType() == RepositoryRecordType.CONTENTMISSING) {
             decrementClaimCount(currentClaim);
         }
 
-        if (claimChanged) {
+        if (record.isContentModified()) {
             // records which have been updated - remove original if exists
             decrementClaimCount(originalClaim);
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
index 07d88ba..8bc91db 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
@@ -133,7 +133,7 @@ class EncryptedSequentialAccessWriteAheadLogTest extends 
GroovyTestCase {
         StandardRepositoryRecord record = new StandardRepositoryRecord(queue)
         StandardFlowFileRecord.Builder ffrb = new 
StandardFlowFileRecord.Builder().id(System.nanoTime())
         ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map<String, 
String>)
-        record.setWorking(ffrb.build())
+        record.setWorking(ffrb.build(), false)
 
         return new LiveSerializedRepositoryRecord(record);
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
index 96d595a..c306db1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
@@ -253,7 +253,7 @@ public class SchemaRepositoryRecordSerdeTest {
         StandardRepositoryRecord standardRepositoryRecord = new 
StandardRepositoryRecord(flowFileQueue);
         StandardFlowFileRecord.Builder flowFileRecordBuilder = new 
StandardFlowFileRecord.Builder();
         flowFileRecordBuilder.addAttributes(attributes);
-        standardRepositoryRecord.setWorking(flowFileRecordBuilder.build());
+        standardRepositoryRecord.setWorking(flowFileRecordBuilder.build(), 
false);
         return standardRepositoryRecord;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
index cebf2cd..4f41de1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
@@ -231,7 +231,7 @@ public class TestRocksDBFlowFileRepository {
                     .contentClaim(claim1)
                     .build();
             final StandardRepositoryRecord rec1 = new 
StandardRepositoryRecord(queue);
-            rec1.setWorking(flowFile1);
+            rec1.setWorking(flowFile1, false);
             rec1.setDestination(queue);
 
             // Create a Record that we can swap out
@@ -242,7 +242,7 @@ public class TestRocksDBFlowFileRepository {
                     .build();
 
             final StandardRepositoryRecord rec2 = new 
StandardRepositoryRecord(queue);
-            rec2.setWorking(flowFile2);
+            rec2.setWorking(flowFile2, false);
             rec2.setDestination(queue);
 
             final List<RepositoryRecord> records = new ArrayList<>();
@@ -311,7 +311,7 @@ public class TestRocksDBFlowFileRepository {
 
         final List<RepositoryRecord> records = new ArrayList<>();
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
-        record.setWorking(flowFileRecord);
+        record.setWorking(flowFileRecord, false);
         record.setDestination(connection.getFlowFileQueue());
         records.add(record);
 
@@ -320,13 +320,13 @@ public class TestRocksDBFlowFileRepository {
         // update to add new attribute
         ffBuilder = new 
StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello",
 "world");
         final FlowFileRecord flowFileRecord2 = ffBuilder.build();
-        record.setWorking(flowFileRecord2);
+        record.setWorking(flowFileRecord2, false);
         repo.updateRepository(records);
 
         // update size but no attribute
         ffBuilder = new 
StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
         final FlowFileRecord flowFileRecord3 = ffBuilder.build();
-        record.setWorking(flowFileRecord3);
+        record.setWorking(flowFileRecord3, false);
         repo.updateRepository(records);
 
         repo.close();
@@ -679,7 +679,7 @@ public class TestRocksDBFlowFileRepository {
 
         private List<RepositoryRecord> getRepositoryRecord(final 
FlowFileRecord flowFileRecord) {
             final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
-            record.setWorking(flowFileRecord);
+            record.setWorking(flowFileRecord, false);
             record.setDestination(connection.getFlowFileQueue());
             return Collections.singletonList(record);
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index a6edbf5..42d84c1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -352,7 +352,7 @@ public class TestWriteAheadFlowFileRepository {
                                 final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null, flowFile);
                                 record.setDestination(queue);
                                 final Map<String, String> updatedAttrs = 
Collections.singletonMap("uuid", uuid);
-                                record.setWorking(flowFile, updatedAttrs);
+                                record.setWorking(flowFile, updatedAttrs, 
false);
 
                                 records.add(new 
LiveSerializedRepositoryRecord(record));
                             }
@@ -535,7 +535,7 @@ public class TestWriteAheadFlowFileRepository {
                     .contentClaim(claim1)
                     .build();
             final StandardRepositoryRecord rec1 = new 
StandardRepositoryRecord(queue);
-            rec1.setWorking(flowFile1);
+            rec1.setWorking(flowFile1, false);
             rec1.setDestination(queue);
 
             // Create a Record that we can swap out
@@ -546,7 +546,7 @@ public class TestWriteAheadFlowFileRepository {
                     .build();
 
             final StandardRepositoryRecord rec2 = new 
StandardRepositoryRecord(queue);
-            rec2.setWorking(flowFile2);
+            rec2.setWorking(flowFile2, true);
             rec2.setDestination(queue);
 
             final List<RepositoryRecord> records = new ArrayList<>();
@@ -623,7 +623,7 @@ public class TestWriteAheadFlowFileRepository {
 
         final List<RepositoryRecord> records = new ArrayList<>();
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
-        record.setWorking(flowFileRecord);
+        record.setWorking(flowFileRecord, false);
         record.setDestination(connection.getFlowFileQueue());
         records.add(record);
 
@@ -632,13 +632,13 @@ public class TestWriteAheadFlowFileRepository {
         // update to add new attribute
         ffBuilder = new 
StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord).addAttribute("hello",
 "world");
         final FlowFileRecord flowFileRecord2 = ffBuilder.build();
-        record.setWorking(flowFileRecord2);
+        record.setWorking(flowFileRecord2, false);
         repo.updateRepository(records);
 
         // update size but no attribute
         ffBuilder = new 
StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord2).size(40L);
         final FlowFileRecord flowFileRecord3 = ffBuilder.build();
-        record.setWorking(flowFileRecord3);
+        record.setWorking(flowFileRecord3, true);
         repo.updateRepository(records);
 
         repo.close();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index 4aeb473..a3c724b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -39,6 +39,7 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
     private Map<String, String> updatedAttributes = null;
     private List<ContentClaim> transientClaims;
     private final long startNanos = System.nanoTime();
+    private boolean contentModified;
 
 
     /**
@@ -110,8 +111,9 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
         return originalQueue;
     }
 
-    public void setWorking(final FlowFileRecord flowFile) {
+    public void setWorking(final FlowFileRecord flowFile, final boolean 
contentModified) {
         workingFlowFileRecord = flowFile;
+        this.contentModified |= contentModified;
     }
 
     private Map<String, String> initializeUpdatedAttributes() {
@@ -122,8 +124,9 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
         return updatedAttributes;
     }
 
-    public void setWorking(final FlowFileRecord flowFile, final String 
attributeKey, final String attributeValue) {
+    public void setWorking(final FlowFileRecord flowFile, final String 
attributeKey, final String attributeValue, final boolean contentModified) {
         workingFlowFileRecord = flowFile;
+        this.contentModified |= contentModified;
 
         // In the case that the type is CREATE, we know that all attributes 
are updated attributes, so no need to store them.
         if (type == RepositoryRecordType.CREATE) {
@@ -137,8 +140,9 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
         }
     }
 
-    public void setWorking(final FlowFileRecord flowFile, final Map<String, 
String> updatedAttribs) {
+    public void setWorking(final FlowFileRecord flowFile, final Map<String, 
String> updatedAttribs, final boolean contentModified) {
         workingFlowFileRecord = flowFile;
+        this.contentModified |= contentModified;
 
         // In the case that the type is CREATE, we know that all attributes 
are updated attributes, so no need to store them.
         if (type == RepositoryRecordType.CREATE) {
@@ -183,10 +187,6 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
         return transferRelationship;
     }
 
-    FlowFileRecord getWorking() {
-        return workingFlowFileRecord;
-    }
-
     ContentClaim getWorkingClaim() {
         return (workingFlowFileRecord == null) ? null : 
workingFlowFileRecord.getContentClaim();
     }
@@ -206,10 +206,6 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
         return (getCurrent() == null) ? 0L : 
getCurrent().getContentClaimOffset();
     }
 
-    boolean isWorking() {
-        return (workingFlowFileRecord != null);
-    }
-
     Map<String, String> getOriginalAttributes() {
         return originalAttributes;
     }
@@ -260,4 +256,8 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
     public long getStartNanos() {
         return startNanos;
     }
+
+    public boolean isContentModified() {
+        return contentModified;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
index 61e23fe..7c0807c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TestStandardRepositoryRecord {
 
@@ -40,12 +42,19 @@ public class TestStandardRepositoryRecord {
             .addAttributes(updatedAttributes)
             .build();
 
-        record.setWorking(flowFileRecord, updatedAttributes);
+        record.setWorking(flowFileRecord, updatedAttributes, false);
 
         final Map<String, String> updatedWithId = new 
HashMap<>(updatedAttributes);
         updatedWithId.put("uuid", uuid);
 
         assertEquals(updatedWithId, record.getUpdatedAttributes());
+        assertFalse(record.isContentModified());
+
+        record.setWorking(flowFileRecord, true);
+        assertTrue(record.isContentModified());
+
+        record.setWorking(flowFileRecord, false);
+        assertTrue(record.isContentModified()); // Should still be true 
because it was modified previously
 
         record.markForDelete();
 

Reply via email to