This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new cd90b1d NIFI-6773 This closes #3820. Adding lz4-framed option for
compression/decompression NIFI-6773 Adding two unit tests for lz4-framed
compression/decompression NIFI-6773 Adding compressed lz4 file for
decompression test
cd90b1d is described below
commit cd90b1d3e1b623556f17106c637b60eb1eaa37f0
Author: Andrew Rodriguez <[email protected]>
AuthorDate: Wed Oct 16 15:09:58 2019 -0500
NIFI-6773 This closes #3820. Adding lz4-framed option for
compression/decompression
NIFI-6773 Adding two unit tests for lz4-framed compression/decompression
NIFI-6773 Adding compressed lz4 file for decompression test
Signed-off-by: Joe Witt <[email protected]>
---
.../nifi/processors/standard/CompressContent.java | 20 +++++++++++--
.../processors/standard/TestCompressContent.java | 32 +++++++++++++++++++++
.../resources/CompressedData/SampleFile.txt.lz4 | Bin 0 -> 217 bytes
3 files changed, 49 insertions(+), 3 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index f292705..ef76155 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -37,6 +37,7 @@ import lzma.streams.LzmaOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import
org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -73,7 +74,7 @@ import org.xerial.snappy.SnappyOutputStream;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma",
"xz-lzma2", "snappy", "snappy framed"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma",
"xz-lzma2", "snappy", "snappy framed", "lz4-framed"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles
using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate")
@ReadsAttribute(attribute = "mime.type", description = "If the Compression
Format is set to use mime.type attribute, this attribute is used to "
@@ -90,15 +91,17 @@ public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_LZMA = "lzma";
public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy
framed";
+ public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
public static final String MODE_COMPRESS = "compress";
public static final String MODE_DECOMPRESS = "decompress";
public static final PropertyDescriptor COMPRESSION_FORMAT = new
PropertyDescriptor.Builder()
.name("Compression Format")
- .description("The compression format to use. Valid values are: GZIP,
BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
+ .description("The compression format to use. Valid values are: GZIP,
BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP,
COMPRESSION_FORMAT_BZIP2,
- COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA,
COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
+ COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA,
COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED,
+ COMPRESSION_FORMAT_LZ4_FRAMED)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
@@ -161,6 +164,7 @@ public class CompressContent extends AbstractProcessor {
mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY);
mimeTypeMap.put("application/x-snappy-framed",
COMPRESSION_FORMAT_SNAPPY_FRAMED);
+ mimeTypeMap.put("application/x-lz4-framed",
COMPRESSION_FORMAT_LZ4_FRAMED);
this.compressionFormatMimeTypeMap =
Collections.unmodifiableMap(mimeTypeMap);
}
@@ -227,6 +231,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
fileExtension = ".sz";
break;
+ case COMPRESSION_FORMAT_LZ4_FRAMED:
+ fileExtension = ".lz4";
+ break;
default:
fileExtension = "";
break;
@@ -268,6 +275,10 @@ public class CompressContent extends AbstractProcessor {
compressionOut = new
SnappyFramedOutputStream(bufferedOut);
mimeTypeRef.set("application/x-snappy-framed");
break;
+ case COMPRESSION_FORMAT_LZ4_FRAMED:
+
mimeTypeRef.set("application/x-lz4-framed");
+ compressionOut = new
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
bufferedOut);
+ break;
case COMPRESSION_FORMAT_BZIP2:
default:
mimeTypeRef.set("application/x-bzip2");
@@ -296,6 +307,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
compressionIn = new
SnappyFramedInputStream(bufferedIn);
break;
+ case COMPRESSION_FORMAT_LZ4_FRAMED:
+ compressionIn = new
FramedLZ4CompressorInputStream(bufferedIn, true);
+ break;
default:
compressionIn = new
CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(),
bufferedIn);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 50c4cd5..9ed971d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -253,4 +253,36 @@ public class TestCompressContent {
runner.getFlowFilesForRelationship(CompressContent.REL_FAILURE).get(0).assertContentEquals(data);
}
+
+ @Test
+ public void testLz4FramedCompress() throws Exception {
+ final TestRunner runner =
TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE,
CompressContent.MODE_COMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT,
CompressContent.COMPRESSION_FORMAT_LZ4_FRAMED);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/x-lz4-framed");
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt.lz4");
+ }
+
+ @Test
+ public void testLz4FramedDecompress() throws Exception {
+ final TestRunner runner =
TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE,
CompressContent.MODE_DECOMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT,
CompressContent.COMPRESSION_FORMAT_LZ4_FRAMED);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.lz4"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4
new file mode 100644
index 0000000..760ecd0
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.lz4
differ