Repository: nifi
Updated Branches:
  refs/heads/master 86bba1b20 -> f5060a6d6


NIFI-1568: Add Filter Capability to UnpackContent

Adds a "File Filter" property to the `UnpackContent` processor
to allow users to specify which files are eligible for extraction.
By default, all files will be extracted.

Signed-off-by: Matt Burgess <mattyb...@apache.org>

Refactor how Unpacker is initialized

Re-uses unpackers to avoid creating them each time a
flowfile is received. Moved the package formats into
an enum for clarity.

Signed-off-by: Matt Burgess <mattyb...@apache.org>

Fix packaging format enum warning

The `AUTO_DETECT_FORMAT` value of the `PackageFormat` enum
is not valid for initialization. When this value
is set, we use the mime.type attribute to determine
which packaging format to use.

We never pass `AUTO_DETECT_FORMAT` to the
`initUnpacker` method.

Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #248


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f5060a6d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f5060a6d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f5060a6d

Branch: refs/heads/master
Commit: f5060a6d63dced1ac2a34b49ba134d578d9cd97a
Parents: 86bba1b
Author: ricky <ri...@cloudera.com>
Authored: Thu Feb 25 17:20:51 2016 -0500
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Mon Jun 20 10:44:38 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/UnpackContent.java | 230 +++++++++++++------
 .../processors/standard/TestUnpackContent.java  | 113 ++++++++-
 2 files changed, 264 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f5060a6d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index decbd54..933b027 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Pattern;
 
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -45,6 +46,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -57,6 +60,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -78,7 +82,7 @@ import org.apache.nifi.util.ObjectHolder;
         + "the attribute is set to application/zip, the ZIP Packaging Format 
will be used. If the attribute is set to application/flowfile-v3 or "
         + "application/flowfile-v2 or application/flowfile-v1, the appropriate 
FlowFile Packaging Format will be used. If this attribute is missing, "
         + "the FlowFile will be routed to 'failure'. Otherwise, if the 
