Repository: nifi
Updated Branches:
  refs/heads/master 6466931c2 -> 229b20f39


NIFI-3419: Routing flow files causing ZipException to failure in MergeContent

This closes #1454.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/229b20f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/229b20f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/229b20f3

Branch: refs/heads/master
Commit: 229b20f3954e66f3f8a5c373e08b887f1209a315
Parents: 6466931
Author: Joe Gresock <[email protected]>
Authored: Mon Jan 30 12:19:32 2017 +0000
Committer: Pierre Villard <[email protected]>
Committed: Mon Feb 6 19:34:49 2017 +0100

----------------------------------------------------------------------
 .../nifi/processors/standard/MergeContent.java  | 19 +++++++++++++-----
 .../processors/standard/TestMergeContent.java   | 21 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/229b20f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 1c77cbd..3401d66 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -40,6 +40,7 @@ import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
+import java.util.zip.ZipException;
 import java.util.zip.ZipOutputStream;
 
 import org.apache.avro.Schema;
@@ -810,6 +811,8 @@ public class MergeContent extends BinFiles {
 
         private final int compressionLevel;
 
+        private List<FlowFile> unmerged = new ArrayList<>();
+
         public ZipMerge(final int compressionLevel) {
             this.compressionLevel = compressionLevel;
         }
@@ -820,6 +823,7 @@ public class MergeContent extends BinFiles {
 
             final ProcessSession session = bin.getSession();
             final List<FlowFile> contents = bin.getContents();
+            unmerged.addAll(contents);
 
             FlowFile bundle = session.create(contents);
 
@@ -835,10 +839,15 @@ public class MergeContent extends BinFiles {
                             final String entryName = path + 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
                             final ZipEntry zipEntry = new ZipEntry(entryName);
                             zipEntry.setSize(flowFile.getSize());
-                            out.putNextEntry(zipEntry);
-
-                            bin.getSession().exportTo(flowFile, out);
-                            out.closeEntry();
+                            try {
+                                out.putNextEntry(zipEntry);
+
+                                bin.getSession().exportTo(flowFile, out);
+                                out.closeEntry();
+                                unmerged.remove(flowFile);
+                            } catch (ZipException e) {
+                                getLogger().error("Encountered exception 
merging {}", new Object[]{flowFile}, e);
+                            }
                         }
 
                         out.finish();
@@ -858,7 +867,7 @@ public class MergeContent extends BinFiles {
 
         @Override
         public List<FlowFile> getUnmergedFlowFiles() {
-            return Collections.emptyList();
+            return unmerged;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/229b20f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 6590d47..b6025c5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -488,6 +488,27 @@ public class TestMergeContent {
     }
 
     @Test
+    public void testZipException() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_ZIP);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");
+        attributes.put("filename", "duplicate-filename.txt");
+
+        runner.enqueue("Hello".getBytes("UTF-8"), attributes);
+        runner.enqueue(", ".getBytes("UTF-8"), attributes);
+        runner.enqueue("World!".getBytes("UTF-8"), attributes);
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+    }
+
+    @Test
     public void testTar() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
         runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");

Reply via email to