Repository: nifi
Updated Branches:
  refs/heads/master 113ad5ecf -> 3b74d2dda


NIFI-4633: This closes #2327. Ensure that everywhere a FlowFile is passed 
into ProcessSession, we use the most up-to-date version of it.
Ensure that when ProcessSession.clone(FlowFile) is called, we obtain the most 
recent version of the FlowFile before attempting to obtain the FlowFile's size.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3b74d2dd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3b74d2dd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3b74d2dd

Branch: refs/heads/master
Commit: 3b74d2ddad10de9105f38db3f555ec6f024afcd4
Parents: 113ad5e
Author: Mark Payne <[email protected]>
Authored: Thu Dec 7 14:44:10 2017 -0500
Committer: joewitt <[email protected]>
Committed: Fri Dec 8 15:30:01 2017 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 24 +++--
 .../repository/TestStandardProcessSession.java  | 94 +++++++++++++++++++-
 2 files changed, 111 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3b74d2dd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0e2e9b5..c4ea132 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1168,8 +1168,10 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         migrate((StandardProcessSession) newOwner, flowFiles);
     }
 
-    private void migrate(final StandardProcessSession newOwner, final 
Collection<FlowFile> flowFiles) {
+    private void migrate(final StandardProcessSession newOwner, 
Collection<FlowFile> flowFiles) {
         // We don't call validateRecordState() here because we want to allow 
migration of FlowFiles that have already been marked as removed or transferred, 
etc.
+        flowFiles = 
flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList());
+
         for (final FlowFile flowFile : flowFiles) {
             if (openInputStreams.containsKey(flowFile)) {
                 throw new IllegalStateException(flowFile + " cannot be 
migrated to a new Process Session because this session currently "
@@ -1580,7 +1582,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 registerDequeuedRecord(flowFile, connection);
             }
 
-            return new ArrayList<FlowFile>(newlySelected);
+            return new ArrayList<>(newlySelected);
         } finally {
             if (lockQueue) {
                 connection.unlock();
@@ -1615,7 +1617,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                     registerDequeuedRecord(flowFile, conn);
                 }
 
-                return new ArrayList<FlowFile>(newlySelected);
+                return new ArrayList<>(newlySelected);
             }
 
             return new ArrayList<>();
@@ -1658,7 +1660,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     @Override
-    public FlowFile clone(final FlowFile example) {
+    public FlowFile clone(FlowFile example) {
+        example = validateRecordState(example);
         return clone(example, 0L, example.getSize());
     }
 
@@ -3098,8 +3101,15 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         return records.containsKey(flowFile);
     }
 
+    private FlowFile getMostRecent(final FlowFile flowFile) {
+        final StandardRepositoryRecord existingRecord = records.get(flowFile);
+        return existingRecord == null ? flowFile : existingRecord.getCurrent();
+    }
+
     @Override
-    public FlowFile create(final FlowFile parent) {
+    public FlowFile create(FlowFile parent) {
+        parent = getMostRecent(parent);
+
         final Map<String, String> newAttributes = new HashMap<>(3);
         newAttributes.put(CoreAttributes.FILENAME.key(), 
String.valueOf(System.nanoTime()));
         newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
@@ -3135,7 +3145,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     @Override
-    public FlowFile create(final Collection<FlowFile> parents) {
+    public FlowFile create(Collection<FlowFile> parents) {
+        parents = 
parents.stream().map(this::getMostRecent).collect(Collectors.toList());
+
         final Map<String, String> newAttributes = intersectAttributes(parents);
         newAttributes.remove(CoreAttributes.UUID.key());
         newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());

http://git-wip-us.apache.org/repos/asf/nifi/blob/3b74d2dd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 5a939ae..68d13e3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.repository;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -271,6 +272,80 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testCloneOriginalDataSmaller() throws IOException {
+        final byte[] originalContent = "hello".getBytes();
+        final byte[] replacementContent = "NEW DATA".getBytes();
+
+        final Connection conn1 = createConnection();
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(contentRepo.create(originalContent))
+            .size(originalContent.length)
+            .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        
when(connectable.getIncomingConnections()).thenReturn(Collections.singletonList(conn1));
+
+        final FlowFile input = session.get();
+        assertEquals(originalContent.length, input.getSize());
+
+        final FlowFile modified = session.write(input, (in, out) -> 
out.write(replacementContent));
+        assertEquals(replacementContent.length, modified.getSize());
+
+        // Clone 'input', not 'modified' because we want to ensure that we use 
the outdated reference to ensure
+        // that the framework uses the most current reference.
+        final FlowFile clone = session.clone(input);
+        assertEquals(replacementContent.length, clone.getSize());
+
+        final byte[] buffer = new byte[replacementContent.length];
+        try (final InputStream in = session.read(clone)) {
+            StreamUtils.fillBuffer(in, buffer);
+        }
+
+        assertArrayEquals(replacementContent, buffer);
+    }
+
+    @Test
+    public void testCloneOriginalDataLarger() throws IOException {
+        final byte[] originalContent = "hello there 12345".getBytes();
+        final byte[] replacementContent = "NEW DATA".getBytes();
+
+        final Connection conn1 = createConnection();
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(contentRepo.create(originalContent))
+            .size(originalContent.length)
+            .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        
when(connectable.getIncomingConnections()).thenReturn(Collections.singletonList(conn1));
+
+        final FlowFile input = session.get();
+        assertEquals(originalContent.length, input.getSize());
+
+        final FlowFile modified = session.write(input, (in, out) -> 
out.write(replacementContent));
+        assertEquals(replacementContent.length, modified.getSize());
+
+        // Clone 'input', not 'modified' because we want to ensure that we use 
the outdated reference to ensure
+        // that the framework uses the most current reference.
+        final FlowFile clone = session.clone(input);
+        assertEquals(replacementContent.length, clone.getSize());
+
+        final byte[] buffer = new byte[replacementContent.length];
+        try (final InputStream in = session.read(clone)) {
+            StreamUtils.fillBuffer(in, buffer);
+        }
+
+        assertArrayEquals(replacementContent, buffer);
+    }
+
+    @Test
     @SuppressWarnings("unchecked")
     public void testRoundRobinOnSessionGetWithCount() {
         final List<Connection> connList = new ArrayList<>();
@@ -1909,6 +1984,23 @@ public class TestStandardProcessSession {
             return contentClaim;
         }
 
+        public ContentClaim create(byte[] content) throws IOException {
+            final ResourceClaim resourceClaim = 
claimManager.newResourceClaim("container", "section", 
String.valueOf(idGenerator.getAndIncrement()), false, false);
+            final ContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, 0L);
+
+            claimantCounts.put(contentClaim, new AtomicInteger(1));
+            final Path path = getPath(contentClaim);
+            final Path parent = path.getParent();
+            if (Files.exists(parent) == false) {
+                Files.createDirectories(parent);
+            }
+
+            try (final OutputStream out = new 
FileOutputStream(getPath(contentClaim).toFile())) {
+                out.write(content);
+            }
+            return contentClaim;
+        }
+
         @Override
         public int incrementClaimaintCount(ContentClaim claim) {
             AtomicInteger count = claimantCounts.get(claim);
@@ -1938,7 +2030,7 @@ public class TestStandardProcessSession {
 
         @Override
         public Set<String> getContainerNames() {
-            return new HashSet<String>();
+            return new HashSet<>();
         }
 
         @Override

Reply via email to