This is an automated email from the ASF dual-hosted git repository.

pan3793 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 419e0e343d0 HADOOP-19874. ZStandardCodec supports multi-threaded 
compression (#8461)
419e0e343d0 is described below

commit 419e0e343d0d6f254d970f36676ec570fbca645a
Author: Cheng Pan <[email protected]>
AuthorDate: Mon May 4 17:11:21 2026 +0800

    HADOOP-19874. ZStandardCodec supports multi-threaded compression (#8461)
    
    Reviewed-by: Akira Ajisaka <[email protected]>
    Reviewed-by: Shilun Fan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/hadoop/fs/CommonConfigurationKeys.java  |  11 ++
 .../apache/hadoop/io/compress/ZStandardCodec.java  |  26 +++-
 .../io/compress/zstd/ZStandardCompressor.java      |  44 ++++--
 .../src/main/resources/core-default.xml            |  15 +++
 .../zstd/TestZStandardCompressorDecompressor.java  | 149 ++++++++++++++++++++-
 5 files changed, 230 insertions(+), 15 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index f58331baa81..59019feb112 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -168,6 +168,17 @@ public class CommonConfigurationKeys extends 
CommonConfigurationKeysPublic {
   public static final int
       IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;
 
+  /** ZStandard number of compression worker threads.
+   * A value of 0 (the default) disables worker threads and runs
+   * compression on the calling thread, matching the upstream zstd
+   * default. A positive value enables multi-threaded compression with
+   * the specified number of background workers. */
+  public static final String IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY =
+      "io.compression.codec.zstd.workers";
+
+  /** Default value for IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY (disabled). */
+  public static final int IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT = 0;
+
   /** Internal buffer size for Lz4 compressor/decompressors */
   public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
       "io.compression.codec.lz4.buffersize";
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
index 7b7ad69014c..109fd27a93b 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
@@ -68,6 +68,28 @@ public static int getCompressionLevel(Configuration conf) {
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
   }
 
+  /**
+   * Returns the number of compression worker threads to be used by the
+   * ZStandard compressor. A value of 0 (the default) disables worker
+   * threads, matching the upstream zstd default. Negative values are
+   * rejected.
+   *
+   * @param conf the configuration to read from
+   * @return the configured number of zstd compression worker threads
+   * @throws IllegalArgumentException if the configured value is negative
+   */
+  public static int getCompressionWorkers(Configuration conf) {
+    int workers = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT);
+    if (workers < 0) {
+      throw new IllegalArgumentException(
+          CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY
+              + " must be >= 0, got " + workers);
+    }
+    return workers;
+  }
+
   public static int getCompressionBufferSize(Configuration conf) {
     int bufferSize = getBufferSize(conf);
     return bufferSize == 0 ?
@@ -135,7 +157,9 @@ public Class<? extends Compressor> getCompressorType() {
   @Override
   public Compressor createCompressor() {
     return new ZStandardCompressor(
-        getCompressionLevel(conf), getCompressionBufferSize(conf));
+        getCompressionLevel(conf),
+        getCompressionWorkers(conf),
+        getCompressionBufferSize(conf));
   }
 
 
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
index 4fe16c87f16..8d989ec96db 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
@@ -43,6 +43,7 @@ public class ZStandardCompressor implements Compressor {
       LoggerFactory.getLogger(ZStandardCompressor.class);
 
   private int level;
+  private int workers;
   private int directBufferSize;
   private byte[] userBuf = null;
   private int userBufOff = 0, userBufLen = 0;
@@ -74,12 +75,30 @@ public static int getRecommendedBufferSize() {
    * @param bufferSize bufferSize.
    */
   public ZStandardCompressor(int level, int bufferSize) {
-    this(level, bufferSize, bufferSize);
+    this(level,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT,
+        bufferSize, bufferSize);
+  }
+
+  /**
+   * Creates a new compressor with the supplied compression level and number
+   * of compression worker threads. Compressed data will be generated in
+   * ZStandard format.
+   *
+   * @param level the zstd compression level
+   * @param workers number of zstd compression worker threads (0 disables
+   *                multi-threaded compression)
+   * @param bufferSize the input/output direct buffer size
+   */
+  public ZStandardCompressor(int level, int workers, int bufferSize) {
+    this(level, workers, bufferSize, bufferSize);
   }
 
   @VisibleForTesting
-  ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
+  ZStandardCompressor(int level, int workers, int inputBufferSize,
+      int outputBufferSize) {
     this.level = level;
+    this.workers = workers;
     zstdJniCtx = new ZstdCompressCtx();
     uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
     directBufferSize = outputBufferSize;
@@ -101,6 +120,7 @@ public void reinit(Configuration conf) {
       return;
     }
     level = ZStandardCodec.getCompressionLevel(conf);
+    workers = ZStandardCodec.getCompressionWorkers(conf);
     reset();
     LOG.debug("Reinit compressor with new compression configuration");
   }
@@ -196,11 +216,9 @@ public int compress(byte[] b, int off, int len) throws 
IOException {
       return n;
     }
 
-    // Always invoke the streaming API — even with empty input — so internally
-    // buffered bytes continue to be drained, matching native ZSTD_flushStream.
-    // Use END only when finish=true, no more user data, and all direct-buffer
-    // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
-    // ZSTD_compressStream + ZSTD_flushStream).
+    // Always invoke the streaming API - even with empty input - so internally
+    // buffered bytes continue to be drained. Use END only when finish=true, no
+    // more user data, and all direct-buffer data consumed; otherwise CONTINUE.
     boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff 
<= 0);
     boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
 
@@ -209,7 +227,16 @@ public int compress(byte[] b, int off, int len) throws 
IOException {
     compressedDirectBuf.position(0);
     compressedDirectBuf.limit(directBufferSize);
 
-    EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.FLUSH;
+    // CONTINUE should be used for non-end case, to support multi-threaded:
+    // 1. CONTINUE + workers >= 1: non-blocking. The call copies as much input
+    //      as it can into a job, dispatches to workers, drains whatever output
+    //      is ready, and returns. Multiple jobs can be in flight in parallel.
+    // 2. FLUSH + workers >= 1: multi-threaded compression will block to flush
+    //      as much output as possible. The call won't return until every 
queued
+    //      job has finished and its output has been drained to the dst buffer.
+    // 3. END + workers >= 1: same as FLUSH but also closes the frame. Same
+    //      blocking behavior.
+    EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.CONTINUE;
     boolean done = zstdJniCtx.compressDirectByteBufferStream(
         compressedDirectBuf, uncompressedDirectBuf, endOp);
 
@@ -269,6 +296,7 @@ public void reset() {
     checkStream();
     zstdJniCtx.reset();
     zstdJniCtx.setLevel(level);
+    zstdJniCtx.setWorkers(workers);
     finish = false;
     finished = false;
     bytesRead = 0;
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 21446d52138..a07460dc308 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -981,6 +981,21 @@
   </description>
 </property>
 
+<property>
+  <name>io.compression.codec.zstd.workers</name>
+  <value>0</value>
+  <description>
+    Number of worker threads used by the ZStandard compressor for
+    multi-threaded compression. The default value 0 disables worker
+    threads and runs compression on the calling thread, matching the
+    upstream zstd default. Setting this to a positive value enables
+    parallel compression with the specified number of background
+    workers; the value is capped internally by the zstd library.
+    Negative values are rejected. This setting only affects compression;
+    decompression is unaffected.
+  </description>
+</property>
+
 <property>
   <name>io.serializations</name>
   <value>org.apache.hadoop.io.serializer.WritableSerialization, 
org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization, 
org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
index a141a974e59..e1546cdbbfa 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
@@ -198,8 +198,16 @@ public void 
testSetInputWithBytesSizeMoreThenDefaultZStandardBufferSize()
     byte[] bytes = generate(bytesSize);
     assertTrue(compressor.needsInput(), "needsInput error !!!");
     compressor.setInput(bytes, 0, bytes.length);
+    compressor.finish();
     byte[] emptyBytes = new byte[bytesSize];
-    int cSize = compressor.compress(emptyBytes, 0, bytes.length);
+    // Drive compress() in a loop until the compressor reports finished(),
+    // mirroring how CompressorStream drains the compressor.
+    int cSize = 0;
+    while (!compressor.finished() && cSize < emptyBytes.length) {
+      compressor.needsInput();
+      cSize += compressor.compress(emptyBytes, cSize,
+          emptyBytes.length - cSize);
+    }
     assertTrue(cSize > 0);
   }
 
@@ -330,13 +338,27 @@ public void testCompressDecompress() throws Exception {
     assertEquals(0, compressor.getBytesRead());
     compressor.finish();
 
+    // Drive compress() in a loop until the compressor reports finished(),
+    // mirroring how CompressorStream drains the compressor.
     byte[] compressedResult = new byte[rawDataSize];
-    int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+    int cSize = 0;
+    while (!compressor.finished() && cSize < compressedResult.length) {
+      cSize += compressor.compress(compressedResult, cSize,
+          compressedResult.length - cSize);
+    }
+    assertTrue(compressor.finished());
     assertEquals(rawDataSize, compressor.getBytesRead());
     assertTrue(cSize < rawDataSize);
     decompressor.setInput(compressedResult, 0, cSize);
+    // Drive decompress() in a loop until the decompressor reports finished()
+    // (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
     byte[] decompressedBytes = new byte[rawDataSize];
-    decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+    int dSize = 0;
+    while (!decompressor.finished() && dSize < decompressedBytes.length) {
+      dSize += decompressor.decompress(decompressedBytes, dSize,
+          decompressedBytes.length - dSize);
+    }
+    assertEquals(rawDataSize, dSize);
     assertEquals(bytesToHex(rawData), bytesToHex(decompressedBytes));
     compressor.reset();
     decompressor.reset();
@@ -354,7 +376,7 @@ public void testCompressingWithOneByteOutputBuffer() throws 
Exception {
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     Compressor compressor =
-        new ZStandardCompressor(3, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
+        new ZStandardCompressor(3, 0, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
     CompressionOutputStream outputStream =
         codec.createOutputStream(baos, compressor);
 
@@ -397,14 +419,28 @@ public void testZStandardCompressDecompress() throws 
Exception {
     compressor.setInput(rawData, 0, rawData.length);
     compressor.finish();
 
+    // Drive compress() in a loop until the compressor reports finished(),
+    // mirroring how CompressorStream drains the compressor.
     byte[] compressedResult = new byte[rawDataSize];
-    int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+    int cSize = 0;
+    while (!compressor.finished() && cSize < compressedResult.length) {
+      cSize += compressor.compress(compressedResult, cSize,
+          compressedResult.length - cSize);
+    }
+    assertTrue(compressor.finished());
     assertEquals(rawDataSize, compressor.getBytesRead());
     assertTrue(cSize < rawDataSize,
         "compressed size no less then original size");
     decompressor.setInput(compressedResult, 0, cSize);
+    // Drive decompress() in a loop until the decompressor reports finished()
+    // (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
     byte[] decompressedBytes = new byte[rawDataSize];
-    decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+    int dSize = 0;
+    while (!decompressor.finished() && dSize < decompressedBytes.length) {
+      dSize += decompressor.decompress(decompressedBytes, dSize,
+          decompressedBytes.length - dSize);
+    }
+    assertEquals(rawDataSize, dSize);
     String decompressed = bytesToHex(decompressedBytes);
     String original = bytesToHex(rawData);
     assertEquals(original, decompressed);
@@ -521,6 +557,107 @@ public void 
testDecompressReturnsWhenNothingToDecompress() throws Exception {
     assertEquals(0, result);
   }
 
+  // workers > 0 should produce data that round-trips correctly through the
+  // decompressor, matching the bytes produced with the default workers=0.
+  @Test
+  public void testCompressionWithWorkers() throws Exception {
+    byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+
+    Configuration conf = new Configuration();
+    conf.setInt("io.compression.codec.zstd.workers", 2);
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(conf);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Compressor compressor = codec.createCompressor();
+    try (CompressionOutputStream outputStream =
+             codec.createOutputStream(baos, compressor)) {
+      outputStream.write(bytes);
+      outputStream.finish();
+    }
+    assertTrue(compressor.finished());
+    assertEquals(bytes.length, compressor.getBytesRead());
+
+    // Round-trip through the decompressor.
+    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Decompressor decompressor = codec.createDecompressor();
+    try (CompressionInputStream inputStream =
+             codec.createInputStream(bais, decompressor)) {
+      byte[] buf = new byte[4096];
+      int n;
+      while ((n = inputStream.read(buf, 0, buf.length)) != -1) {
+        decompressed.write(buf, 0, n);
+      }
+    }
+    assertArrayEquals(bytes, decompressed.toByteArray());
+  }
+
+  // A negative workers value must be rejected up-front by ZStandardCodec.
+  @Test
+  public void testNegativeWorkersIsRejected() {
+    Configuration conf = new Configuration();
+    conf.setInt("io.compression.codec.zstd.workers", -1);
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(conf);
+    assertThrows(IllegalArgumentException.class, codec::createCompressor);
+  }
+
+  // The default value (workers=0) must keep behaviour identical to before.
+  @Test
+  public void testDefaultWorkersIsZero() throws Exception {
+    Configuration conf = new Configuration();
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(conf);
+    assertEquals(0, ZStandardCodec.getCompressionWorkers(conf));
+
+    byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Compressor compressor = codec.createCompressor();
+    try (CompressionOutputStream outputStream =
+             codec.createOutputStream(baos, compressor)) {
+      outputStream.write(bytes);
+      outputStream.finish();
+    }
+    assertTrue(compressor.finished());
+    assertEquals(bytes.length, compressor.getBytesRead());
+  }
+
+  // reinit() should pick up an updated workers value for pooled compressors.
+  @Test
+  public void testReinitUpdatesWorkers() throws Exception {
+    byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(new Configuration());
+    Compressor compressor = codec.createCompressor();
+
+    Configuration newConf = new Configuration();
+    newConf.setInt("io.compression.codec.zstd.workers", 2);
+    compressor.reinit(newConf);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (CompressionOutputStream outputStream =
+             codec.createOutputStream(baos, compressor)) {
+      outputStream.write(bytes);
+      outputStream.finish();
+    }
+
+    // Round-trip to confirm the output is still valid zstd data.
+    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Decompressor decompressor = codec.createDecompressor();
+    try (CompressionInputStream inputStream =
+             codec.createInputStream(bais, decompressor)) {
+      byte[] buf = new byte[4096];
+      int n;
+      while ((n = inputStream.read(buf, 0, buf.length)) != -1) {
+        decompressed.write(buf, 0, n);
+      }
+    }
+    assertArrayEquals(bytes, decompressed.toByteArray());
+  }
+
   public static byte[] generate(int size) {
     byte[] data = new byte[size];
     for (int i = 0; i < size; i++) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to