theBestAndrew commented on a change in pull request #5418:
URL: https://github.com/apache/nifi/pull/5418#discussion_r718448390
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
##########
@@ -2603,6 +2603,205 @@ public void
testFullRollbackAFterCheckpointDoesNotStoreState() throws IOExceptio
stateManager.assertStateNotSet();
}
+ @Test
+ public void testCloneThenRollbackCountsClaimReferencesProperly() throws
IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+ session.rollback();
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ }
+
+ @Test
+ public void testCloneThenWriteThenRollbackCountsClaimReferencesProperly()
throws IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+ clone = session.write(flowFile, out -> out.write("Bye".getBytes()));
+ assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+ session.rollback();
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+ }
+
+ @Test
+ public void testCloneThenAppendThenRollbackCountsClaimReferencesProperly()
throws IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+ clone = session.append(flowFile, out -> out.write("Bye".getBytes()));
+ assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+ session.rollback();
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+ }
+
+ @Test
+ public void testCloneThenWriteCountsClaimReferencesProperly() throws
IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+
+ // Expect claimant count of 2 because the clone() means that the new FlowFile points to the same content claim.
+ assertEquals(2, contentRepo.getClaimantCount(originalClaim));
+
+ // Should be able to write to the FlowFile any number of times, and each time it should leave us with a Content Claim Claimant Count of 1 for the original (because the new FlowFile will no
+ // longer point at the original claim) and 1 for the new Content Claim.
+ for (int i=0; i < 10; i++) {
+ final ContentClaim previousCloneClaim = getContentClaim(clone);
+ clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+ // After modifying the content of the FlowFile, the claimant count of the 'old' content claim should be 1, as should the claimant count of the updated content claim.
+ final ContentClaim updatedCloneClaim = getContentClaim(clone);
+ assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ assertEquals(1, contentRepo.getClaimantCount(previousCloneClaim));
+ }
+ }
+
+ private ContentClaim getContentClaim(final FlowFile flowFile) {
+ return ((FlowFileRecord) flowFile).getContentClaim();
+ }
+
+ @Test
+ public void testCreateChildThenWriteCountsClaimReferencesProperly() throws
IOException {
+ final ContentClaim claim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(claim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.create(flowFile);
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+ final ContentClaim updatedCloneClaim = getContentClaim(clone);
+ assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+ }
+
+ @Test
+ public void
testCreateChildThenMultipleWriteCountsClaimReferencesProperly() throws
IOException {
+ final ContentClaim claim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(claim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.create(flowFile);
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ for (int i=0; i < 100; i++) {
+ clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+ final ContentClaim updatedCloneClaim = getContentClaim(clone);
+ assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+ }
+ }
+
+ @Test
+ public void
testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimReferencesProperly()
{
+ FlowFile clone = session.create();
Review comment:
My understanding is that `session.create()` returns a brand-new FlowFile, not a
clone of a previous one, so should the variable `clone` be renamed here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]