attribute's value is not one of those mentioned above, the FlowFile will be "
-        + "routed to 'success' without being unpacked")
+        + "routed to 'success' without being unpacked. Use the File Filter 
property only extract files matching a specific regular expression.")
 @WritesAttributes({
     @WritesAttribute(attribute = "mime.type", description = "If the FlowFile 
is successfully unpacked, its MIME Type is no longer known, so the mime.type "
             + "attribute is set to application/octet-stream."),
@@ -91,14 +95,6 @@ import org.apache.nifi.util.ObjectHolder;
             + "the MergeContent processor automatically adds those extensions 
if it is used to rebuild the original FlowFile")})
 @SeeAlso(MergeContent.class)
 public class UnpackContent extends AbstractProcessor {
-
-    public static final String AUTO_DETECT_FORMAT = "use mime.type attribute";
-    public static final String TAR_FORMAT = "tar";
-    public static final String ZIP_FORMAT = "zip";
-    public static final String FLOWFILE_STREAM_FORMAT_V3 = 
"flowfile-stream-v3";
-    public static final String FLOWFILE_STREAM_FORMAT_V2 = 
"flowfile-stream-v2";
-    public static final String FLOWFILE_TAR_FORMAT = "flowfile-tar-v1";
-
     // attribute keys
     public static final String FRAGMENT_ID = "fragment.identifier";
     public static final String FRAGMENT_INDEX = "fragment.index";
@@ -111,8 +107,18 @@ public class UnpackContent extends AbstractProcessor {
             .name("Packaging Format")
             .description("The Packaging Format used to create the file")
             .required(true)
-            .allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, 
FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT)
-            .defaultValue(AUTO_DETECT_FORMAT)
+            .allowableValues(PackageFormat.AUTO_DETECT_FORMAT.toString(), 
PackageFormat.TAR_FORMAT.toString(),
+                    PackageFormat.ZIP_FORMAT.toString(), 
PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString(),
+                    PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString(), 
PackageFormat.FLOWFILE_TAR_FORMAT.toString())
+            .defaultValue(PackageFormat.AUTO_DETECT_FORMAT.toString())
+            .build();
+
+    public static final PropertyDescriptor FILE_FILTER = new 
PropertyDescriptor.Builder()
+            .name("File Filter")
+            .description("Only files whose names match the given regular 
expression will be extracted (tar/zip only)")
+            .required(true)
+            .defaultValue(".*")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -131,6 +137,16 @@ public class UnpackContent extends AbstractProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
 
+    private Unpacker unpacker;
+    private boolean addFragmentAttrs;
+    private Pattern fileFilter;
+
+    private Unpacker tarUnpacker;
+    private Unpacker zipUnpacker;
+    private Unpacker flowFileStreamV3Unpacker;
+    private Unpacker flowFileStreamV2Unpacker;
+    private Unpacker flowFileTarUnpacker;
+
     @Override
     protected void init(final ProcessorInitializationContext context) {
         final Set<Relationship> relationships = new HashSet<>();
@@ -141,6 +157,7 @@ public class UnpackContent extends AbstractProcessor {
 
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(PACKAGING_FORMAT);
+        properties.add(FILE_FILTER);
         this.properties = Collections.unmodifiableList(properties);
     }
 
@@ -154,75 +171,88 @@ public class UnpackContent extends AbstractProcessor {
         return properties;
     }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
+    @OnStopped
+    public void onStopped() {
+        unpacker = null;
+        fileFilter = null;
+    }
 
-        final ComponentLog logger = getLogger();
-        String packagingFormat = 
context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase();
-        if (AUTO_DETECT_FORMAT.equals(packagingFormat)) {
-            final String mimeType = 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
-            if (mimeType == null) {
-                logger.error("No mime.type attribute set for {}; routing to 
failure", new Object[]{flowFile});
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws ProcessException {
+        if (fileFilter == null) {
+            fileFilter = 
Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+            tarUnpacker = new TarUnpacker(fileFilter);
+            zipUnpacker = new ZipUnpacker(fileFilter);
+            flowFileStreamV3Unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV3());
+            flowFileStreamV2Unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV2());
+            flowFileTarUnpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV1());
+        }
 
-            switch (mimeType.toLowerCase()) {
-                case "application/tar":
-                    packagingFormat = TAR_FORMAT;
-                    break;
-                case "application/x-tar":
-                    packagingFormat = TAR_FORMAT;
-                    break;
-                case "application/zip":
-                    packagingFormat = ZIP_FORMAT;
-                    break;
-                case "application/flowfile-v3":
-                    packagingFormat = FLOWFILE_STREAM_FORMAT_V3;
-                    break;
-                case "application/flowfile-v2":
-                    packagingFormat = FLOWFILE_STREAM_FORMAT_V2;
-                    break;
-                case "application/flowfile-v1":
-                    packagingFormat = FLOWFILE_TAR_FORMAT;
-                    break;
-                default: {
-                    logger.info("Cannot unpack {} because its mime.type 
attribute is set to '{}', which is not a format that can be unpacked; routing 
to 'success'", new Object[]{flowFile, mimeType});
-                    session.transfer(flowFile, REL_SUCCESS);
-                    return;
-                }
-            }
+        PackageFormat format = 
PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue());
+        if (format != PackageFormat.AUTO_DETECT_FORMAT && unpacker == null) {
+            initUnpacker(format);
         }
+    }
 
-        final Unpacker unpacker;
-        final boolean addFragmentAttrs;
+    public void initUnpacker(PackageFormat packagingFormat) {
         switch (packagingFormat) {
             case TAR_FORMAT:
-                unpacker = new TarUnpacker();
+            case X_TAR_FORMAT:
+                unpacker = tarUnpacker;
                 addFragmentAttrs = true;
                 break;
             case ZIP_FORMAT:
-                unpacker = new ZipUnpacker();
+                unpacker = zipUnpacker;
                 addFragmentAttrs = true;
                 break;
             case FLOWFILE_STREAM_FORMAT_V2:
-                unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV2());
+                unpacker = flowFileStreamV2Unpacker;
                 addFragmentAttrs = false;
                 break;
             case FLOWFILE_STREAM_FORMAT_V3:
-                unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV3());
+                unpacker = flowFileStreamV3Unpacker;
                 addFragmentAttrs = false;
                 break;
             case FLOWFILE_TAR_FORMAT:
-                unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV1());
+                unpacker = flowFileTarUnpacker;
                 addFragmentAttrs = false;
                 break;
-            default:
-                throw new AssertionError("Packaging Format was " + 
context.getProperty(PACKAGING_FORMAT).getValue());
+            case AUTO_DETECT_FORMAT:
+                // The format of the unpacker should be known before 
initialization
+                throw new ProcessException(packagingFormat + " is not a valid 
packaging format");
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        PackageFormat packagingFormat = 
PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase());
+        if (packagingFormat == PackageFormat.AUTO_DETECT_FORMAT) {
+            packagingFormat = null;
+            final String mimeType = 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+            if (mimeType == null) {
+                logger.error("No mime.type attribute set for {}; routing to 
failure", new Object[]{flowFile});
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            for (PackageFormat format: PackageFormat.values()) {
+                if (mimeType.toLowerCase().equals(format.getMimeType())) {
+                    packagingFormat = format;
+                }
+            }
+            if (packagingFormat == null) {
+                logger.info("Cannot unpack {} because its mime.type attribute 
is set to '{}', which is not a format that can be unpacked; routing to 
'success'", new Object[]{flowFile, mimeType});
+                session.transfer(flowFile, REL_SUCCESS);
+                return;
+            } else {
+                initUnpacker(packagingFormat);
+            }
         }
 
         final List<FlowFile> unpacked = new ArrayList<>();
@@ -248,12 +278,26 @@ public class UnpackContent extends AbstractProcessor {
         }
     }
 
-    private static interface Unpacker {
+    private static abstract class Unpacker {
+        private Pattern fileFilter = null;
+
+        public Unpacker() {};
 
-        void unpack(ProcessSession session, FlowFile source, List<FlowFile> 
unpacked);
+        public Unpacker(Pattern fileFilter) {
+            this.fileFilter = fileFilter;
+        }
+
+        abstract void unpack(ProcessSession session, FlowFile source, 
List<FlowFile> unpacked);
+
+        protected boolean fileMatches(ArchiveEntry entry) {
+            return fileFilter == null || 
fileFilter.matcher(entry.getName()).find();
+        }
     }
 
-    private static class TarUnpacker implements Unpacker {
+    private static class TarUnpacker extends Unpacker {
+        public TarUnpacker(Pattern fileFilter) {
+            super(fileFilter);
+        }
 
         @Override
         public void unpack(final ProcessSession session, final FlowFile 
source, final List<FlowFile> unpacked) {
@@ -265,7 +309,7 @@ public class UnpackContent extends AbstractProcessor {
                     try (final TarArchiveInputStream tarIn = new 
TarArchiveInputStream(new BufferedInputStream(in))) {
                         TarArchiveEntry tarEntry;
                         while ((tarEntry = tarIn.getNextTarEntry()) != null) {
-                            if (tarEntry.isDirectory()) {
+                            if (tarEntry.isDirectory() || 
!fileMatches(tarEntry)) {
                                 continue;
                             }
                             final File file = new File(tarEntry.getName());
@@ -304,7 +348,10 @@ public class UnpackContent extends AbstractProcessor {
         }
     }
 
-    private static class ZipUnpacker implements Unpacker {
+    private static class ZipUnpacker extends Unpacker {
+        public ZipUnpacker(Pattern fileFilter) {
+            super(fileFilter);
+        }
 
         @Override
         public void unpack(final ProcessSession session, final FlowFile 
source, final List<FlowFile> unpacked) {
@@ -316,7 +363,7 @@ public class UnpackContent extends AbstractProcessor {
                     try (final ZipArchiveInputStream zipIn = new 
ZipArchiveInputStream(new BufferedInputStream(in))) {
                         ArchiveEntry zipEntry;
                         while ((zipEntry = zipIn.getNextEntry()) != null) {
-                            if (zipEntry.isDirectory()) {
+                            if (zipEntry.isDirectory() || 
!fileMatches(zipEntry)) {
                                 continue;
                             }
                             final File file = new File(zipEntry.getName());
@@ -352,7 +399,7 @@ public class UnpackContent extends AbstractProcessor {
         }
     }
 
-    private static class FlowFileStreamUnpacker implements Unpacker {
+    private static class FlowFileStreamUnpacker extends Unpacker {
 
         private final FlowFileUnpackager unpackager;
 
@@ -445,4 +492,53 @@ public class UnpackContent extends AbstractProcessor {
             unpacked.add(newFF);
         }
     }
+
+    protected enum PackageFormat {
+        AUTO_DETECT_FORMAT("use mime.type attribute"),
+        TAR_FORMAT("tar", "application/tar"),
+        X_TAR_FORMAT("tar", "application/x-tar"),
+        ZIP_FORMAT("zip", "application/zip"),
+        FLOWFILE_STREAM_FORMAT_V3("flowfile-stream-v3", 
"application/flowfile-v3"),
+        FLOWFILE_STREAM_FORMAT_V2("flowfile-stream-v2", 
"application/flowfile-v2"),
+        FLOWFILE_TAR_FORMAT("flowfile-tar-v1", "application/flowfile-v1");
+
+
+        private final String textValue;
+        private String mimeType;
+
+        PackageFormat(String textValue, String mimeType) {
+            this.textValue = textValue;
+            this.mimeType = mimeType;
+        }
+
+        PackageFormat(String textValue) {
+            this.textValue = textValue;
+        }
+
+        @Override public String toString() {
+            return textValue;
+        }
+
+        public String getMimeType() {
+            return mimeType;
+        }
+
+        public static PackageFormat getFormat(String textValue) {
+            switch (textValue) {
+            case "use mime.type attribute":
+                return AUTO_DETECT_FORMAT;
+            case "tar":
+                return TAR_FORMAT;
+            case "zip":
+                return ZIP_FORMAT;
+            case "flowfile-stream-v3":
+                return FLOWFILE_STREAM_FORMAT_V3;
+            case "flowfile-stream-v2":
+                return FLOWFILE_STREAM_FORMAT_V2;
+            case "flowfile-stream-v1":
+                return FLOWFILE_TAR_FORMAT;
+            }
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f5060a6d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index 0b0cc43..acebdea 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -42,13 +42,13 @@ public class TestUnpackContent {
     public void testTar() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
         final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
-        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.TAR_FORMAT);
-        autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.AUTO_DETECT_FORMAT);
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.TAR_FORMAT.toString());
+        autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
         unpackRunner.enqueue(dataPath.resolve("data.tar"));
         Map<String, String> attributes = new HashMap<>(1);
         Map<String, String> attributes2 = new HashMap<>(1);
-        attributes.put("mime.type", "application/x-tar");
-        attributes2.put("mime.type", "application/tar");
+        attributes.put("mime.type", 
UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
+        attributes2.put("mime.type", 
UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType());
         autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
         autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
         unpackRunner.run();
@@ -74,11 +74,57 @@ public class TestUnpackContent {
     }
 
     @Test
+    public void testTarWithFilter() throws IOException {
+        final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
+        final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.TAR_FORMAT.toString());
+        unpackRunner.setProperty(UnpackContent.FILE_FILTER, 
"^folder/date.txt$");
+        autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
+        autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, 
"^folder/cal.txt$");
+        unpackRunner.enqueue(dataPath.resolve("data.tar"));
+        Map<String, String> attributes = new HashMap<>(1);
+        Map<String, String> attributes2 = new HashMap<>(1);
+        attributes.put("mime.type", "application/x-tar");
+        attributes2.put("mime.type", "application/tar");
+        autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
+        autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
+        unpackRunner.run();
+        autoUnpackRunner.run(2);
+
+        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
+
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
+
+        List<MockFlowFile> unpacked = 
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
+        for (final MockFlowFile flowFile : unpacked) {
+            final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+            final String folder = 
flowFile.getAttribute(CoreAttributes.PATH.key());
+            final Path path = dataPath.resolve(folder).resolve(filename);
+            assertTrue(Files.exists(path));
+            assertEquals("date.txt", filename);
+            flowFile.assertContentEquals(path.toFile());
+        }
+        unpacked = 
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
+        for (final MockFlowFile flowFile : unpacked) {
+            final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+            final String folder = 
flowFile.getAttribute(CoreAttributes.PATH.key());
+            final Path path = dataPath.resolve(folder).resolve(filename);
+            assertTrue(Files.exists(path));
+            assertEquals("cal.txt", filename);
+            flowFile.assertContentEquals(path.toFile());
+        }
+    }
+
+    @Test
     public void testZip() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
         final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
-        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.ZIP_FORMAT);
-        autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.AUTO_DETECT_FORMAT);
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.ZIP_FORMAT.toString());
+        autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
         unpackRunner.enqueue(dataPath.resolve("data.zip"));
         Map<String, String> attributes = new HashMap<>(1);
         attributes.put("mime.type", "application/zip");
@@ -106,9 +152,52 @@ public class TestUnpackContent {
     }
 
     @Test
+    public void testZipWithFilter() throws IOException {
+        final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
+        final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
+        unpackRunner.setProperty(UnpackContent.FILE_FILTER, 
"^folder/date.txt$");
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.ZIP_FORMAT.toString());
+        autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
+        autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, 
"^folder/cal.txt$");
+        unpackRunner.enqueue(dataPath.resolve("data.zip"));
+        Map<String, String> attributes = new HashMap<>(1);
+        attributes.put("mime.type", "application/zip");
+        autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
+        unpackRunner.run();
+        autoUnpackRunner.run();
+
+        unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
+
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+        autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
+
+        List<MockFlowFile> unpacked = 
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
+        for (final MockFlowFile flowFile : unpacked) {
+            final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+            final String folder = 
flowFile.getAttribute(CoreAttributes.PATH.key());
+            final Path path = dataPath.resolve(folder).resolve(filename);
+            assertTrue(Files.exists(path));
+            assertEquals("date.txt", filename);
+            flowFile.assertContentEquals(path.toFile());
+        }
+        unpacked = 
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
+        for (final MockFlowFile flowFile : unpacked) {
+            final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+            final String folder = 
flowFile.getAttribute(CoreAttributes.PATH.key());
+            final Path path = dataPath.resolve(folder).resolve(filename);
+            assertTrue(Files.exists(path));
+            assertEquals("cal.txt", filename);
+            flowFile.assertContentEquals(path.toFile());
+        }
+    }
+
+    @Test
     public void testFlowFileStreamV3() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
UnpackContent());
-        runner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.FLOWFILE_STREAM_FORMAT_V3);
+        runner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString());
         runner.enqueue(dataPath.resolve("data.flowfilev3"));
 
         runner.run();
@@ -131,7 +220,7 @@ public class TestUnpackContent {
     @Test
     public void testFlowFileStreamV2() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
UnpackContent());
-        runner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.FLOWFILE_STREAM_FORMAT_V2);
+        runner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString());
         runner.enqueue(dataPath.resolve("data.flowfilev2"));
 
         runner.run();
@@ -154,7 +243,7 @@ public class TestUnpackContent {
     @Test
     public void testTarThenMerge() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
-        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.TAR_FORMAT);
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.TAR_FORMAT.toString());
 
         unpackRunner.enqueue(dataPath.resolve("data.tar"));
         unpackRunner.run();
@@ -188,7 +277,7 @@ public class TestUnpackContent {
     @Test
     public void testZipThenMerge() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
-        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.ZIP_FORMAT);
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.ZIP_FORMAT.toString());
 
         unpackRunner.enqueue(dataPath.resolve("data.zip"));
         unpackRunner.run();
@@ -222,7 +311,7 @@ public class TestUnpackContent {
     @Test
     public void testZipHandlesBadData() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
-        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.ZIP_FORMAT);
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.ZIP_FORMAT.toString());
 
         unpackRunner.enqueue(dataPath.resolve("data.tar"));
         unpackRunner.run();
@@ -235,7 +324,7 @@ public class TestUnpackContent {
     @Test
     public void testTarHandlesBadData() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
-        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.TAR_FORMAT);
+        unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, 
UnpackContent.PackageFormat.TAR_FORMAT.toString());
 
         unpackRunner.enqueue(dataPath.resolve("data.zip"));
         unpackRunner.run();

Reply via email to