This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 64acd8b1c5 NIFI-2827 Adding zstd-jni to the necessary pom.xml files. It's
already in the LICENSE. NIFI-2827 Update CompressContent.java to use zstd
compression format. NIFI-2827 Update test cases for CompressContent.java to
include zstd format. NIFI-2827 Update JsonRecordSetWriter.java to enable zstd
compression format.
64acd8b1c5 is described below
commit 64acd8b1c5f84473e3d07e23450570a9dc8c4242
Author: Matthew Hawkins <[email protected]>
AuthorDate: Fri Aug 12 02:36:27 2022 +1000
NIFI-2827 Adding zstd-jni to the necessary pom.xml files. It's already in the
LICENSE.
NIFI-2827 Update CompressContent.java to use zstd compression format
NIFI-2827 Update test cases for CompressContent.java to include zstd format
NIFI-2827 Update JsonRecordSetWriter.java to enable zstd compression format
This closes #6294
Signed-off-by: Mike Thomsen <[email protected]>
---
minifi/pom.xml | 5 ++++
.../nifi-standard-processors/pom.xml | 4 +++
.../nifi/processors/standard/CompressContent.java | 23 +++++++++++++---
.../processors/standard/TestCompressContent.java | 30 +++++++++++++++++++++
.../resources/CompressedData/SampleFile.txt.zst | Bin 0 -> 80 bytes
nifi-nar-bundles/nifi-standard-bundle/pom.xml | 5 ++++
.../org/apache/nifi/json/JsonRecordSetWriter.java | 9 +++++--
7 files changed, 70 insertions(+), 6 deletions(-)
diff --git a/minifi/pom.xml b/minifi/pom.xml
index e06e6543f9..9b367f8336 100644
--- a/minifi/pom.xml
+++ b/minifi/pom.xml
@@ -504,6 +504,11 @@ limitations under the License.
<artifactId>lzma-java</artifactId>
<version>1.3</version>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>1.5.2-3</version>
+ </dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index b191e369be..9956a75881 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -221,6 +221,10 @@
<groupId>com.github.jponge</groupId>
<artifactId>lzma-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ </dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 60fb4c3653..6ad4192cd1 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -23,6 +23,8 @@ import
org.apache.commons.compress.compressors.CompressorStreamFactory;
import
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import
org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import
org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
+import
org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -82,7 +84,7 @@ import java.util.zip.InflaterInputStream;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma",
"xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed",
"deflate"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma",
"xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed",
"deflate", "zstd"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles
using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate. This processor operates in a very memory
efficient way so very large objects well beyond the heap size "
+ "are generally fine to process")
@@ -104,16 +106,17 @@ public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_SNAPPY_HADOOP =
"snappy-hadoop";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy
framed";
public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
+ public static final String COMPRESSION_FORMAT_ZSTD = "zstd";
public static final String MODE_COMPRESS = "compress";
public static final String MODE_DECOMPRESS = "decompress";
public static final PropertyDescriptor COMPRESSION_FORMAT = new
PropertyDescriptor.Builder()
.name("Compression Format")
- .description("The compression format to use. Valid values are: GZIP,
Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and
LZ4-Framed")
+ .description("The compression format to use. Valid values are: GZIP,
Deflate, ZSTD, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and
LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE,
COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA,
COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP,
COMPRESSION_FORMAT_SNAPPY_FRAMED,
- COMPRESSION_FORMAT_LZ4_FRAMED)
+ COMPRESSION_FORMAT_LZ4_FRAMED, COMPRESSION_FORMAT_ZSTD)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
@@ -132,7 +135,7 @@ public class CompressContent extends AbstractProcessor {
.defaultValue("1")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
- .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE,
COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE,
COMPRESSION_FORMAT_XZ_LZMA2)
+ .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE,
COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE,
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_ZSTD)
.dependsOn(MODE, MODE_COMPRESS)
.build();
@@ -184,6 +187,7 @@ public class CompressContent extends AbstractProcessor {
mimeTypeMap.put("application/x-snappy-hadoop",
COMPRESSION_FORMAT_SNAPPY_HADOOP);
mimeTypeMap.put("application/x-snappy-framed",
COMPRESSION_FORMAT_SNAPPY_FRAMED);
mimeTypeMap.put("application/x-lz4-framed",
COMPRESSION_FORMAT_LZ4_FRAMED);
+ mimeTypeMap.put("application/zstd", COMPRESSION_FORMAT_ZSTD);
this.compressionFormatMimeTypeMap =
Collections.unmodifiableMap(mimeTypeMap);
}
@@ -273,6 +277,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_LZ4_FRAMED:
fileExtension = ".lz4";
break;
+ case COMPRESSION_FORMAT_ZSTD:
+ fileExtension = ".zst";
+ break;
default:
fileExtension = "";
break;
@@ -328,6 +335,11 @@ public class CompressContent extends AbstractProcessor {
mimeTypeRef.set("application/x-lz4-framed");
compressionOut = new
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
bufferedOut);
break;
+ case COMPRESSION_FORMAT_ZSTD:
+ final int zstdcompressionLevel =
context.getProperty(COMPRESSION_LEVEL).asInteger() * 2;
+ compressionOut = new
ZstdCompressorOutputStream(bufferedOut, zstdcompressionLevel);
+ mimeTypeRef.set("application/zstd");
+ break;
case COMPRESSION_FORMAT_BZIP2:
default:
mimeTypeRef.set("application/x-bzip2");
@@ -364,6 +376,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_LZ4_FRAMED:
compressionIn = new
FramedLZ4CompressorInputStream(bufferedIn, true);
break;
+ case COMPRESSION_FORMAT_ZSTD:
+ compressionIn = new
ZstdCompressorInputStream(bufferedIn);
+ break;
default:
compressionIn = new
CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(),
bufferedIn);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 0445f2bd8b..0d8727a8e3 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -347,4 +347,34 @@ public class TestCompressContent {
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
flowFile.assertAttributeEquals("filename", "SampleFile.txt");
}
+
+ @Test
+ public void testZstdCompress() throws Exception {
+ final TestRunner runner =
TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE,
CompressContent.MODE_COMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT,
CompressContent.COMPRESSION_FORMAT_ZSTD);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/zstd");
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt.zst");
+ }
+
+ @Test
+ public void testZstdDecompress() throws Exception {
+ final TestRunner runner =
TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE,
CompressContent.MODE_DECOMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT,
CompressContent.COMPRESSION_FORMAT_ZSTD);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zst"));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst
new file mode 100755
index 0000000000..e427761608
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zst
differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index 694fa2acc5..20c3f4fc3f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -148,6 +148,11 @@
<artifactId>lzma-java</artifactId>
<version>1.3</version>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>1.5.2-3</version>
+ </dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index dee9a4f7e9..153b1308bd 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -72,6 +72,7 @@ public class JsonRecordSetWriter extends
DateTimeTextRecordSetWriter implements
public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy
framed";
public static final String COMPRESSION_FORMAT_NONE = "none";
+ public static final String COMPRESSION_FORMAT_ZSTD = "zstd";
static final PropertyDescriptor SUPPRESS_NULLS = new
PropertyDescriptor.Builder()
.name("suppress-nulls")
@@ -101,9 +102,9 @@ public class JsonRecordSetWriter extends
DateTimeTextRecordSetWriter implements
public static final PropertyDescriptor COMPRESSION_FORMAT = new
PropertyDescriptor.Builder()
.name("compression-format")
.displayName("Compression Format")
- .description("The compression format to use. Valid values are:
GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
+ .description("The compression format to use. Valid values are:
GZIP, BZIP2, ZSTD, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
.allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP,
COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2,
- COMPRESSION_FORMAT_SNAPPY,
COMPRESSION_FORMAT_SNAPPY_FRAMED)
+ COMPRESSION_FORMAT_SNAPPY,
COMPRESSION_FORMAT_SNAPPY_FRAMED, COMPRESSION_FORMAT_ZSTD)
.defaultValue(COMPRESSION_FORMAT_NONE)
.required(true)
.build();
@@ -203,6 +204,10 @@ public class JsonRecordSetWriter extends
DateTimeTextRecordSetWriter implements
mimeTypeRef = "application/x-bzip2";
compressionOut = new
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
bufferedOut);
break;
+ case COMPRESSION_FORMAT_ZSTD:
+ mimeTypeRef = "application/zstd";
+ compressionOut = new
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
bufferedOut);
+ break;
default:
mimeTypeRef = "application/json";
compressionOut = out;