Repository: nifi
Updated Branches:
  refs/heads/0.x c2e98f96e -> a952dc96a


NIFI-2636 resolve thread safety problem in UnpackContent

Signed-off-by: Joe Skora <jsk...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a952dc96
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a952dc96
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a952dc96

Branch: refs/heads/0.x
Commit: a952dc96a23a418e25f9dc49c8874b59adc34654
Parents: c2e98f9
Author: Mike Moser <mose...@apache.org>
Authored: Tue Aug 23 16:41:13 2016 -0400
Committer: Joe Skora <jsk...@apache.org>
Committed: Thu Sep 22 14:23:30 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/UnpackContent.java | 67 +++++++++-----------
 .../processors/standard/TestUnpackContent.java  | 31 +++++++++
 2 files changed, 62 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a952dc96/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 45e17c1..85b8029 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -145,8 +145,6 @@ public class UnpackContent extends AbstractProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
 
-    private Unpacker unpacker;
-    private boolean addFragmentAttrs;
     private Pattern fileFilter;
 
     private Unpacker tarUnpacker;
@@ -178,7 +176,6 @@ public class UnpackContent extends AbstractProcessor {
 
     @OnStopped
     public void onStopped() {
-        unpacker = null;
         fileFilter = null;
     }
 
@@ -191,35 +188,6 @@ public class UnpackContent extends AbstractProcessor {
         }
     }
 
-    public void initUnpacker(PackageFormat packagingFormat) {
-        switch (packagingFormat) {
-            case TAR_FORMAT:
-            case X_TAR_FORMAT:
-                unpacker = tarUnpacker;
-                addFragmentAttrs = true;
-                break;
-            case ZIP_FORMAT:
-                unpacker = zipUnpacker;
-                addFragmentAttrs = true;
-                break;
-            case FLOWFILE_STREAM_FORMAT_V2:
-                unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV2());
-                addFragmentAttrs = false;
-                break;
-            case FLOWFILE_STREAM_FORMAT_V3:
-                unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV3());
-                addFragmentAttrs = false;
-                break;
-            case FLOWFILE_TAR_FORMAT:
-                unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV1());
-                addFragmentAttrs = false;
-                break;
-            case AUTO_DETECT_FORMAT:
-                // The format of the unpacker should be known before 
initialization
-                throw new ProcessException(packagingFormat + " is not a valid 
packaging format");
-        }
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -247,11 +215,38 @@ public class UnpackContent extends AbstractProcessor {
                 logger.info("Cannot unpack {} because its mime.type attribute 
is set to '{}', which is not a format that can be unpacked; routing to 
'success'", new Object[]{flowFile, mimeType});
                 session.transfer(flowFile, REL_SUCCESS);
                 return;
-            } else {
-                initUnpacker(packagingFormat);
             }
-        } else {
-            initUnpacker(packagingFormat);
+        }
+
+        // set the Unpacker to use for this FlowFile.  FlowFileUnpackager 
objects maintain state and are not reusable.
+        final Unpacker unpacker;
+        final boolean addFragmentAttrs;
+        switch (packagingFormat) {
+        case TAR_FORMAT:
+        case X_TAR_FORMAT:
+            unpacker = tarUnpacker;
+            addFragmentAttrs = true;
+            break;
+        case ZIP_FORMAT:
+            unpacker = zipUnpacker;
+            addFragmentAttrs = true;
+            break;
+        case FLOWFILE_STREAM_FORMAT_V2:
+            unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
+            addFragmentAttrs = false;
+            break;
+        case FLOWFILE_STREAM_FORMAT_V3:
+            unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
+            addFragmentAttrs = false;
+            break;
+        case FLOWFILE_TAR_FORMAT:
+            unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
+            addFragmentAttrs = false;
+            break;
+        case AUTO_DETECT_FORMAT:
+        default:
+            // The format of the unpacker should be known before initialization
+            throw new ProcessException(packagingFormat + " is not a valid 
packaging format");
         }
 
         final List<FlowFile> unpacked = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a952dc96/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index c107e95..e91a6e6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -341,4 +341,35 @@ public class TestUnpackContent {
         unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
         unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 1);
     }
+
+    /*
+     * This test checks for thread safety problems when 
PackageFormat.AUTO_DETECT_FORMAT is used.
+     * It won't always fail if there is a issue with the code, but it will 
fail often enough to eventually be noticed.
+     * If this test fails at all, then it needs to be investigated.
+     */
+    @Test
+    public void testThreadSafetyUsingAutoDetect() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
UnpackContent());
+        runner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
+
+        Map<String, String> attrsTar = new HashMap<>(1);
+        Map<String, String> attrsFFv3 = new HashMap<>(1);
+        attrsTar.put("mime.type", 
UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
+        attrsFFv3.put("mime.type", 
UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.getMimeType());
+
+        int numThreads = 50;
+        runner.setThreadCount(numThreads);
+
+        for (int i=0; i<numThreads; i++) {
+            if (i%2 == 0) {
+                runner.enqueue(dataPath.resolve("data.tar"), attrsTar);
+            } else {
+                runner.enqueue(dataPath.resolve("data.flowfilev3"), attrsFFv3);
+            }
+        }
+
+        runner.run(numThreads);
+
+        runner.assertTransferCount(UnpackContent.REL_SUCCESS, numThreads*2);
+    }
 }

Reply via email to