NIFI-396 added a DisableOnCloseInputStream class; modified 
StandardProcessSession to prevent access of the Input/OutputStreams after 
callbacks have been executed; updated tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e2760f8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e2760f8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e2760f8c

Branch: refs/heads/NIFI-250
Commit: e2760f8c980583d285137134e05c435c930fb4d2
Parents: 7272d0d
Author: Bobby Owolabi <[email protected]>
Authored: Thu Mar 19 00:54:24 2015 -0400
Committer: Bobby Owolabi <[email protected]>
Committed: Thu Mar 19 00:54:24 2015 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      |  11 +-
 .../io/DisableOnCloseInputStream.java           |  93 ++++++
 .../repository/TestStandardProcessSession.java  | 295 +++++++++++--------
 3 files changed, 266 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e2760f8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 8d2e456..e5cd03e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -46,6 +46,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
 import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
+import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
@@ -1735,7 +1736,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
                 final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
-                final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(limitedIn, this.bytesRead)) {
+                final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+                final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
 
             // We want to differentiate between IOExceptions thrown by the 
repository and IOExceptions thrown from
             // Processor code. As a result, as have the 
FlowFileAccessInputStream that catches IOException from the repository
@@ -2180,9 +2182,10 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
                 try (final InputStream rawIn = getInputStream(source, 
currClaim, record.getCurrentClaimOffset());
                         final InputStream limitedIn = new 
LimitedInputStream(rawIn, source.getSize());
-                        final InputStream countingIn = new 
ByteCountingInputStream(limitedIn, bytesRead);
-                        final OutputStream disableOnClose = new 
DisableOnCloseOutputStream(currentWriteClaimStream);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+                        final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+                        final InputStream countingIn = new 
ByteCountingInputStream(disableOnCloseIn, bytesRead);
+                        final OutputStream disableOnCloseOut = new 
DisableOnCloseOutputStream(currentWriteClaimStream);
+                        final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
 
                     recursionSet.add(source);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e2760f8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java
new file mode 100644
index 0000000..ddcf6c9
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wraps an existing InputStream, so that when {@link InputStream#close()} is
+ * called, the underlying InputStream is NOT closed but this InputStream can no
+ * longer be read from
+ */
+public class DisableOnCloseInputStream extends InputStream {
+
+    private final InputStream wrapped;
+    private boolean closed = false;
+
+    public DisableOnCloseInputStream(final InputStream wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public int read() throws IOException {
+        checkClosed();
+        return wrapped.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        checkClosed();
+        return wrapped.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        checkClosed();
+        return wrapped.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        checkClosed();
+        return wrapped.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+        return wrapped.available();
+    }
+
+    private void checkClosed() throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        if (closed == false) {
+            wrapped.mark(readlimit);
+        }
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        checkClosed();
+        wrapped.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return wrapped.markSupported();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e2760f8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 2d09ea5..ef2fb93 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -66,7 +66,6 @@ import 
org.apache.nifi.provenance.MockProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.ObjectHolder;
 import org.junit.After;
 import org.junit.Assert;
@@ -238,7 +237,61 @@ public class TestStandardProcessSession {
         assertEquals(0, contentRepo.getExistingClaims().size());
     }
 
-    @Test(expected = FlowFileAccessException.class)
+    private void assertDisabled(final OutputStream outputStream) {
+        try {
+            outputStream.write(new byte[0]);
+            Assert.fail("Expected OutputStream to be disabled; was able to 
call write(byte[])");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+        try {
+            outputStream.write(0);
+            Assert.fail("Expected OutputStream to be disabled; was able to 
call write(int)");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+        try {
+            outputStream.write(new byte[0], 0, 0);
+            Assert.fail("Expected OutputStream to be disabled; was able to 
call write(byte[], int, int)");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+    }
+    
+    private void assertDisabled(final InputStream inputStream) {
+        try {
+            inputStream.read();
+            Assert.fail("Expected InputStream to be disabled; was able to call 
read()");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+        try {
+            inputStream.read(new byte[0]);
+            Assert.fail("Expected InputStream to be disabled; was able to call 
read(byte[])");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+        try {
+            inputStream.read(new byte[0], 0, 0);
+            Assert.fail("Expected InputStream to be disabled; was able to call 
read(byte[], int, int)");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+        try {
+            inputStream.reset();
+            Assert.fail("Expected InputStream to be disabled; was able to call 
reset()");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+        try {
+            inputStream.skip(1L);
+            Assert.fail("Expected InputStream to be disabled; was able to call 
skip(long)");
+        } catch (final Exception ex) {
+            Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
+        }
+    }    
+    
+    @Test
     public void testAppendAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
@@ -256,12 +309,10 @@ public class TestStandardProcessSession {
                 outputStreamHolder.set(outputStream);
             }
         });
-        try (final OutputStream outputStream = outputStreamHolder.get()) {
-            outputStream.write(5);
-        }
+        assertDisabled(outputStreamHolder.get());
     }
 
-    @Test(expected = FlowFileAccessException.class)
+    @Test
     public void testReadAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
@@ -279,9 +330,7 @@ public class TestStandardProcessSession {
                 inputStreamHolder.set(inputStream);
             }
         });
-        try (final InputStream inputStream = inputStreamHolder.get()) {
-            inputStream.read();
-        }
+        assertDisabled(inputStreamHolder.get());
     }
 
     @Test
@@ -304,17 +353,11 @@ public class TestStandardProcessSession {
                 outputStreamHolder.set(output);
             }
         });
-        try (final InputStream inputStream = inputStreamHolder.get()) {
-            inputStream.read();
-            Assert.fail("Expected Exception to be thrown when read is 
attempted after session closes stream");
-        } catch (final Exception ex) {}
-        try (final OutputStream outputStream = outputStreamHolder.get()) {
-            outputStream.write(5);
-            Assert.fail("Expected Exception to be thrown when write is 
attempted after session closes stream");
-        } catch (final Exception ex) {}
-    }
+        assertDisabled(inputStreamHolder.get());
+        assertDisabled(outputStreamHolder.get());
+   }
 
-    @Test(expected = FlowFileAccessException.class)
+    @Test
     public void testWriteAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
@@ -332,9 +375,7 @@ public class TestStandardProcessSession {
                 outputStreamHolder.set(out);
             }
         });
-        try (final OutputStream outputStream = outputStreamHolder.get()) {
-            outputStream.write(5);
-        }
+        assertDisabled(outputStreamHolder.get());
     }
 
     @Test
@@ -385,7 +426,6 @@ public class TestStandardProcessSession {
         assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
     }
 
-    
     @Test
     public void testProvenanceEventsEmittedForForkIfNotRemoved() throws 
IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
@@ -425,59 +465,59 @@ public class TestStandardProcessSession {
     @Test
     public void testUpdateAttributesThenJoin() throws IOException {
         final FlowFileRecord flowFileRecord1 = new 
StandardFlowFileRecord.Builder()
-            .id(1L)
-            .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
-            .entryDate(System.currentTimeMillis())
-            .build();
-        
+                .id(1L)
+                .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+                .entryDate(System.currentTimeMillis())
+                .build();
+
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-            .id(2L)
-            .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
-            .entryDate(System.currentTimeMillis())
-            .build();
-        
+                .id(2L)
+                .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+                .entryDate(System.currentTimeMillis())
+                .build();
+
         flowFileQueue.put(flowFileRecord1);
         flowFileQueue.put(flowFileRecord2);
-        
+
         FlowFile ff1 = session.get();
         FlowFile ff2 = session.get();
 
         ff1 = session.putAttribute(ff1, "index", "1");
         ff2 = session.putAttribute(ff2, "index", "2");
-        
+
         final List<FlowFile> parents = new ArrayList<>(2);
         parents.add(ff1);
         parents.add(ff2);
-        
+
         final FlowFile child = session.create(parents);
-        
+
         final Relationship rel = new Relationship.Builder().name("A").build();
-        
+
         session.transfer(ff1, rel);
         session.transfer(ff2, rel);
         session.transfer(child, rel);
-        
+
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = 
provenanceRepo.getEvents(0L, 1000);
 
         // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
         assertEquals(3, events.size());
-        
+
         int joinCount = 0;
         int ff1UpdateCount = 0;
         int ff2UpdateCount = 0;
-        
-        for ( final ProvenanceEventRecord event : events ) {
+
+        for (final ProvenanceEventRecord event : events) {
             switch (event.getEventType()) {
                 case JOIN:
                     assertEquals(child.getAttribute("uuid"), 
event.getFlowFileUuid());
                     joinCount++;
                     break;
                 case ATTRIBUTES_MODIFIED:
-                    if ( 
event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) {
+                    if 
(event.getFlowFileUuid().equals(ff1.getAttribute("uuid"))) {
                         ff1UpdateCount++;
-                    } else if ( 
event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) {
+                    } else if 
(event.getFlowFileUuid().equals(ff2.getAttribute("uuid"))) {
                         ff2UpdateCount++;
                     } else {
                         Assert.fail("Got ATTRIBUTE_MODIFIED for wrong 
FlowFile: " + event.getFlowFileUuid());
@@ -487,14 +527,14 @@ public class TestStandardProcessSession {
                     Assert.fail("Unexpected event type: " + event);
             }
         }
-        
+
         assertEquals(1, joinCount);
         assertEquals(1, ff1UpdateCount);
         assertEquals(1, ff2UpdateCount);
-        
+
         assertEquals(1, joinCount);
     }
-    
+
     @Test
     public void testForkOneToOneReported() throws IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
@@ -804,34 +844,34 @@ public class TestStandardProcessSession {
     @Test
     public void 
testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .contentClaim(new ContentClaim() {
-                @Override
-                public int compareTo(ContentClaim arg0) {
-                    return 0;
-                }
-    
-                @Override
-                public String getId() {
-                    return "0";
-                }
-    
-                @Override
-                public String getContainer() {
-                    return "container";
-                }
-    
-                @Override
-                public String getSection() {
-                    return "section";
-                }
-    
-                @Override
-                public boolean isLossTolerant() {
-                    return true;
-                }
-            }).build();
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .contentClaim(new ContentClaim() {
+                    @Override
+                    public int compareTo(ContentClaim arg0) {
+                        return 0;
+                    }
+
+                    @Override
+                    public String getId() {
+                        return "0";
+                    }
+
+                    @Override
+                    public String getContainer() {
+                        return "container";
+                    }
+
+                    @Override
+                    public String getSection() {
+                        return "section";
+                    }
+
+                    @Override
+                    public boolean isLossTolerant() {
+                        return true;
+                    }
+                }).build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -844,35 +884,35 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .contentClaim(new ContentClaim() {
-                @Override
-                public int compareTo(ContentClaim arg0) {
-                    return 0;
-                }
-    
-                @Override
-                public String getId() {
-                    return "0";
-                }
-    
-                @Override
-                public String getContainer() {
-                    return "container";
-                }
-    
-                @Override
-                public String getSection() {
-                    return "section";
-                }
-    
-                @Override
-                public boolean isLossTolerant() {
-                    return true;
-                }
-            })
-            .contentClaimOffset(1000L).size(1L).build();
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .contentClaim(new ContentClaim() {
+                    @Override
+                    public int compareTo(ContentClaim arg0) {
+                        return 0;
+                    }
+
+                    @Override
+                    public String getId() {
+                        return "0";
+                    }
+
+                    @Override
+                    public String getContainer() {
+                        return "container";
+                    }
+
+                    @Override
+                    public String getSection() {
+                        return "section";
+                    }
+
+                    @Override
+                    public boolean isLossTolerant() {
+                        return true;
+                    }
+                })
+                .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
@@ -933,21 +973,20 @@ public class TestStandardProcessSession {
         }
     }
 
-    
     @Test
     public void testCreateEmitted() throws IOException {
         FlowFile newFlowFile = session.create();
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = 
provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.CREATE, event.getEventType());
     }
-    
+
     @Test
     public void testContentModifiedNotEmittedForCreate() throws IOException {
         FlowFile newFlowFile = session.create();
@@ -958,23 +997,23 @@ public class TestStandardProcessSession {
         });
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = 
provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.CREATE, event.getEventType());
     }
-    
+
     @Test
     public void testContentModifiedEmittedAndNotAttributesModified() throws 
IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-            .id(1L)
-            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-            .build();
+                .id(1L)
+                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+                .build();
         this.flowFileQueue.put(flowFile);
-        
+
         FlowFile existingFlowFile = session.get();
         existingFlowFile = session.write(existingFlowFile, new 
OutputStreamCallback() {
             @Override
@@ -984,38 +1023,36 @@ public class TestStandardProcessSession {
         existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
         session.transfer(existingFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = 
provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.CONTENT_MODIFIED, 
event.getEventType());
     }
-    
+
     @Test
     public void testAttributesModifiedEmitted() throws IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-            .id(1L)
-            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-            .build();
+                .id(1L)
+                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+                .build();
         this.flowFileQueue.put(flowFile);
-        
+
         FlowFile existingFlowFile = session.get();
         existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
         session.transfer(existingFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
-        
+
         final List<ProvenanceEventRecord> events = 
provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
-        
+
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, 
event.getEventType());
     }
-    
-    
-    
+
     private static class MockFlowFileRepository implements FlowFileRepository {
 
         private final AtomicLong idGenerator = new AtomicLong(0L);
@@ -1082,7 +1119,7 @@ public class TestStandardProcessSession {
         @Override
         public void shutdown() {
         }
-        
+
         public Set<ContentClaim> getExistingClaims() {
             final Set<ContentClaim> claims = new HashSet<>();
 

Reply via email to