Repository: nifi
Updated Branches:
  refs/heads/master 8a447eec6 -> 9e705a846


NIFI-1866 ProcessException handling in StandardProcessSession


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9e705a84
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9e705a84
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9e705a84

Branch: refs/heads/master
Commit: 9e705a8468e924df3a18a4101afd2b8c4c654ffa
Parents: 8a447ee
Author: Pierre Villard <[email protected]>
Authored: Thu May 12 23:31:45 2016 +0200
Committer: Mark Payne <[email protected]>
Committed: Wed Jun 8 15:58:27 2016 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 45 +++++++++++++++++---
 .../repository/TestStandardProcessSession.java  | 39 +++++++++++++++++
 2 files changed, 77 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9e705a84/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 426775e..062e515 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2340,18 +2340,49 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     public void exportTo(final FlowFile source, final OutputStream 
destination) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
+
+        if(record.getCurrentClaim() == null) {
+            return;
+        }
+
         try {
-            if (record.getCurrentClaim() == null) {
-                return;
+            ensureNotAppending(record.getCurrentClaim());
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Failed to access ContentClaim 
for " + source.toString(), e);
+        }
+
+        try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
+                final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
+                final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+                final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+
+            // We want to differentiate between IOExceptions thrown by the 
repository and IOExceptions thrown from
+            // Processor code. As a result, we have the 
FlowFileAccessInputStream that catches IOException from the repository
+            // and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
+            // ContentNotFoundException because if it is thrown, the Processor 
code may catch it and do something else with it
+            // but in reality, if it is thrown, we want to know about it and 
handle it, even if the Processor code catches it.
+            final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+            boolean cnfeThrown = false;
+
+            try {
+                recursionSet.add(source);
+                StreamUtils.copy(ffais, destination, source.getSize());
+            } catch (final ContentNotFoundException cnfe) {
+                cnfeThrown = true;
+                throw cnfe;
+            } finally {
+                recursionSet.remove(source);
+                IOUtils.closeQuietly(ffais);
+                // if cnfeThrown is true, we don't need to re-throw the 
Exception; it will propagate.
+                if (!cnfeThrown && ffais.getContentNotFoundException() != 
null) {
+                    throw ffais.getContentNotFoundException();
+                }
             }
 
-            ensureNotAppending(record.getCurrentClaim());
-            final long size = 
context.getContentRepository().exportTo(record.getCurrentClaim(), destination, 
record.getCurrentClaimOffset(), source.getSize());
-            bytesRead.increment(size);
         } catch (final ContentNotFoundException nfe) {
             handleContentNotFound(nfe, record);
-        } catch (final Throwable t) {
-            throw new FlowFileAccessException("Failed to export " + source + " 
to " + destination + " due to " + t.toString(), t);
+        } catch (final IOException ex) {
+            throw new ProcessException("IOException thrown from " + 
connectableDescription + ": " + ex.toString(), ex);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e705a84/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index e418fa4..4ae2080 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -21,8 +21,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.notNull;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -325,6 +329,41 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testExportTo() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile flowFile = session.get();
+        assertNotNull(flowFile);
+
+        flowFile = session.append(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("Hello World".getBytes());
+            }
+        });
+
+        // should be OK
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        session.exportTo(flowFile, os);
+        assertEquals("Hello World", new String(os.toByteArray()));
+        os.close();
+
+        // should throw ProcessException because of IOException (from 
processor code)
+        FileOutputStream mock = Mockito.mock(FileOutputStream.class);
+        doThrow(new IOException()).when(mock).write((byte[]) notNull(), 
any(Integer.class), any(Integer.class));
+        try {
+            session.exportTo(flowFile, mock);
+            Assert.fail("Expected ProcessException");
+        } catch (ProcessException e) {
+        }
+    }
+
+    @Test
     public void testReadAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()

Reply via email to