This is an automated email from the ASF dual-hosted git repository.
pan3793 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 419e0e343d0 HADOOP-19874. ZStandardCodec supports multi-threaded compression (#8461)
419e0e343d0 is described below
commit 419e0e343d0d6f254d970f36676ec570fbca645a
Author: Cheng Pan <[email protected]>
AuthorDate: Mon May 4 17:11:21 2026 +0800
HADOOP-19874. ZStandardCodec supports multi-threaded compression (#8461)
Reviewed-by: Akira Ajisaka <[email protected]>
Reviewed-by: Shilun Fan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/hadoop/fs/CommonConfigurationKeys.java | 11 ++
.../apache/hadoop/io/compress/ZStandardCodec.java | 26 +++-
.../io/compress/zstd/ZStandardCompressor.java | 44 ++++--
.../src/main/resources/core-default.xml | 15 +++
.../zstd/TestZStandardCompressorDecompressor.java | 149 ++++++++++++++++++++-
5 files changed, 230 insertions(+), 15 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index f58331baa81..59019feb112 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -168,6 +168,17 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final int
IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;
+ /** ZStandard number of compression worker threads.
+ * A value of 0 (the default) disables worker threads and runs
+ * compression on the calling thread, matching the upstream zstd
+ * default. A positive value enables multi-threaded compression with
+ * the specified number of background workers. */
+ public static final String IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY =
+ "io.compression.codec.zstd.workers";
+
+ /** Default value for IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY (disabled). */
+ public static final int IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT = 0;
+
/** Internal buffer size for Lz4 compressor/decompressors */
public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
"io.compression.codec.lz4.buffersize";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
index 7b7ad69014c..109fd27a93b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
@@ -68,6 +68,28 @@ public static int getCompressionLevel(Configuration conf) {
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
}
+ /**
+ * Returns the number of compression worker threads to be used by the
+ * ZStandard compressor. A value of 0 (the default) disables worker
+ * threads, matching the upstream zstd default. Negative values are
+ * rejected.
+ *
+ * @param conf the configuration to read from
+ * @return the configured number of zstd compression worker threads
+ * @throws IllegalArgumentException if the configured value is negative
+ */
+ public static int getCompressionWorkers(Configuration conf) {
+ int workers = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT);
+ if (workers < 0) {
+ throw new IllegalArgumentException(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY
+ + " must be >= 0, got " + workers);
+ }
+ return workers;
+ }
+
public static int getCompressionBufferSize(Configuration conf) {
int bufferSize = getBufferSize(conf);
return bufferSize == 0 ?
@@ -135,7 +157,9 @@ public Class<? extends Compressor> getCompressorType() {
@Override
public Compressor createCompressor() {
return new ZStandardCompressor(
- getCompressionLevel(conf), getCompressionBufferSize(conf));
+ getCompressionLevel(conf),
+ getCompressionWorkers(conf),
+ getCompressionBufferSize(conf));
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
index 4fe16c87f16..8d989ec96db 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
@@ -43,6 +43,7 @@ public class ZStandardCompressor implements Compressor {
LoggerFactory.getLogger(ZStandardCompressor.class);
private int level;
+ private int workers;
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
@@ -74,12 +75,30 @@ public static int getRecommendedBufferSize() {
* @param bufferSize bufferSize.
*/
public ZStandardCompressor(int level, int bufferSize) {
- this(level, bufferSize, bufferSize);
+ this(level,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT,
+ bufferSize, bufferSize);
+ }
+
+ /**
+ * Creates a new compressor with the supplied compression level and number
+ * of compression worker threads. Compressed data will be generated in
+ * ZStandard format.
+ *
+ * @param level the zstd compression level
+ * @param workers number of zstd compression worker threads (0 disables
+ * multi-threaded compression)
+ * @param bufferSize the input/output direct buffer size
+ */
+ public ZStandardCompressor(int level, int workers, int bufferSize) {
+ this(level, workers, bufferSize, bufferSize);
}
@VisibleForTesting
- ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
+ ZStandardCompressor(int level, int workers, int inputBufferSize,
+ int outputBufferSize) {
this.level = level;
+ this.workers = workers;
zstdJniCtx = new ZstdCompressCtx();
uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
directBufferSize = outputBufferSize;
@@ -101,6 +120,7 @@ public void reinit(Configuration conf) {
return;
}
level = ZStandardCodec.getCompressionLevel(conf);
+ workers = ZStandardCodec.getCompressionWorkers(conf);
reset();
LOG.debug("Reinit compressor with new compression configuration");
}
@@ -196,11 +216,9 @@ public int compress(byte[] b, int off, int len) throws IOException {
return n;
}
- // Always invoke the streaming API — even with empty input — so internally
- // buffered bytes continue to be drained, matching native ZSTD_flushStream.
- // Use END only when finish=true, no more user data, and all direct-buffer
- // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
- // ZSTD_compressStream + ZSTD_flushStream).
+ // Always invoke the streaming API - even with empty input - so internally
+ // buffered bytes continue to be drained. Use END only when finish=true, no
+ // more user data, and all direct-buffer data consumed; otherwise CONTINUE.
boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff
<= 0);
boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
@@ -209,7 +227,16 @@ public int compress(byte[] b, int off, int len) throws IOException {
compressedDirectBuf.position(0);
compressedDirectBuf.limit(directBufferSize);
- EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.FLUSH;
+ // CONTINUE should be used for non-end case, to support multi-threaded:
+ // 1. CONTINUE + workers >= 1: non-blocking. The call copies as much input
+ // as it can into a job, dispatches to workers, drains whatever output
+ // is ready, and returns. Multiple jobs can be in flight in parallel.
+ // 2. FLUSH + workers >= 1: multi-threaded compression will block to flush
+ // as much output as possible. The call won't return until every queued
+ // job has finished and its output has been drained to the dst buffer.
+ // 3. END + workers >= 1: same as FLUSH but also closes the frame. Same
+ // blocking behavior.
+ EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.CONTINUE;
boolean done = zstdJniCtx.compressDirectByteBufferStream(
compressedDirectBuf, uncompressedDirectBuf, endOp);
@@ -269,6 +296,7 @@ public void reset() {
checkStream();
zstdJniCtx.reset();
zstdJniCtx.setLevel(level);
+ zstdJniCtx.setWorkers(workers);
finish = false;
finished = false;
bytesRead = 0;
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 21446d52138..a07460dc308 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -981,6 +981,21 @@
</description>
</property>
+<property>
+ <name>io.compression.codec.zstd.workers</name>
+ <value>0</value>
+ <description>
+ Number of worker threads used by the ZStandard compressor for
+ multi-threaded compression. The default value 0 disables worker
+ threads and runs compression on the calling thread, matching the
+ upstream zstd default. Setting this to a positive value enables
+ parallel compression with the specified number of background
+ workers; the value is capped internally by the zstd library.
+ Negative values are rejected. This setting only affects compression;
+ decompression is unaffected.
+ </description>
+</property>
+
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization,
org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,
org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
index a141a974e59..e1546cdbbfa 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
@@ -198,8 +198,16 @@ public void testSetInputWithBytesSizeMoreThenDefaultZStandardBufferSize()
byte[] bytes = generate(bytesSize);
assertTrue(compressor.needsInput(), "needsInput error !!!");
compressor.setInput(bytes, 0, bytes.length);
+ compressor.finish();
byte[] emptyBytes = new byte[bytesSize];
- int cSize = compressor.compress(emptyBytes, 0, bytes.length);
+ // Drive compress() in a loop until the compressor reports finished(),
+ // mirroring how CompressorStream drains the compressor.
+ int cSize = 0;
+ while (!compressor.finished() && cSize < emptyBytes.length) {
+ compressor.needsInput();
+ cSize += compressor.compress(emptyBytes, cSize,
+ emptyBytes.length - cSize);
+ }
assertTrue(cSize > 0);
}
@@ -330,13 +338,27 @@ public void testCompressDecompress() throws Exception {
assertEquals(0, compressor.getBytesRead());
compressor.finish();
+ // Drive compress() in a loop until the compressor reports finished(),
+ // mirroring how CompressorStream drains the compressor.
byte[] compressedResult = new byte[rawDataSize];
- int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+ int cSize = 0;
+ while (!compressor.finished() && cSize < compressedResult.length) {
+ cSize += compressor.compress(compressedResult, cSize,
+ compressedResult.length - cSize);
+ }
+ assertTrue(compressor.finished());
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue(cSize < rawDataSize);
decompressor.setInput(compressedResult, 0, cSize);
+ // Drive decompress() in a loop until the decompressor reports finished()
+ // (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
byte[] decompressedBytes = new byte[rawDataSize];
- decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+ int dSize = 0;
+ while (!decompressor.finished() && dSize < decompressedBytes.length) {
+ dSize += decompressor.decompress(decompressedBytes, dSize,
+ decompressedBytes.length - dSize);
+ }
+ assertEquals(rawDataSize, dSize);
assertEquals(bytesToHex(rawData), bytesToHex(decompressedBytes));
compressor.reset();
decompressor.reset();
@@ -354,7 +376,7 @@ public void testCompressingWithOneByteOutputBuffer() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Compressor compressor =
- new ZStandardCompressor(3, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
+ new ZStandardCompressor(3, 0, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
CompressionOutputStream outputStream =
codec.createOutputStream(baos, compressor);
@@ -397,14 +419,28 @@ public void testZStandardCompressDecompress() throws Exception {
compressor.setInput(rawData, 0, rawData.length);
compressor.finish();
+ // Drive compress() in a loop until the compressor reports finished(),
+ // mirroring how CompressorStream drains the compressor.
byte[] compressedResult = new byte[rawDataSize];
- int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+ int cSize = 0;
+ while (!compressor.finished() && cSize < compressedResult.length) {
+ cSize += compressor.compress(compressedResult, cSize,
+ compressedResult.length - cSize);
+ }
+ assertTrue(compressor.finished());
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue(cSize < rawDataSize,
"compressed size no less then original size");
decompressor.setInput(compressedResult, 0, cSize);
+ // Drive decompress() in a loop until the decompressor reports finished()
+ // (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
byte[] decompressedBytes = new byte[rawDataSize];
- decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+ int dSize = 0;
+ while (!decompressor.finished() && dSize < decompressedBytes.length) {
+ dSize += decompressor.decompress(decompressedBytes, dSize,
+ decompressedBytes.length - dSize);
+ }
+ assertEquals(rawDataSize, dSize);
String decompressed = bytesToHex(decompressedBytes);
String original = bytesToHex(rawData);
assertEquals(original, decompressed);
@@ -521,6 +557,107 @@ public void testDecompressReturnsWhenNothingToDecompress() throws Exception {
assertEquals(0, result);
}
+ // workers > 0 should produce data that round-trips correctly through the
+ // decompressor, matching the bytes produced with the default workers=0.
+ @Test
+ public void testCompressionWithWorkers() throws Exception {
+ byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+
+ Configuration conf = new Configuration();
+ conf.setInt("io.compression.codec.zstd.workers", 2);
+ ZStandardCodec codec = new ZStandardCodec();
+ codec.setConf(conf);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Compressor compressor = codec.createCompressor();
+ try (CompressionOutputStream outputStream =
+ codec.createOutputStream(baos, compressor)) {
+ outputStream.write(bytes);
+ outputStream.finish();
+ }
+ assertTrue(compressor.finished());
+ assertEquals(bytes.length, compressor.getBytesRead());
+
+ // Round-trip through the decompressor.
+ ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ Decompressor decompressor = codec.createDecompressor();
+ try (CompressionInputStream inputStream =
+ codec.createInputStream(bais, decompressor)) {
+ byte[] buf = new byte[4096];
+ int n;
+ while ((n = inputStream.read(buf, 0, buf.length)) != -1) {
+ decompressed.write(buf, 0, n);
+ }
+ }
+ assertArrayEquals(bytes, decompressed.toByteArray());
+ }
+
+ // A negative workers value must be rejected up-front by ZStandardCodec.
+ @Test
+ public void testNegativeWorkersIsRejected() {
+ Configuration conf = new Configuration();
+ conf.setInt("io.compression.codec.zstd.workers", -1);
+ ZStandardCodec codec = new ZStandardCodec();
+ codec.setConf(conf);
+ assertThrows(IllegalArgumentException.class, codec::createCompressor);
+ }
+
+ // The default value (workers=0) must keep behaviour identical to before.
+ @Test
+ public void testDefaultWorkersIsZero() throws Exception {
+ Configuration conf = new Configuration();
+ ZStandardCodec codec = new ZStandardCodec();
+ codec.setConf(conf);
+ assertEquals(0, ZStandardCodec.getCompressionWorkers(conf));
+
+ byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Compressor compressor = codec.createCompressor();
+ try (CompressionOutputStream outputStream =
+ codec.createOutputStream(baos, compressor)) {
+ outputStream.write(bytes);
+ outputStream.finish();
+ }
+ assertTrue(compressor.finished());
+ assertEquals(bytes.length, compressor.getBytesRead());
+ }
+
+ // reinit() should pick up an updated workers value for pooled compressors.
+ @Test
+ public void testReinitUpdatesWorkers() throws Exception {
+ byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+
+ ZStandardCodec codec = new ZStandardCodec();
+ codec.setConf(new Configuration());
+ Compressor compressor = codec.createCompressor();
+
+ Configuration newConf = new Configuration();
+ newConf.setInt("io.compression.codec.zstd.workers", 2);
+ compressor.reinit(newConf);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (CompressionOutputStream outputStream =
+ codec.createOutputStream(baos, compressor)) {
+ outputStream.write(bytes);
+ outputStream.finish();
+ }
+
+ // Round-trip to confirm the output is still valid zstd data.
+ ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ Decompressor decompressor = codec.createDecompressor();
+ try (CompressionInputStream inputStream =
+ codec.createInputStream(bais, decompressor)) {
+ byte[] buf = new byte[4096];
+ int n;
+ while ((n = inputStream.read(buf, 0, buf.length)) != -1) {
+ decompressed.write(buf, 0, n);
+ }
+ }
+ assertArrayEquals(bytes, decompressed.toByteArray());
+ }
+
public static byte[] generate(int size) {
byte[] data = new byte[size];
for (int i = 0; i < size; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]