Repository: nifi Updated Branches: refs/heads/master 86bba1b20 -> f5060a6d6
NIFI-1568: Add Filter Capability to UnpackContent Adds a "File Filter" property to the `UnpackContent` processor to allow users to specify which files are eligible for extraction. By default, all files will be extracted. Signed-off-by: Matt Burgess <mattyb...@apache.org> Refactor how Unpacker is initialized Re-uses unpackers to avoid creating them each time a flowfile is received. Moved the package formats into an enum for clarity. Signed-off-by: Matt Burgess <mattyb...@apache.org> Fix packaging format enum warning The `AUTO_DETECT_FORMAT` enum for PackagingFormat is not valid for initilization. When this value is set, then we use the mime.type to determine which packaging format to use. We never pass `AUTO_DETECT_FORMAT` to the `initUnpacker` method. Signed-off-by: Matt Burgess <mattyb...@apache.org> This closes #248 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f5060a6d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f5060a6d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f5060a6d Branch: refs/heads/master Commit: f5060a6d63dced1ac2a34b49ba134d578d9cd97a Parents: 86bba1b Author: ricky <ri...@cloudera.com> Authored: Thu Feb 25 17:20:51 2016 -0500 Committer: Matt Burgess <mattyb...@apache.org> Committed: Mon Jun 20 10:44:38 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/UnpackContent.java | 230 +++++++++++++------ .../processors/standard/TestUnpackContent.java | 113 ++++++++- 2 files changed, 264 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f5060a6d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index decbd54..933b027 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.regex.Pattern; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -45,6 +46,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -57,6 +60,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.StreamUtils; @@ -78,7 +82,7 @@ import org.apache.nifi.util.ObjectHolder; + "the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or " + "application/flowfile-v2 or application/flowfile-v1, the appropriate FlowFile Packaging Format will be used. If this attribute is missing, " + "the FlowFile will be routed to 'failure'. Otherwise, if the attribute's value is not one of those mentioned above, the FlowFile will be " - + "routed to 'success' without being unpacked") + + "routed to 'success' without being unpacked. Use the File Filter property only extract files matching a specific regular expression.") @WritesAttributes({ @WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type " + "attribute is set to application/octet-stream."), @@ -91,14 +95,6 @@ import org.apache.nifi.util.ObjectHolder; + "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile")}) @SeeAlso(MergeContent.class) public class UnpackContent extends AbstractProcessor { - - public static final String AUTO_DETECT_FORMAT = "use mime.type attribute"; - public static final String TAR_FORMAT = "tar"; - public static final String ZIP_FORMAT = "zip"; - public static final String FLOWFILE_STREAM_FORMAT_V3 = "flowfile-stream-v3"; - public static final String FLOWFILE_STREAM_FORMAT_V2 = "flowfile-stream-v2"; - public static final String FLOWFILE_TAR_FORMAT = "flowfile-tar-v1"; - // attribute keys public static final String FRAGMENT_ID = "fragment.identifier"; public static final String FRAGMENT_INDEX = "fragment.index"; @@ -111,8 +107,18 @@ public class UnpackContent extends AbstractProcessor { .name("Packaging Format") .description("The Packaging Format used to create the file") .required(true) - .allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT) - .defaultValue(AUTO_DETECT_FORMAT) + .allowableValues(PackageFormat.AUTO_DETECT_FORMAT.toString(), PackageFormat.TAR_FORMAT.toString(), + PackageFormat.ZIP_FORMAT.toString(), PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString(), + PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString(), PackageFormat.FLOWFILE_TAR_FORMAT.toString()) + .defaultValue(PackageFormat.AUTO_DETECT_FORMAT.toString()) + .build(); + + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .name("File Filter") + .description("Only files whose names match the given regular expression will be extracted (tar/zip only)") + .required(true) + .defaultValue(".*") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -131,6 +137,16 @@ public class UnpackContent extends AbstractProcessor { private Set<Relationship> relationships; private List<PropertyDescriptor> properties; + private Unpacker unpacker; + private boolean addFragmentAttrs; + private Pattern fileFilter; + + private Unpacker tarUnpacker; + private Unpacker zipUnpacker; + private Unpacker flowFileStreamV3Unpacker; + private Unpacker flowFileStreamV2Unpacker; + private Unpacker flowFileTarUnpacker; + @Override protected void init(final ProcessorInitializationContext context) { final Set<Relationship> relationships = new HashSet<>(); @@ -141,6 +157,7 @@ public class UnpackContent extends AbstractProcessor { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(PACKAGING_FORMAT); + properties.add(FILE_FILTER); this.properties = Collections.unmodifiableList(properties); } @@ -154,75 +171,88 @@ public class UnpackContent extends AbstractProcessor { return properties; } - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } + @OnStopped + public void onStopped() { + unpacker = null; + fileFilter = null; + } - final ComponentLog logger = getLogger(); - String packagingFormat = context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase(); - if (AUTO_DETECT_FORMAT.equals(packagingFormat)) { - final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); - if (mimeType == null) { - logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); - session.transfer(flowFile, REL_FAILURE); - return; - } + @OnScheduled + public void onScheduled(ProcessContext context) throws ProcessException { + if (fileFilter == null) { + fileFilter = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); + tarUnpacker = new TarUnpacker(fileFilter); + zipUnpacker = new ZipUnpacker(fileFilter); + flowFileStreamV3Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); + flowFileStreamV2Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); + flowFileTarUnpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); + } - switch (mimeType.toLowerCase()) { - case "application/tar": - packagingFormat = TAR_FORMAT; - break; - case "application/x-tar": - packagingFormat = TAR_FORMAT; - break; - case "application/zip": - packagingFormat = ZIP_FORMAT; - break; - case "application/flowfile-v3": - packagingFormat = FLOWFILE_STREAM_FORMAT_V3; - break; - case "application/flowfile-v2": - packagingFormat = FLOWFILE_STREAM_FORMAT_V2; - break; - case "application/flowfile-v1": - packagingFormat = FLOWFILE_TAR_FORMAT; - break; - default: { - logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); - session.transfer(flowFile, REL_SUCCESS); - return; - } - } + PackageFormat format = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue()); + if (format != PackageFormat.AUTO_DETECT_FORMAT && unpacker == null) { + initUnpacker(format); } + } - final Unpacker unpacker; - final boolean addFragmentAttrs; + public void initUnpacker(PackageFormat packagingFormat) { switch (packagingFormat) { case TAR_FORMAT: - unpacker = new TarUnpacker(); + case X_TAR_FORMAT: + unpacker = tarUnpacker; addFragmentAttrs = true; break; case ZIP_FORMAT: - unpacker = new ZipUnpacker(); + unpacker = zipUnpacker; addFragmentAttrs = true; break; case FLOWFILE_STREAM_FORMAT_V2: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); + unpacker = flowFileStreamV2Unpacker; addFragmentAttrs = false; break; case FLOWFILE_STREAM_FORMAT_V3: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); + unpacker = flowFileStreamV3Unpacker; addFragmentAttrs = false; break; case FLOWFILE_TAR_FORMAT: - unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); + unpacker = flowFileTarUnpacker; addFragmentAttrs = false; break; - default: - throw new AssertionError("Packaging Format was " + context.getProperty(PACKAGING_FORMAT).getValue()); + case AUTO_DETECT_FORMAT: + // The format of the unpacker should be known before initialization + throw new ProcessException(packagingFormat + " is not a valid packaging format"); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + PackageFormat packagingFormat = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase()); + if (packagingFormat == PackageFormat.AUTO_DETECT_FORMAT) { + packagingFormat = null; + final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); + if (mimeType == null) { + logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + for (PackageFormat format: PackageFormat.values()) { + if (mimeType.toLowerCase().equals(format.getMimeType())) { + packagingFormat = format; + } + } + if (packagingFormat == null) { + logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); + session.transfer(flowFile, REL_SUCCESS); + return; + } else { + initUnpacker(packagingFormat); + } } final List<FlowFile> unpacked = new ArrayList<>(); @@ -248,12 +278,26 @@ public class UnpackContent extends AbstractProcessor { } } - private static interface Unpacker { + private static abstract class Unpacker { + private Pattern fileFilter = null; + + public Unpacker() {}; - void unpack(ProcessSession session, FlowFile source, List<FlowFile> unpacked); + public Unpacker(Pattern fileFilter) { + this.fileFilter = fileFilter; + } + + abstract void unpack(ProcessSession session, FlowFile source, List<FlowFile> unpacked); + + protected boolean fileMatches(ArchiveEntry entry) { + return fileFilter == null || fileFilter.matcher(entry.getName()).find(); + } } - private static class TarUnpacker implements Unpacker { + private static class TarUnpacker extends Unpacker { + public TarUnpacker(Pattern fileFilter) { + super(fileFilter); + } @Override public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { @@ -265,7 +309,7 @@ public class UnpackContent extends AbstractProcessor { try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) { TarArchiveEntry tarEntry; while ((tarEntry = tarIn.getNextTarEntry()) != null) { - if (tarEntry.isDirectory()) { + if (tarEntry.isDirectory() || !fileMatches(tarEntry)) { continue; } final File file = new File(tarEntry.getName()); @@ -304,7 +348,10 @@ public class UnpackContent extends AbstractProcessor { } } - private static class ZipUnpacker implements Unpacker { + private static class ZipUnpacker extends Unpacker { + public ZipUnpacker(Pattern fileFilter) { + super(fileFilter); + } @Override public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { @@ -316,7 +363,7 @@ public class UnpackContent extends AbstractProcessor { try (final ZipArchiveInputStream zipIn = new ZipArchiveInputStream(new BufferedInputStream(in))) { ArchiveEntry zipEntry; while ((zipEntry = zipIn.getNextEntry()) != null) { - if (zipEntry.isDirectory()) { + if (zipEntry.isDirectory() || !fileMatches(zipEntry)) { continue; } final File file = new File(zipEntry.getName()); @@ -352,7 +399,7 @@ public class UnpackContent extends AbstractProcessor { } } - private static class FlowFileStreamUnpacker implements Unpacker { + private static class FlowFileStreamUnpacker extends Unpacker { private final FlowFileUnpackager unpackager; @@ -445,4 +492,53 @@ public class UnpackContent extends AbstractProcessor { unpacked.add(newFF); } } + + protected enum PackageFormat { + AUTO_DETECT_FORMAT("use mime.type attribute"), + TAR_FORMAT("tar", "application/tar"), + X_TAR_FORMAT("tar", "application/x-tar"), + ZIP_FORMAT("zip", "application/zip"), + FLOWFILE_STREAM_FORMAT_V3("flowfile-stream-v3", "application/flowfile-v3"), + FLOWFILE_STREAM_FORMAT_V2("flowfile-stream-v2", "application/flowfile-v2"), + FLOWFILE_TAR_FORMAT("flowfile-tar-v1", "application/flowfile-v1"); + + + private final String textValue; + private String mimeType; + + PackageFormat(String textValue, String mimeType) { + this.textValue = textValue; + this.mimeType = mimeType; + } + + PackageFormat(String textValue) { + this.textValue = textValue; + } + + @Override public String toString() { + return textValue; + } + + public String getMimeType() { + return mimeType; + } + + public static PackageFormat getFormat(String textValue) { + switch (textValue) { + case "use mime.type attribute": + return AUTO_DETECT_FORMAT; + case "tar": + return TAR_FORMAT; + case "zip": + return ZIP_FORMAT; + case "flowfile-stream-v3": + return FLOWFILE_STREAM_FORMAT_V3; + case "flowfile-stream-v2": + return FLOWFILE_STREAM_FORMAT_V2; + case "flowfile-stream-v1": + return FLOWFILE_TAR_FORMAT; + } + return null; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/f5060a6d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index 0b0cc43..acebdea 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -42,13 +42,13 @@ public class TestUnpackContent { public void testTar() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT); - autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.AUTO_DETECT_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.tar")); Map<String, String> attributes = new HashMap<>(1); Map<String, String> attributes2 = new HashMap<>(1); - attributes.put("mime.type", "application/x-tar"); - attributes2.put("mime.type", "application/tar"); + attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType()); + attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType()); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); unpackRunner.run(); @@ -74,11 +74,57 @@ public class TestUnpackContent { } @Test + public void testTarWithFilter() throws IOException { + final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); + final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); + unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$"); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); + unpackRunner.enqueue(dataPath.resolve("data.tar")); + Map<String, String> attributes = new HashMap<>(1); + Map<String, String> attributes2 = new HashMap<>(1); + attributes.put("mime.type", "application/x-tar"); + attributes2.put("mime.type", "application/tar"); + autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); + autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); + unpackRunner.run(); + autoUnpackRunner.run(2); + + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("date.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("cal.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + } + + @Test public void testZip() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT); - autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.AUTO_DETECT_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.zip")); Map<String, String> attributes = new HashMap<>(1); attributes.put("mime.type", "application/zip"); @@ -106,9 +152,52 @@ public class TestUnpackContent { } @Test + public void testZipWithFilter() throws IOException { + final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); + final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent()); + unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$"); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); + autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); + unpackRunner.enqueue(dataPath.resolve("data.zip")); + Map<String, String> attributes = new HashMap<>(1); + attributes.put("mime.type", "application/zip"); + autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); + unpackRunner.run(); + autoUnpackRunner.run(); + + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); + + List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("date.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); + for (final MockFlowFile flowFile : unpacked) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); + final Path path = dataPath.resolve(folder).resolve(filename); + assertTrue(Files.exists(path)); + assertEquals("cal.txt", filename); + flowFile.assertContentEquals(path.toFile()); + } + } + + @Test public void testFlowFileStreamV3() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); - runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.FLOWFILE_STREAM_FORMAT_V3); + runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString()); runner.enqueue(dataPath.resolve("data.flowfilev3")); runner.run(); @@ -131,7 +220,7 @@ public class TestUnpackContent { @Test public void testFlowFileStreamV2() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); - runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.FLOWFILE_STREAM_FORMAT_V2); + runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString()); runner.enqueue(dataPath.resolve("data.flowfilev2")); runner.run(); @@ -154,7 +243,7 @@ public class TestUnpackContent { @Test public void testTarThenMerge() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.tar")); unpackRunner.run(); @@ -188,7 +277,7 @@ public class TestUnpackContent { @Test public void testZipThenMerge() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.zip")); unpackRunner.run(); @@ -222,7 +311,7 @@ public class TestUnpackContent { @Test public void testZipHandlesBadData() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.ZIP_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.tar")); unpackRunner.run(); @@ -235,7 +324,7 @@ public class TestUnpackContent { @Test public void testTarHandlesBadData() throws IOException { final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent()); - unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.TAR_FORMAT); + unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.zip")); unpackRunner.run();