NIFI-396 created tests to demonstrate the situations where the ProcessSession 
throws an Exception and where it doesn't after it returns from the callback


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7272d0df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7272d0df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7272d0df

Branch: refs/heads/NIFI-250
Commit: 7272d0df58c23d099809bf96993b55d73c617476
Parents: dea9e22
Author: Bobby Owolabi <[email protected]>
Authored: Wed Mar 18 23:30:57 2015 -0400
Committer: Bobby Owolabi <[email protected]>
Committed: Wed Mar 18 23:30:57 2015 -0400

----------------------------------------------------------------------
 .../repository/TestStandardProcessSession.java  | 108 +++++++++++++++++++
 1 file changed, 108 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7272d0df/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 1ff63c5..2d09ea5 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -56,6 +56,7 @@ import 
org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.MissingFlowFileException;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
@@ -65,6 +66,8 @@ import 
org.apache.nifi.provenance.MockProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.ObjectHolder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -235,6 +238,105 @@ public class TestStandardProcessSession {
         assertEquals(0, contentRepo.getExistingClaims().size());
     }
 
+    @Test(expected = FlowFileAccessException.class)
+    public void testAppendAfterSessionClosesStream() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+                .contentClaim(claim)
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile flowFile = session.get();
+        assertNotNull(flowFile);
+        final ObjectHolder<OutputStream> outputStreamHolder = new 
ObjectHolder<>(null);
+        flowFile = session.append(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream outputStream) throws 
IOException {
+                outputStreamHolder.set(outputStream);
+            }
+        });
+        try (final OutputStream outputStream = outputStreamHolder.get()) {
+            outputStream.write(5);
+        }
+    }
+
+    @Test(expected = FlowFileAccessException.class)
+    public void testReadAfterSessionClosesStream() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+                .contentClaim(claim)
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile flowFile = session.get();
+        assertNotNull(flowFile);
+        final ObjectHolder<InputStream> inputStreamHolder = new 
ObjectHolder<>(null);
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream inputStream) throws 
IOException {
+                inputStreamHolder.set(inputStream);
+            }
+        });
+        try (final InputStream inputStream = inputStreamHolder.get()) {
+            inputStream.read();
+        }
+    }
+
+    @Test
+    public void testStreamAfterSessionClosesStream() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+                .contentClaim(claim)
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile flowFile = session.get();
+        assertNotNull(flowFile);
+        final ObjectHolder<InputStream> inputStreamHolder = new 
ObjectHolder<>(null);
+        final ObjectHolder<OutputStream> outputStreamHolder = new 
ObjectHolder<>(null);
+        flowFile = session.write(flowFile, new StreamCallback() {
+            @Override
+            public void process(final InputStream input, final OutputStream 
output) throws IOException {
+                inputStreamHolder.set(input);
+                outputStreamHolder.set(output);
+            }
+        });
+        try (final InputStream inputStream = inputStreamHolder.get()) {
+            inputStream.read();
+            Assert.fail("Expected Exception to be thrown when read is 
attempted after session closes stream");
+        } catch (final Exception ex) {}
+        try (final OutputStream outputStream = outputStreamHolder.get()) {
+            outputStream.write(5);
+            Assert.fail("Expected Exception to be thrown when write is 
attempted after session closes stream");
+        } catch (final Exception ex) {}
+    }
+
+    @Test(expected = FlowFileAccessException.class)
+    public void testWriteAfterSessionClosesStream() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+                .contentClaim(claim)
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile flowFile = session.get();
+        assertNotNull(flowFile);
+        final ObjectHolder<OutputStream> outputStreamHolder = new 
ObjectHolder<>(null);
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                outputStreamHolder.set(out);
+            }
+        });
+        try (final OutputStream outputStream = outputStreamHolder.get()) {
+            outputStream.write(5);
+        }
+    }
+
     @Test
     public void testCreateThenRollbackRemovesContent() throws IOException {
 
@@ -998,6 +1100,12 @@ public class TestStandardProcessSession {
         public ContentClaim create(boolean lossTolerant) throws IOException {
             final ContentClaim claim = 
claimManager.newContentClaim("container", "section", 
String.valueOf(idGenerator.getAndIncrement()), false);
             claimantCounts.put(claim, new AtomicInteger(1));
+            final Path path = getPath(claim);
+            final Path parent = path.getParent();
+            if (Files.exists(parent) == false) {
+                Files.createDirectories(parent);
+            }
+            Files.createFile(getPath(claim));
             return claim;
         }
 

Reply via email to