Repository: nifi
Updated Branches:
  refs/heads/master 6466931c2 -> 229b20f39

NIFI-3419: Routing flow files causing ZipException to failure in MergeContent

This closes #1454.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/229b20f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/229b20f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/229b20f3

Branch: refs/heads/master
Commit: 229b20f3954e66f3f8a5c373e08b887f1209a315
Parents: 6466931
Author: Joe Gresock <[email protected]>
Authored: Mon Jan 30 12:19:32 2017 +0000
Committer: Pierre Villard <[email protected]>
Committed: Mon Feb 6 19:34:49 2017 +0100

----------------------------------------------------------------------
 .../nifi/processors/standard/MergeContent.java  | 19 +++++++++++++-----
 .../processors/standard/TestMergeContent.java   | 21 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/229b20f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 1c77cbd..3401d66 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -40,6 +40,7 @@ import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
+import java.util.zip.ZipException;
 import java.util.zip.ZipOutputStream;

 import org.apache.avro.Schema;
@@ -810,6 +811,8 @@ public class MergeContent extends BinFiles {

         private final int compressionLevel;

+        private List<FlowFile> unmerged = new ArrayList<>();
+
         public ZipMerge(final int compressionLevel) {
             this.compressionLevel = compressionLevel;
         }
@@ -820,6 +823,7 @@ public class MergeContent extends BinFiles {

             final ProcessSession session = bin.getSession();
             final List<FlowFile> contents = bin.getContents();
+            unmerged.addAll(contents);

             FlowFile bundle = session.create(contents);

@@ -835,10 +839,15 @@ public class MergeContent extends BinFiles {
                             final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
                             final ZipEntry zipEntry = new ZipEntry(entryName);
                             zipEntry.setSize(flowFile.getSize());
-                            out.putNextEntry(zipEntry);
-
-                            bin.getSession().exportTo(flowFile, out);
-                            out.closeEntry();
+                            try {
+                                out.putNextEntry(zipEntry);
+
+                                bin.getSession().exportTo(flowFile, out);
+                                out.closeEntry();
+                                unmerged.remove(flowFile);
+                            } catch (ZipException e) {
+                                getLogger().error("Encountered exception merging {}", new Object[]{flowFile}, e);
+                            }
                         }

                         out.finish();
@@ -858,7 +867,7 @@ public class MergeContent extends BinFiles {

         @Override
         public List<FlowFile> getUnmergedFlowFiles() {
-            return Collections.emptyList();
+            return unmerged;
         }
     }


http://git-wip-us.apache.org/repos/asf/nifi/blob/229b20f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 6590d47..b6025c5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -488,6 +488,27 @@ public class TestMergeContent {
     }

     @Test
+    public void testZipException() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
+        attributes.put("filename", "duplicate-filename.txt");
+
+        runner.enqueue("Hello".getBytes("UTF-8"), attributes);
+        runner.enqueue(", ".getBytes("UTF-8"), attributes);
+        runner.enqueue("World!".getBytes("UTF-8"), attributes);
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+    }
+
+    @Test
     public void testTar() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
         runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
