Repository: nifi Updated Branches: refs/heads/master 0d13de0cf -> 32f476aaa
NIFI-1357 Add Snappy compression to "CompressContent" Processor This closes #164. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32f476aa Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32f476aa Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32f476aa Branch: refs/heads/master Commit: 32f476aaa7040ed4718713f2ed11e11bc7ef4f1c Parents: 0d13de0 Author: Jeremy Dyer <[email protected]> Authored: Fri Jan 8 09:54:24 2016 -0500 Committer: Aldrin Piri <[email protected]> Committed: Thu Feb 25 12:47:01 2016 -0500 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 5 ++ .../processors/standard/CompressContent.java | 35 +++++++++- .../standard/TestCompressContent.java | 65 +++++++++++++++++++ .../CompressedData/SampleFile.txt.snappy | Bin 0 -> 1730 bytes .../resources/CompressedData/SampleFile.txt.sz | Bin 0 -> 1669 bytes 5 files changed, 102 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 95d140a..5494206 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -211,6 +211,11 @@ language governing permissions and limitations under the License. --> <scope>test</scope> </dependency> <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.2</version> + </dependency> + <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <version>1.4.187</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index 593cf44..3ef9746 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -64,12 +64,16 @@ import org.tukaani.xz.XZOutputStream; import lzma.sdk.lzma.Decoder; import lzma.streams.LzmaInputStream; import lzma.streams.LzmaOutputStream; +import org.xerial.snappy.SnappyFramedInputStream; +import org.xerial.snappy.SnappyFramedOutputStream; +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; @EventDriven @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"}) +@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed"}) @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type " + "attribute as appropriate") @ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to " @@ -83,14 +87,17 @@ public class CompressContent extends AbstractProcessor { public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2"; public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2"; public static final String COMPRESSION_FORMAT_LZMA = "lzma"; + public static final String COMPRESSION_FORMAT_SNAPPY = "snappy"; + public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed"; public static final String MODE_COMPRESS = "compress"; public static final String MODE_DECOMPRESS = "decompress"; public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() .name("Compression Format") - .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA") - .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA) + .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed") + .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, + COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED) .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE) .required(true) .build(); @@ -150,6 +157,8 @@ public class CompressContent extends AbstractProcessor { mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2); mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2); mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA); + mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY); + mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED); this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap); } @@ -210,6 +219,12 @@ public class CompressContent extends AbstractProcessor { case COMPRESSION_FORMAT_BZIP2: fileExtension = ".bz2"; break; + case COMPRESSION_FORMAT_SNAPPY: + fileExtension = ".snappy"; + break; + case COMPRESSION_FORMAT_SNAPPY_FRAMED: + fileExtension = ".sz"; + break; default: fileExtension = ""; break; @@ -243,6 +258,14 @@ public class CompressContent extends AbstractProcessor { compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options()); mimeTypeRef.set("application/x-xz"); break; + case COMPRESSION_FORMAT_SNAPPY: + compressionOut = new SnappyOutputStream(bufferedOut); + mimeTypeRef.set("application/x-snappy"); + break; + case COMPRESSION_FORMAT_SNAPPY_FRAMED: + compressionOut = new SnappyFramedOutputStream(bufferedOut); + mimeTypeRef.set("application/x-snappy-framed"); + break; case COMPRESSION_FORMAT_BZIP2: default: mimeTypeRef.set("application/x-bzip2"); @@ -265,6 +288,12 @@ public class CompressContent extends AbstractProcessor { case COMPRESSION_FORMAT_GZIP: compressionIn = new GzipCompressorInputStream(bufferedIn, true); break; + case COMPRESSION_FORMAT_SNAPPY: + compressionIn = new SnappyInputStream(bufferedIn); + break; + case COMPRESSION_FORMAT_SNAPPY_FRAMED: + compressionIn = new SnappyFramedInputStream(bufferedIn); + break; default: compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn); } http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java index 68cba4d..5f96036 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java @@ -23,6 +23,7 @@ import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -31,6 +32,70 @@ import org.junit.Test; public class TestCompressContent { @Test + public void testSnappyCompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy"); + flowFile.assertAttributeEquals("filename", "SampleFile.txt.snappy"); + } + + @Test + public void testSnappyDecompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.snappy")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + flowFile.assertAttributeEquals("filename", "SampleFile.txt"); + } + + @Test + public void testSnappyFramedCompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy-framed"); + flowFile.assertAttributeEquals("filename", "SampleFile.txt.sz"); + } + + @Test + public void testSnappyFramedDecompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.sz")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + flowFile.assertAttributeEquals("filename", "SampleFile.txt"); + } + + @Test public void testBzip2DecompressConcatenated() throws Exception { final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); runner.setProperty(CompressContent.MODE, "decompress"); http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy new file mode 100644 index 0000000..60c384a Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy differ http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz new file mode 100644 index 0000000..1065381 Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz differ
