Repository: nifi Updated Branches: refs/heads/0.x c2e98f96e -> a952dc96a
NIFI-2636 resolve thread safety problem in UnpackContent Signed-off-by: Joe Skora <jsk...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a952dc96 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a952dc96 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a952dc96 Branch: refs/heads/0.x Commit: a952dc96a23a418e25f9dc49c8874b59adc34654 Parents: c2e98f9 Author: Mike Moser <mose...@apache.org> Authored: Tue Aug 23 16:41:13 2016 -0400 Committer: Joe Skora <jsk...@apache.org> Committed: Thu Sep 22 14:23:30 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/UnpackContent.java | 67 +++++++++----------- .../processors/standard/TestUnpackContent.java | 31 +++++++++ 2 files changed, 62 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a952dc96/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 45e17c1..85b8029 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -145,8 +145,6 @@ public class UnpackContent extends AbstractProcessor { private Set<Relationship> relationships; private List<PropertyDescriptor> properties; - private Unpacker unpacker; - private boolean 
addFragmentAttrs; private Pattern fileFilter; private Unpacker tarUnpacker; @@ -178,7 +176,6 @@ public class UnpackContent extends AbstractProcessor { @OnStopped public void onStopped() { - unpacker = null; fileFilter = null; } @@ -191,35 +188,6 @@ public class UnpackContent extends AbstractProcessor { } } - public void initUnpacker(PackageFormat packagingFormat) { - switch (packagingFormat) { - case TAR_FORMAT: - case X_TAR_FORMAT: - unpacker = tarUnpacker; - addFragmentAttrs = true; - break; - case ZIP_FORMAT: - unpacker = zipUnpacker; - addFragmentAttrs = true; - break; - case FLOWFILE_STREAM_FORMAT_V2: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); - addFragmentAttrs = false; - break; - case FLOWFILE_STREAM_FORMAT_V3: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); - addFragmentAttrs = false; - break; - case FLOWFILE_TAR_FORMAT: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); - addFragmentAttrs = false; - break; - case AUTO_DETECT_FORMAT: - // The format of the unpacker should be known before initialization - throw new ProcessException(packagingFormat + " is not a valid packaging format"); - } - } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -247,11 +215,38 @@ public class UnpackContent extends AbstractProcessor { logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); session.transfer(flowFile, REL_SUCCESS); return; - } else { - initUnpacker(packagingFormat); } - } else { - initUnpacker(packagingFormat); + } + + // set the Unpacker to use for this FlowFile. FlowFileUnpackager objects maintain state and are not reusable. 
+ final Unpacker unpacker; + final boolean addFragmentAttrs; + switch (packagingFormat) { + case TAR_FORMAT: + case X_TAR_FORMAT: + unpacker = tarUnpacker; + addFragmentAttrs = true; + break; + case ZIP_FORMAT: + unpacker = zipUnpacker; + addFragmentAttrs = true; + break; + case FLOWFILE_STREAM_FORMAT_V2: + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); + addFragmentAttrs = false; + break; + case FLOWFILE_STREAM_FORMAT_V3: + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); + addFragmentAttrs = false; + break; + case FLOWFILE_TAR_FORMAT: + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); + addFragmentAttrs = false; + break; + case AUTO_DETECT_FORMAT: + default: + // The format of the unpacker should be known before initialization + throw new ProcessException(packagingFormat + " is not a valid packaging format"); } final List<FlowFile> unpacked = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a952dc96/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index c107e95..e91a6e6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -341,4 +341,35 @@ public class TestUnpackContent { unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 1); } + + /* + * This 
test checks for thread safety problems when PackageFormat.AUTO_DETECT_FORMAT is used. + * It won't always fail if there is an issue with the code, but it will fail often enough to eventually be noticed. + * If this test fails at all, then it needs to be investigated. + */ + @Test + public void testThreadSafetyUsingAutoDetect() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); + runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); + + Map<String, String> attrsTar = new HashMap<>(1); + Map<String, String> attrsFFv3 = new HashMap<>(1); + attrsTar.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType()); + attrsFFv3.put("mime.type", UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.getMimeType()); + + int numThreads = 50; + runner.setThreadCount(numThreads); + + for (int i=0; i<numThreads; i++) { + if (i%2 == 0) { + runner.enqueue(dataPath.resolve("data.tar"), attrsTar); + } else { + runner.enqueue(dataPath.resolve("data.flowfilev3"), attrsFFv3); + } + } + + runner.run(numThreads); + + runner.assertTransferCount(UnpackContent.REL_SUCCESS, numThreads*2); + } }