This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 551a1d8a2 [CELEBORN-2200] Throw IOException when compressed data 
header corrupted
551a1d8a2 is described below

commit 551a1d8a2f848c02850b61404155ea212af63e36
Author: jiang13021 <[email protected]>
AuthorDate: Wed Nov 12 14:34:46 2025 +0800

    [CELEBORN-2200] Throw IOException when compressed data header corrupted
    
    ### What changes were proposed in this pull request?
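    Make the LZ4 and Zstd decompressors throw an `IOException` when the
    compressed data header is corrupted (length mismatch, checksum mismatch,
    or unknown compression method), instead of logging an error and
    returning -1. `Decompressor#decompress` now declares `throws IOException`.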
    
    ### Why are the changes needed?
    We discovered that the corruption of the compressed data header may cause 
data loss. By throwing IOException, we can trigger a stage rerun to avoid data 
loss.
    
    ### Does this PR resolve a correctness bug?
    Yes
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
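    Added unit tests `testLz4CodecCorrupted` and `testZstdCodecCorrupted`,
    which corrupt one header byte and assert that decompression fails with a
    "Checksum not equal!" `IOException`.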
    
    Closes #3534 from jiang13021/celeborn-2200.
    
    Authored-by: jiang13021 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 79f0d319a17b838e15021b4abbb4040c710254ac)
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/client/compress/Decompressor.java     |  4 +-
 .../celeborn/client/compress/Lz4Decompressor.java  | 18 ++++----
 .../celeborn/client/compress/ZstdDecompressor.java | 20 +++++----
 .../celeborn/client/compress/CodecSuiteJ.java      | 49 +++++++++++++++++++++-
 4 files changed, 72 insertions(+), 19 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java 
b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
index 2918fe1f2..37cead4c2 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
@@ -17,6 +17,8 @@
 
 package org.apache.celeborn.client.compress;
 
+import java.io.IOException;
+
 import scala.Option;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -24,7 +26,7 @@ import org.apache.celeborn.common.protocol.CompressionCodec;
 
 public interface Decompressor {
 
-  int decompress(byte[] src, byte[] dst, int dstOff);
+  int decompress(byte[] src, byte[] dst, int dstOff) throws IOException;
 
   int getOriginalLen(byte[] src);
 
diff --git 
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java 
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
index 94569f800..8a538ef34 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.client.compress;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.function.Supplier;
 import java.util.zip.Checksum;
@@ -56,7 +57,7 @@ public class Lz4Decompressor extends Lz4Trait implements 
Decompressor {
   }
 
   @Override
-  public int decompress(byte[] src, byte[] dst, int dstOff) {
+  public int decompress(byte[] src, byte[] dst, int dstOff) throws IOException 
{
     int compressionMethod = src[MAGIC_LENGTH] & 0xFF;
     int compressedLen = readIntLE(src, MAGIC_LENGTH + 1);
     int originalLen = readIntLE(src, MAGIC_LENGTH + 5);
@@ -69,19 +70,20 @@ public class Lz4Decompressor extends Lz4Trait implements 
Decompressor {
       case COMPRESSION_METHOD_LZ4:
         int compressedLen2 = decompressor.decompress(src, HEADER_LENGTH, dst, 
dstOff, originalLen);
         if (compressedLen != compressedLen2) {
-          logger.error(
-              "Compressed length corrupted! expected: {}, actual: {}.",
-              compressedLen,
-              compressedLen2);
-          return -1;
+          throw new IOException(
+              "Compressed length corrupted! expected: "
+                  + compressedLen
+                  + ", actual: "
+                  + compressedLen2
+                  + ".");
         }
     }
 
     checksum.reset();
     checksum.update(dst, dstOff, originalLen);
     if ((int) checksum.getValue() != check) {
-      logger.error("Checksum not equal! expected: {}, actual: {}.", check, 
checksum.getValue());
-      return -1;
+      throw new IOException(
+          "Checksum not equal! expected: " + check + ", actual: " + 
checksum.getValue() + ".");
     }
 
     return originalLen;
diff --git 
a/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
 
b/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
index c35bc9670..1c06a1c72 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.client.compress;
 
+import java.io.IOException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -38,7 +39,7 @@ public class ZstdDecompressor extends ZstdTrait implements 
Decompressor {
   }
 
   @Override
-  public int decompress(byte[] src, byte[] dst, int dstOff) {
+  public int decompress(byte[] src, byte[] dst, int dstOff) throws IOException 
{
     int compressionMethod = src[MAGIC_LENGTH] & 0xFF;
     int compressedLen = readIntLE(src, MAGIC_LENGTH + 1);
     int originalLen = readIntLE(src, MAGIC_LENGTH + 5);
@@ -54,21 +55,24 @@ public class ZstdDecompressor extends ZstdTrait implements 
Decompressor {
                 Zstd.decompressByteArray(
                     dst, dstOff, originalLen, src, HEADER_LENGTH, 
compressedLen);
         if (originalLen != originalLen2) {
-          logger.error(
-              "Original length corrupted! expected: {}, actual: {}.", 
originalLen, originalLen2);
-          return -1;
+          throw new IOException(
+              "Original length corrupted! expected: "
+                  + originalLen
+                  + ", actual: "
+                  + originalLen2
+                  + ".");
         }
         break;
       default:
-        logger.error("Unknown compression method whose decimal number is {} 
.", compressionMethod);
-        return -1;
+        throw new IOException(
+            "Unknown compression method whose decimal number is {" + 
compressionMethod + "} .");
     }
 
     checksum.reset();
     checksum.update(dst, dstOff, originalLen);
     if ((int) checksum.getValue() != check) {
-      logger.error("Checksum not equal! expected: {}, actual: {}.", check, 
checksum.getValue());
-      return -1;
+      throw new IOException(
+          "Checksum not equal! expected: " + check + ", actual: " + 
checksum.getValue() + ".");
     }
     return originalLen;
   }
diff --git 
a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java 
b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
index 07ba142e4..1b010422e 100644
--- a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.client.compress;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
 import scala.Option;
@@ -30,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf;
 public class CodecSuiteJ {
 
   @Test
-  public void testLz4Codec() {
+  public void testLz4Codec() throws IOException {
     int blockSize = new CelebornConf().clientPushBufferMaxSize();
     Lz4Compressor lz4Compressor = new Lz4Compressor(blockSize);
     byte[] data = 
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
@@ -47,7 +48,29 @@ public class CodecSuiteJ {
   }
 
   @Test
-  public void testZstdCodec() {
+  public void testLz4CodecCorrupted() {
+    int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
+    Lz4Compressor lz4Compressor = new Lz4Compressor(blockSize);
+    byte[] data = 
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
+    int oriLength = data.length;
+    lz4Compressor.compress(data, 0, oriLength);
+
+    byte[] compressedBuffer = lz4Compressor.getCompressedBuffer().clone();
+    // Manually corrupted data
+    compressedBuffer[Lz4Trait.MAGIC_LENGTH + 9] = 
++compressedBuffer[Lz4Trait.MAGIC_LENGTH + 9];
+
+    Lz4Decompressor lz4Decompressor = new Lz4Decompressor(Option.empty());
+    byte[] dst = new byte[oriLength];
+    try {
+      lz4Decompressor.decompress(compressedBuffer, dst, 0);
+      Assert.fail("The compressed data is corrupted, so decompression should 
fail.");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("Checksum not equal!"));
+    }
+  }
+
+  @Test
+  public void testZstdCodec() throws IOException {
     for (int level = -5; level <= 22; level++) {
       int blockSize = new CelebornConf().clientPushBufferMaxSize();
       ZstdCompressor zstdCompressor = new ZstdCompressor(blockSize, level);
@@ -65,4 +88,26 @@ public class CodecSuiteJ {
       Assert.assertArrayEquals(data, dst);
     }
   }
+
+  @Test
+  public void testZstdCodecCorrupted() {
+    int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
+    ZstdCompressor zstdCompressor = new ZstdCompressor(blockSize, 1);
+    byte[] data = 
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
+    int oriLength = data.length;
+    zstdCompressor.compress(data, 0, oriLength);
+
+    byte[] compressedBuffer = zstdCompressor.getCompressedBuffer().clone();
+    // Manually corrupted data
+    compressedBuffer[ZstdTrait.MAGIC_LENGTH + 9] = 
++compressedBuffer[ZstdTrait.MAGIC_LENGTH + 9];
+
+    ZstdDecompressor zstdDecompressor = new ZstdDecompressor();
+    byte[] dst = new byte[oriLength];
+    try {
+      zstdDecompressor.decompress(compressedBuffer, dst, 0);
+      Assert.fail("The compressed data is corrupted, so decompression should 
fail.");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("Checksum not equal!"));
+    }
+  }
 }

Reply via email to