This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 16b12053c [CELEBORN-2090] Support Lz4 Decompression in CppClient
16b12053c is described below

commit 16b12053cad69e5137896b82b82feb2318bcf1b5
Author: Jray <[email protected]>
AuthorDate: Fri Aug 8 18:19:48 2025 +0800

    [CELEBORN-2090] Support Lz4 Decompression in CppClient
    
    ### What changes were proposed in this pull request?
    This PR adds support for lz4 decompression in CppClient.
    
    ### Why are the changes needed?
    To allow CppClient to read LZ4-compressed shuffle data from Celeborn.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By compilation, unit tests, and a new Java-write/C++-read LZ4 integration test.
    
    Closes #3402 from Jraaay/feat/cpp_client_lz4_decompression.
    
    Authored-by: Jray <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit cfb490c9380cd5e8e903042de9350b3eea8ce9c8)
    Signed-off-by: SteNicholas <[email protected]>
---
 .github/workflows/cpp_integration.yml              | 15 +++-
 LICENSE                                            |  3 +
 cpp/CMakeLists.txt                                 |  6 ++
 cpp/README.md                                      |  2 +-
 cpp/celeborn/client/CMakeLists.txt                 |  5 +-
 cpp/celeborn/client/ShuffleClient.cpp              | 14 +++-
 cpp/celeborn/client/ShuffleClient.h                | 16 ++++
 .../celeborn/client/compress/Decompressor.cpp      | 25 ++++--
 .../celeborn/client/compress/Decompressor.h        | 33 ++++++--
 cpp/celeborn/client/compress/Lz4Decompressor.cpp   | 97 ++++++++++++++++++++++
 .../celeborn/client/compress/Lz4Decompressor.h     | 32 +++++--
 .../celeborn/client/compress/Lz4Trait.h            | 25 ++++--
 cpp/celeborn/client/reader/CelebornInputStream.cpp | 41 +++++++--
 cpp/celeborn/client/reader/CelebornInputStream.h   |  8 +-
 cpp/celeborn/client/tests/CMakeLists.txt           |  3 +-
 cpp/celeborn/client/tests/Lz4DecompressorTest.cpp  | 70 ++++++++++++++++
 cpp/celeborn/conf/CMakeLists.txt                   |  1 +
 cpp/celeborn/conf/CelebornConf.cpp                 |  8 ++
 cpp/celeborn/conf/CelebornConf.h                   |  6 ++
 cpp/celeborn/memory/ByteBuffer.cpp                 | 22 +++++
 cpp/celeborn/memory/ByteBuffer.h                   | 13 ++-
 cpp/celeborn/memory/tests/ByteBufferTest.cpp       | 76 +++++++++++++++++
 cpp/celeborn/protocol/CMakeLists.txt               |  3 +-
 .../celeborn/protocol/CompressionCodec.cpp         | 30 +++++--
 .../celeborn/protocol/CompressionCodec.h           | 20 +++--
 cpp/celeborn/tests/DataSumWithReaderClient.cpp     | 11 ++-
 cpp/cmake/FindLZ4.cmake                            | 41 +++++++++
 cpp/scripts/setup-ubuntu.sh                        |  2 +
 ...stBase.scala => JavaWriteCppReadTestBase.scala} | 23 +++--
 ...ONE.scala => JavaWriteCppReadTestWithLZ4.scala} |  4 +-
 ...NE.scala => JavaWriteCppReadTestWithNONE.scala} |  4 +-
 31 files changed, 587 insertions(+), 72 deletions(-)

diff --git a/.github/workflows/cpp_integration.yml 
b/.github/workflows/cpp_integration.yml
index 0af4c61e5..0b221aeb9 100644
--- a/.github/workflows/cpp_integration.yml
+++ b/.github/workflows/cpp_integration.yml
@@ -30,7 +30,7 @@ on:
 jobs:
   celeborn_cpp_check_lint:
     runs-on: ubuntu-22.04
-    container: holylow/celeborn-cpp-dev:0.3
+    container: jraaaay/celeborn-cpp-dev:0.4
     steps:
       - uses: actions/checkout@v4
         with:
@@ -43,7 +43,7 @@ jobs:
             xargs clang-format-15 -style=file:./.clang-format -n --Werror
   celeborn_cpp_unit_test:
     runs-on: ubuntu-22.04
-    container: holylow/celeborn-cpp-dev:0.3
+    container: jraaaay/celeborn-cpp-dev:0.4
     steps:
       - uses: actions/checkout@v4
         with:
@@ -59,7 +59,7 @@ jobs:
         run: ctest
   celeborn_cpp_integration_test:
     runs-on: ubuntu-22.04
-    container: holylow/celeborn-cpp-dev:0.3
+    container: jraaaay/celeborn-cpp-dev:0.4
     steps:
       - uses: actions/checkout@v4
         with:
@@ -90,5 +90,12 @@ jobs:
           build/mvn -pl worker \
             test-compile exec:java \
             -Dexec.classpathScope="test" \
-            
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaReadCppWriteTestWithNONE"
 \
+            
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithNONE"
 \
+            -Dexec.args="-XX:MaxDirectMemorySize=2G"
+      - name: Run Java-Cpp Hybrid Integration Test (LZ4 Decompression)
+        run: |
+          build/mvn -pl worker \
+            test-compile exec:java \
+            -Dexec.classpathScope="test" \
+            
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithLZ4"
 \
             -Dexec.args="-XX:MaxDirectMemorySize=2G"
diff --git a/LICENSE b/LICENSE
index f757490f9..ee0a3f890 100644
--- a/LICENSE
+++ b/LICENSE
@@ -274,6 +274,9 @@ Meta Velox
 ./cpp/celeborn/conf/BaseConf.h
 ./cpp/celeborn/conf/BaseConf.cpp
 
+Meta Folly
+./cpp/cmake/FindLZ4.cmake
+
 
------------------------------------------------------------------------------------
 This product bundles various third-party components under the CC0 license.
 This section summarizes those components.
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 8552b7ef1..f24a8abff 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -129,6 +129,12 @@ find_package(Sodium REQUIRED)
 find_library(FIZZ fizz REQUIRED)
 find_library(WANGLE wangle REQUIRED)
 
+find_package(LZ4 REQUIRED)
+set(LZ4_WITH_DEPENDENCIES
+        ${LZ4_LIBRARY}
+        xxhash
+)
+
 find_library(RE2 re2)
 
 find_package(fizz CONFIG REQUIRED)
diff --git a/cpp/README.md b/cpp/README.md
index 2aa07b4e1..d2c794228 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -21,7 +21,7 @@ docker run \
     -w /celeborn \
     -it --rm \
     --name celeborn-cpp-dev-container \
-    holylow/celeborn-cpp-dev:0.3 \
+    jraaaay/celeborn-cpp-dev:0.4 \
     /bin/bash
 ```
 
diff --git a/cpp/celeborn/client/CMakeLists.txt 
b/cpp/celeborn/client/CMakeLists.txt
index 2491b7d7f..94e58795d 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -16,7 +16,9 @@ add_library(
         client
         reader/WorkerPartitionReader.cpp
         reader/CelebornInputStream.cpp
-        ShuffleClient.cpp)
+        ShuffleClient.cpp
+        compress/Decompressor.cpp
+        compress/Lz4Decompressor.cpp)
 
 target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
 
@@ -31,6 +33,7 @@ target_link_libraries(
         ${FIZZ}
         ${LIBSODIUM_LIBRARY}
         ${FOLLY_WITH_DEPENDENCIES}
+        ${LZ4_WITH_DEPENDENCIES}
         ${GLOG}
         ${GFLAGS_LIBRARIES}
 )
diff --git a/cpp/celeborn/client/ShuffleClient.cpp 
b/cpp/celeborn/client/ShuffleClient.cpp
index 8e8ca2121..ccf7c6dcb 100644
--- a/cpp/celeborn/client/ShuffleClient.cpp
+++ b/cpp/celeborn/client/ShuffleClient.cpp
@@ -48,6 +48,17 @@ std::unique_ptr<CelebornInputStream> 
ShuffleClientImpl::readPartition(
     int attemptNumber,
     int startMapIndex,
     int endMapIndex) {
+  return ShuffleClientImpl::readPartition(
+      shuffleId, partitionId, attemptNumber, startMapIndex, endMapIndex, true);
+}
+
+std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
+    int shuffleId,
+    int partitionId,
+    int attemptNumber,
+    int startMapIndex,
+    int endMapIndex,
+    bool needCompression) {
   const auto& reducerFileGroupInfo = getReducerFileGroupInfo(shuffleId);
   std::string shuffleKey = utils::makeShuffleKey(appUniqueId_, shuffleId);
   std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations;
@@ -64,7 +75,8 @@ std::unique_ptr<CelebornInputStream> 
ShuffleClientImpl::readPartition(
       reducerFileGroupInfo.attempts,
       attemptNumber,
       startMapIndex,
-      endMapIndex);
+      endMapIndex,
+      needCompression);
 }
 
 void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
diff --git a/cpp/celeborn/client/ShuffleClient.h 
b/cpp/celeborn/client/ShuffleClient.h
index 288e52e9e..284c7ade9 100644
--- a/cpp/celeborn/client/ShuffleClient.h
+++ b/cpp/celeborn/client/ShuffleClient.h
@@ -38,6 +38,14 @@ class ShuffleClient {
       int startMapIndex,
       int endMapIndex) = 0;
 
+  virtual std::unique_ptr<CelebornInputStream> readPartition(
+      int shuffleId,
+      int partitionId,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex,
+      bool needCompression) = 0;
+
   virtual bool cleanupShuffle(int shuffleId) = 0;
 
   virtual void shutdown() = 0;
@@ -62,6 +70,14 @@ class ShuffleClientImpl : public ShuffleClient {
       int startMapIndex,
       int endMapIndex) override;
 
+  std::unique_ptr<CelebornInputStream> readPartition(
+      int shuffleId,
+      int partitionId,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex,
+      bool needCompression) override;
+
   void updateReducerFileGroup(int shuffleId) override;
 
   bool cleanupShuffle(int shuffleId) override;
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
 b/cpp/celeborn/client/compress/Decompressor.cpp
similarity index 55%
copy from 
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Decompressor.cpp
index 4aa5b0ed7..999d01822 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Decompressor.cpp
@@ -15,13 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.cluster
+#include <stdexcept>
 
-import org.apache.celeborn.common.protocol.CompressionCodec
+#include "celeborn/client/compress/Lz4Decompressor.h"
+#include "celeborn/utils/Exceptions.h"
 
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+namespace celeborn {
+namespace client {
+namespace compress {
 
-  def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
+// Creates the decompressor for `codec`. Only LZ4 is implemented. NONE needs no
+// decompressor (CelebornInputStream only asks for one when the codec is not
+// NONE) and ZSTD is not implemented yet; both fail via CELEBORN_FAIL.
+std::unique_ptr<Decompressor> Decompressor::createDecompressor(
+    protocol::CompressionCodec codec) {
+  switch (codec) {
+    case protocol::CompressionCodec::LZ4:
+      return std::make_unique<Lz4Decompressor>();
+    case protocol::CompressionCodec::NONE:
+      CELEBORN_FAIL("Compression codec NONE does not need a decompressor.");
+    case protocol::CompressionCodec::ZSTD:
+      // TODO: impl zstd
+      CELEBORN_FAIL("Compression codec ZSTD is not supported.");
+    default:
+      CELEBORN_FAIL("Unknown compression codec.");
   }
 }
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
 b/cpp/celeborn/client/compress/Decompressor.h
similarity index 53%
copy from 
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Decompressor.h
index 4aa5b0ed7..694c07d42 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Decompressor.h
@@ -15,13 +15,48 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
 
-import org.apache.celeborn.common.protocol.CompressionCodec
+#include <cstdint>
+#include <memory>
 
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+#include "celeborn/protocol/CompressionCodec.h"
 
-  def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
+namespace celeborn {
+namespace client {
+namespace compress {
+
+/// Interface for decoding a single compressed shuffle block.
+class Decompressor {
+ public:
+  virtual ~Decompressor() = default;
+
+  /// Decompresses the block starting at `src` into `dst + dst_off` and
+  /// returns the number of decompressed bytes. `dst` must have room for at
+  /// least getOriginalLen(src) bytes past `dst_off`.
+  virtual int decompress(const uint8_t* src, uint8_t* dst, int dst_off) = 0;
+
+  /// Returns the uncompressed length recorded in the block header at `src`.
+  virtual int getOriginalLen(const uint8_t* src) = 0;
+
+  /// Creates the decompressor for `codec`; fails via CELEBORN_FAIL for codecs
+  /// that have no decompressor implementation.
+  static std::unique_ptr<Decompressor> createDecompressor(
+      protocol::CompressionCodec codec);
+
+ protected:
+  /// Reads a little-endian 32-bit integer from buf[i] .. buf[i + 3].
+  static int32_t readIntLE(const uint8_t* buf, const int i) {
+    // Assemble in uint32_t: a promoted byte >= 0x80 shifted left by 24
+    // overflows signed int, which is undefined behavior before C++20.
+    const uint32_t value = static_cast<uint32_t>(buf[i]) |
+        (static_cast<uint32_t>(buf[i + 1]) << 8) |
+        (static_cast<uint32_t>(buf[i + 2]) << 16) |
+        (static_cast<uint32_t>(buf[i + 3]) << 24);
+    return static_cast<int32_t>(value);
   }
-}
+};
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.cpp 
b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
new file mode 100644
index 000000000..f85ced0d2
--- /dev/null
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/client/compress/Lz4Decompressor.h"
+#include <lz4.h>
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include "celeborn/utils/Exceptions.h"
+
+namespace celeborn {
+namespace client {
+namespace compress {
+
+Lz4Decompressor::Lz4Decompressor() {
+  xxhash_state_ = XXH32_createState();
+  if (!xxhash_state_) {
+    CELEBORN_FAIL("Failed to create XXH32 state.");
+  }
+  XXH32_reset(xxhash_state_, kDefaultSeed);
+}
+
+Lz4Decompressor::~Lz4Decompressor() {
+  if (xxhash_state_) {
+    XXH32_freeState(xxhash_state_);
+  }
+}
+
+// Returns the original (uncompressed) length stored in the block header.
+// Header layout: magic, token byte (1), compressed length (4), original
+// length (4), checksum (4); the integers are little-endian.
+int Lz4Decompressor::getOriginalLen(const uint8_t* src) {
+  return readIntLE(src, kMagicLength + 5);
+}
+
+// Decodes the LZ4Block-framed block at `src` into `dst + dstOff` and returns
+// the number of bytes written. Fails via CELEBORN_FAIL on a corrupted header,
+// an unknown compression method, an LZ4 error, or a checksum mismatch.
+int Lz4Decompressor::decompress(
+    const uint8_t* src,
+    uint8_t* dst,
+    const int dstOff) {
+  // Only the high nibble of the token selects the method; lz4-java's LZ4Block
+  // writer stores the compression level in the low nibble.
+  const int compressionMethod =
+      static_cast<unsigned char>(src[kMagicLength]) & 0xF0;
+  const int compressedLen = readIntLE(src, kMagicLength + 1);
+  const int originalLen = readIntLE(src, kMagicLength + 5);
+  const uint32_t expectedCheck =
+      static_cast<uint32_t>(readIntLE(src, kMagicLength + 9));
+
+  // Negative lengths can only come from a corrupted header; reject them
+  // before they are used as memcpy / LZ4 sizes.
+  if (compressedLen < 0 || originalLen < 0) {
+    CELEBORN_FAIL(
+        std::string("Corrupted LZ4 block header! compressedLen: ") +
+        std::to_string(compressedLen) +
+        ", originalLen: " + std::to_string(originalLen));
+  }
+
+  const uint8_t* compressedDataPtr = src + kHeaderLength;
+  uint8_t* dstPtr = dst + dstOff;
+
+  switch (compressionMethod) {
+    case kCompressionMethodRaw:
+      // A raw block stores the payload as-is, so both lengths must agree;
+      // otherwise memcpy would read past the stored payload.
+      if (compressedLen != originalLen) {
+        CELEBORN_FAIL(
+            std::string("Corrupted raw LZ4 block! compressedLen: ") +
+            std::to_string(compressedLen) +
+            ", originalLen: " + std::to_string(originalLen));
+      }
+      std::memcpy(dstPtr, compressedDataPtr, originalLen);
+      break;
+    case kCompressionMethodLZ4: {
+      const int decompressedBytes = LZ4_decompress_safe(
+          reinterpret_cast<const char*>(compressedDataPtr),
+          reinterpret_cast<char*>(dstPtr),
+          compressedLen,
+          originalLen);
+
+      if (decompressedBytes != originalLen) {
+        CELEBORN_FAIL(
+            std::string("Decompression failed! LZ4 error or size mismatch. ") +
+            "Expected: " + std::to_string(originalLen) +
+            ", Got: " + std::to_string(decompressedBytes));
+      }
+      break;
+    }
+    default:
+      CELEBORN_FAIL(
+          std::string("Unsupported compression method: ") +
+          std::to_string(compressionMethod));
+  }
+
+  // The 28-bit mask is intentional: it mirrors lz4-java's
+  // StreamingXXHash32.asChecksum(), presumably used by the Java-side writer.
+  XXH32_reset(xxhash_state_, kDefaultSeed);
+  XXH32_update(xxhash_state_, dstPtr, originalLen);
+  const uint32_t actualCheck = XXH32_digest(xxhash_state_) & 0x0FFFFFFFu;
+
+  if (expectedCheck != actualCheck) {
+    CELEBORN_FAIL(
+        std::string("Checksum mismatch! Expected: ") +
+        std::to_string(expectedCheck) +
+        ", Actual: " + std::to_string(actualCheck));
+  }
+
+  return originalLen;
+}
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
 b/cpp/celeborn/client/compress/Lz4Decompressor.h
similarity index 54%
copy from 
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Lz4Decompressor.h
index 4aa5b0ed7..3d34a6876 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.h
@@ -15,13 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
 
-import org.apache.celeborn.common.protocol.CompressionCodec
+#include <xxhash.h>
+#include "celeborn/client/compress/Decompressor.h"
+#include "celeborn/client/compress/Lz4Trait.h"
 
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+namespace celeborn {
+namespace client {
+namespace compress {
 
-  def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
-  }
-}
+class Lz4Decompressor final : public Decompressor, Lz4Trait {
+ public:
+  Lz4Decompressor(); // Fails if the XXH32 state cannot be allocated.
+  ~Lz4Decompressor() override; // Frees the XXH32 state.
+
+  int getOriginalLen(const uint8_t* src) override; // Header field only.
+  int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
+
+  Lz4Decompressor(const Lz4Decompressor&) = delete; // Owns xxhash_state_.
+  Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
+
+ private:
+  XXH32_state_t* xxhash_state_; // Owned: created in ctor, freed in dtor.
+};
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
 b/cpp/celeborn/client/compress/Lz4Trait.h
similarity index 60%
copy from 
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Lz4Trait.h
index 4aa5b0ed7..fc236a1c5 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Lz4Trait.h
@@ -15,13 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
 
-import org.apache.celeborn.common.protocol.CompressionCodec
+#include <cstdint>
+
+namespace celeborn {
+namespace client {
+namespace compress {
 
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+/// Layout constants of the "LZ4Block" frame parsed by Lz4Decompressor.
+struct Lz4Trait {
+  static constexpr char kMagic[] = {'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k'};
+  static constexpr int kMagicLength = sizeof(kMagic);
 
-  def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
-  }
-}
+  // magic + token byte (1) + compressed length (4) + original length (4)
+  // + checksum (4).
+  static constexpr int kHeaderLength = kMagicLength + 1 + 4 + 4 + 4;
+
+  // Compression methods, stored in the high nibble of the token byte.
+  static constexpr int kCompressionMethodRaw = 0x10;
+  static constexpr int kCompressionMethodLZ4 = 0x20;
+
+  // XXH32 checksum seed. Unsigned because 0x9747b28c does not fit in int
+  // and XXH32_reset() takes an unsigned 32-bit seed.
+  static constexpr uint32_t kDefaultSeed = 0x9747b28cu;
+};
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.cpp 
b/cpp/celeborn/client/reader/CelebornInputStream.cpp
index a3023d6f3..d75b705e5 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.cpp
+++ b/cpp/celeborn/client/reader/CelebornInputStream.cpp
@@ -16,6 +16,8 @@
  */
 
 #include "celeborn/client/reader/CelebornInputStream.h"
+#include <lz4.h>
+#include "celeborn/client/compress/Decompressor.h"
 
 namespace celeborn {
 namespace client {
@@ -27,7 +29,8 @@ CelebornInputStream::CelebornInputStream(
     const std::vector<int>& attempts,
     int attemptNumber,
     int startMapIndex,
-    int endMapIndex)
+    int endMapIndex,
+    bool needCompression)
     : shuffleKey_(shuffleKey),
       conf_(conf),
       clientFactory_(clientFactory),
@@ -38,7 +41,15 @@ CelebornInputStream::CelebornInputStream(
       endMapIndex_(endMapIndex),
       currLocationIndex_(0),
       currBatchPos_(0),
-      currBatchSize_(0) {
+      currBatchSize_(0),
+      shouldDecompress_(
+          conf_->shuffleCompressionCodec() !=
+              protocol::CompressionCodec::NONE &&
+          needCompression) {
+  if (shouldDecompress_) {
+    decompressor_ = compress::Decompressor::createDecompressor(
+        conf_->shuffleCompressionCodec());
+  }
   moveToNextReader();
 }
 
@@ -57,8 +68,8 @@ int CelebornInputStream::read(uint8_t* buffer, size_t offset, 
size_t len) {
     }
     size_t batchRemainingSize = currBatchSize_ - currBatchPos_;
     size_t toReadBytes = std::min(len - readBytes, batchRemainingSize);
-    CELEBORN_CHECK_GE(currChunk_->remainingSize(), toReadBytes);
-    auto size = currChunk_->readToBuffer(&buf[readBytes], toReadBytes);
+    CELEBORN_CHECK_GE(decompressedChunk_->remainingSize(), toReadBytes);
+    auto size = decompressedChunk_->readToBuffer(&buf[readBytes], toReadBytes);
     CELEBORN_CHECK_EQ(toReadBytes, size);
     readBytes += toReadBytes;
     currBatchPos_ += toReadBytes;
@@ -83,13 +94,31 @@ bool CelebornInputStream::fillBuffer() {
     CELEBORN_CHECK_GE(currChunk_->remainingSize(), size);
     CELEBORN_CHECK_LT(mapId, attempts_.size());
 
-    // TODO: compression is not supported yet!
+    if (shouldDecompress_) {
+      if (size > compressedBuf_.size()) {
+        compressedBuf_.resize(size);
+      }
+      currChunk_->readToBuffer(compressedBuf_.data(), size);
+    }
 
     if (attemptId == attempts_[mapId]) {
       auto& batchRecord = getBatchRecord(mapId);
       if (batchRecord.count(batchId) <= 0) {
         batchRecord.insert(batchId);
-        currBatchSize_ = size;
+        if (shouldDecompress_) {
+          const auto originalLength =
+              decompressor_->getOriginalLen(compressedBuf_.data());
+          std::unique_ptr<folly::IOBuf> decompressedBuf_ =
+              folly::IOBuf::createCombined(originalLength);
+          decompressedBuf_->append(originalLength);
+          currBatchSize_ = decompressor_->decompress(
+              compressedBuf_.data(), decompressedBuf_->writableData(), 0);
+          decompressedChunk_ = memory::ByteBuffer::createReadOnly(
+              std::move(decompressedBuf_), false);
+        } else {
+          currBatchSize_ = size;
+          decompressedChunk_ = currChunk_->readToReadOnlyBuffer(size);
+        }
         currBatchPos_ = 0;
         hasData = true;
         break;
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.h 
b/cpp/celeborn/client/reader/CelebornInputStream.h
index af143321c..5dd9c4f76 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.h
+++ b/cpp/celeborn/client/reader/CelebornInputStream.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "celeborn/client/compress/Decompressor.h"
 #include "celeborn/client/reader/WorkerPartitionReader.h"
 #include "celeborn/conf/CelebornConf.h"
 
@@ -33,7 +34,8 @@ class CelebornInputStream {
       const std::vector<int>& attempts,
       int attemptNumber,
       int startMapIndex,
-      int endMapIndex);
+      int endMapIndex,
+      bool needCompression);
 
   int read(uint8_t* buffer, size_t offset, size_t len);
 
@@ -68,9 +70,13 @@ class CelebornInputStream {
   int attemptNumber_;
   int startMapIndex_;
   int endMapIndex_;
+  bool shouldDecompress_;
+  std::unique_ptr<compress::Decompressor> decompressor_;
+  std::vector<uint8_t> compressedBuf_;
 
   int currLocationIndex_;
   std::unique_ptr<memory::ReadOnlyByteBuffer> currChunk_;
+  std::unique_ptr<memory::ReadOnlyByteBuffer> decompressedChunk_;
   size_t currBatchPos_;
   size_t currBatchSize_;
   std::shared_ptr<PartitionReader> currReader_;
diff --git a/cpp/celeborn/client/tests/CMakeLists.txt 
b/cpp/celeborn/client/tests/CMakeLists.txt
index 6543ca28d..ab1dea923 100644
--- a/cpp/celeborn/client/tests/CMakeLists.txt
+++ b/cpp/celeborn/client/tests/CMakeLists.txt
@@ -15,7 +15,8 @@
 
 add_executable(
         celeborn_client_test
-        WorkerPartitionReaderTest.cpp)
+        WorkerPartitionReaderTest.cpp
+        Lz4DecompressorTest.cpp)
 
 add_test(NAME celeborn_client_test COMMAND celeborn_client_test)
 
diff --git a/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp 
b/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp
new file mode 100644
index 000000000..6cd786367
--- /dev/null
+++ b/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "celeborn/client/compress/Lz4Decompressor.h"
+
+using namespace celeborn;
+using namespace celeborn::client;
+
+namespace {
+// Decodes `compressedData` into a buffer sized from the block header and
+// checks both the returned length and the decoded bytes against `expected`.
+void expectDecompressesTo(
+    const std::vector<uint8_t>& compressedData,
+    const std::string& expected) {
+  compress::Lz4Decompressor decompressor;
+
+  const int originalLen = decompressor.getOriginalLen(compressedData.data());
+  ASSERT_EQ(originalLen, static_cast<int>(expected.size()));
+
+  // Owned by std::vector, so the buffer is released when the helper returns.
+  std::vector<uint8_t> decompressedData(originalLen);
+  const int decompressedLen = decompressor.decompress(
+      compressedData.data(), decompressedData.data(), 0);
+
+  EXPECT_EQ(decompressedLen, originalLen);
+  EXPECT_EQ(
+      std::string(decompressedData.begin(), decompressedData.end()), expected);
+}
+} // namespace
+
+TEST(Lz4DecompressorTest, DecompressWithLz4) {
+  const std::vector<uint8_t> compressedData = {
+      76,  90, 52, 66,  108, 111, 99,  107, 32,  29,  0,   0,   0,
+      31,  0,  0,  0,   116, 18,  177, 8,   83,  72,  101, 108, 108,
+      111, 1,  0,  240, 4,   32,  67,  101, 108, 101, 98,  111, 114,
+      110, 33, 33, 33,  33,  33,  33,  33,  33,  33,  33};
+
+  expectDecompressesTo(compressedData, "Helloooooooo Celeborn!!!!!!!!!!");
+}
+
+TEST(Lz4DecompressorTest, DecompressWithRaw) {
+  const std::vector<uint8_t> compressedData = {
+      76, 90, 52,  66,  108, 111, 99,  107, 16,  15,  0,   0,   0,
+      15, 0,  0,   0,   188, 66,  58,  13,  72,  101, 108, 108, 111,
+      32, 67, 101, 108, 101, 98,  111, 114, 110, 33,  110, 33};
+
+  expectDecompressesTo(compressedData, "Hello Celeborn!");
+}
diff --git a/cpp/celeborn/conf/CMakeLists.txt b/cpp/celeborn/conf/CMakeLists.txt
index f9fbca886..86dfc7e8d 100644
--- a/cpp/celeborn/conf/CMakeLists.txt
+++ b/cpp/celeborn/conf/CMakeLists.txt
@@ -20,6 +20,7 @@ add_library(
 
 target_link_libraries(
         conf
+        protocol
         utils
         memory
         ${FOLLY_WITH_DEPENDENCIES}
diff --git a/cpp/celeborn/conf/CelebornConf.cpp 
b/cpp/celeborn/conf/CelebornConf.cpp
index f8d34862e..4f85c19a0 100644
--- a/cpp/celeborn/conf/CelebornConf.cpp
+++ b/cpp/celeborn/conf/CelebornConf.cpp
@@ -140,6 +140,9 @@ const std::unordered_map<std::string, 
folly::Optional<std::string>>
         NUM_PROP(kNetworkIoNumConnectionsPerPeer, "1"),
         NUM_PROP(kNetworkIoClientThreads, 0),
         NUM_PROP(kClientFetchMaxReqsInFlight, 3),
+        STR_PROP(
+            kShuffleCompressionCodec,
+            protocol::toString(protocol::CompressionCodec::NONE)),
         // NUM_PROP(kNumExample, 50'000),
         // BOOL_PROP(kBoolExample, false),
 };
@@ -202,5 +205,11 @@ int CelebornConf::networkIoClientThreads() const {
 int CelebornConf::clientFetchMaxReqsInFlight() const {
   return std::stoi(optionalProperty(kClientFetchMaxReqsInFlight).value());
 }
+
+protocol::CompressionCodec CelebornConf::shuffleCompressionCodec() const {
+  // Always set: the default is protocol::toString(CompressionCodec::NONE).
+  const auto codecName = optionalProperty(kShuffleCompressionCodec).value();
+  return protocol::toCompressionCodec(codecName);
+}
 } // namespace conf
 } // namespace celeborn
diff --git a/cpp/celeborn/conf/CelebornConf.h b/cpp/celeborn/conf/CelebornConf.h
index 7f36823fa..783bc96e1 100644
--- a/cpp/celeborn/conf/CelebornConf.h
+++ b/cpp/celeborn/conf/CelebornConf.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "celeborn/conf/BaseConf.h"
+#include "celeborn/protocol/CompressionCodec.h"
 #include "celeborn/utils/CelebornUtils.h"
 
 namespace celeborn {
@@ -60,6 +61,9 @@ class CelebornConf : public BaseConf {
   static constexpr std::string_view kClientFetchMaxReqsInFlight{
       "celeborn.client.fetch.maxReqsInFlight"};
 
+  static constexpr std::string_view kShuffleCompressionCodec{
+      "celeborn.client.shuffle.compression.codec"};
+
   CelebornConf();
 
   CelebornConf(const std::string& filename);
@@ -83,6 +87,8 @@ class CelebornConf : public BaseConf {
   int networkIoClientThreads() const;
 
   int clientFetchMaxReqsInFlight() const;
+
+  protocol::CompressionCodec shuffleCompressionCodec() const;
 };
 } // namespace conf
 } // namespace celeborn
diff --git a/cpp/celeborn/memory/ByteBuffer.cpp 
b/cpp/celeborn/memory/ByteBuffer.cpp
index 061ca2edf..a44ba92bd 100644
--- a/cpp/celeborn/memory/ByteBuffer.cpp
+++ b/cpp/celeborn/memory/ByteBuffer.cpp
@@ -74,5 +74,28 @@ std::unique_ptr<folly::IOBuf> ByteBuffer::trimBuffer(
   }
   return std::move(data);
 }
+
+// Returns a new read-only buffer over the next `len` bytes (or all remaining
+// bytes, if fewer remain) and advances this buffer's cursor past them. The
+// bytes are shared with this buffer through IOBuf clones, not copied.
+std::unique_ptr<ReadOnlyByteBuffer> ReadOnlyByteBuffer::readToReadOnlyBuffer(
+    const size_t len) const {
+  std::unique_ptr<folly::IOBuf> leftData = folly::IOBuf::create(0);
+  size_t cnt = 0;
+  while (cnt < len && this->remainingSize() > 0) {
+    // Clone only the cursor's current IOBuf (not the rest of the chain), then
+    // trim it to the unread window that still fits in the request.
+    std::unique_ptr<folly::IOBuf> newBlock =
+        this->cursor_->currentBuffer()->cloneOne();
+    newBlock->trimStart(this->cursor_->getPositionInCurrentBuffer());
+    if (newBlock->length() > len - cnt) {
+      newBlock->trimEnd(newBlock->length() - (len - cnt));
+    }
+    this->cursor_->skip(newBlock->length());
+    cnt += newBlock->length();
+    leftData->appendToChain(std::move(newBlock));
+  }
+  return createReadOnly(std::move(leftData), isBigEndian_);
+}
 } // namespace memory
 } // namespace celeborn
diff --git a/cpp/celeborn/memory/ByteBuffer.h b/cpp/celeborn/memory/ByteBuffer.h
index 763582d32..0c1027fe2 100644
--- a/cpp/celeborn/memory/ByteBuffer.h
+++ b/cpp/celeborn/memory/ByteBuffer.h
@@ -48,12 +48,11 @@ class ByteBuffer {
     assert(data_);
   }
 
-  std::unique_ptr<folly::IOBuf> data_;
-  bool isBigEndian_;
-
- private:
   static std::unique_ptr<folly::IOBuf> trimBuffer(
       const ReadOnlyByteBuffer& buffer);
+
+  std::unique_ptr<folly::IOBuf> data_;
+  bool isBigEndian_;
 };
 
 class ReadOnlyByteBuffer : public ByteBuffer {
@@ -126,6 +125,8 @@ class ReadOnlyByteBuffer : public ByteBuffer {
     return cursor_->pullAtMost(buf, len);
   }
 
+  std::unique_ptr<ReadOnlyByteBuffer> readToReadOnlyBuffer(size_t len) const;
+
   std::unique_ptr<folly::IOBuf> getData() const {
     return data_->clone();
   }
@@ -171,6 +172,10 @@ class WriteOnlyByteBuffer : public ByteBuffer {
     appender_->push(reinterpret_cast<const uint8_t*>(ptr), data.size());
   }
 
+  void writeFromBuffer(const void* data, const size_t len) const {
+    appender_->push(static_cast<const uint8_t*>(data), len);
+  }
+
   size_t size() const {
     return data_->computeChainDataLength();
   }
diff --git a/cpp/celeborn/memory/tests/ByteBufferTest.cpp 
b/cpp/celeborn/memory/tests/ByteBufferTest.cpp
index 350b019a6..f6c4c6711 100644
--- a/cpp/celeborn/memory/tests/ByteBufferTest.cpp
+++ b/cpp/celeborn/memory/tests/ByteBufferTest.cpp
@@ -144,6 +144,75 @@ void testReadData(ReadOnlyByteBuffer* readBuffer, size_t size) {
   EXPECT_THROW(readBuffer->readLE<int64_t>(), std::exception);
 }
 
+void testReadOnlyBufferReadData(ReadOnlyByteBuffer* readBuffer, size_t size) {
+  EXPECT_EQ(size, testSize);
+  size_t remainingSize = size;
+  EXPECT_EQ(readBuffer->size(), size);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read string.
+  auto strRead =
+      readBuffer->readToReadOnlyBuffer(strPayload.size())->readToString();
+  EXPECT_EQ(strRead, strPayload);
+  remainingSize -= strPayload.size();
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read BigEndian.
+  EXPECT_EQ(
+      readBuffer->readToReadOnlyBuffer(sizeof(int16_t))->readBE<int16_t>(),
+      int16Payload);
+  remainingSize -= sizeof(int16_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(
+      readBuffer->readToReadOnlyBuffer(sizeof(int32_t))->readBE<int32_t>(),
+      int32Payload);
+  remainingSize -= sizeof(int32_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(
+      readBuffer->readToReadOnlyBuffer(sizeof(int64_t))->readBE<int64_t>(),
+      int64Payload);
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read LittleEndian.
+  EXPECT_EQ(
+      readBuffer->readToReadOnlyBuffer(sizeof(int16_t))->readLE<int16_t>(),
+      int16Payload);
+  remainingSize -= sizeof(int16_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(
+      readBuffer->readToReadOnlyBuffer(sizeof(int32_t))->readLE<int32_t>(),
+      int32Payload);
+  remainingSize -= sizeof(int32_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(
+      readBuffer->readToReadOnlyBuffer(sizeof(int64_t))->readLE<int64_t>(),
+      int64Payload);
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test retreat and skip.
+  const auto retreatSize = sizeof(int32_t) + sizeof(int64_t);
+  remainingSize += retreatSize;
+  readBuffer->retreat(retreatSize);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(
+      readBuffer->readToReadOnlyBuffer(sizeof(int32_t))->readLE<int32_t>(),
+      int32Payload);
+  remainingSize -= sizeof(int32_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  readBuffer->skip(sizeof(int64_t));
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read end.
+  EXPECT_EQ(readBuffer->size(), size);
+  EXPECT_EQ(readBuffer->remainingSize(), 0);
+  EXPECT_THROW(
+      readBuffer->readToReadOnlyBuffer(sizeof(int64_t))->readLE<int64_t>(),
+      std::exception);
+}
+
 TEST(ByteBufferTest, continuousBufferRead) {
   size_t size = 0;
   auto data = createRawData(size);
@@ -189,6 +258,13 @@ TEST(ByteBufferTest, writeBufferAndRead) {
   testReadData(readBuffer.get(), size);
 }
 
+TEST(ByteBufferTest, readOnlyBufferRead) {
+  size_t size = 0;
+  auto writeBuffer = createWriteOnlyBuffer(size);
+  auto readBuffer = WriteOnlyByteBuffer::toReadOnly(std::move(writeBuffer));
+  testReadOnlyBufferReadData(readBuffer.get(), size);
+}
+
 TEST(ByteBufferTest, concatReadBuffer) {
   size_t size = 0;
   auto writeBuffer1 = createWriteOnlyBuffer(size);
diff --git a/cpp/celeborn/protocol/CMakeLists.txt b/cpp/celeborn/protocol/CMakeLists.txt
index a1dc054d3..f0b0bea78 100644
--- a/cpp/celeborn/protocol/CMakeLists.txt
+++ b/cpp/celeborn/protocol/CMakeLists.txt
@@ -17,7 +17,8 @@ add_library(
         STATIC
         PartitionLocation.cpp
         TransportMessage.cpp
-        ControlMessages.cpp)
+        ControlMessages.cpp
+        CompressionCodec.cpp)
 
 target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})
 
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/protocol/CompressionCodec.cpp
similarity index 56%
copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/protocol/CompressionCodec.cpp
index 4aa5b0ed7..93f90ebd1 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/protocol/CompressionCodec.cpp
@@ -15,13 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.cluster
+#include "celeborn/protocol/CompressionCodec.h"
 
-import org.apache.celeborn.common.protocol.CompressionCodec
-
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+namespace celeborn {
+namespace protocol {
+CompressionCodec toCompressionCodec(std::string_view code) {
+  if (code == "LZ4") {
+    return CompressionCodec::LZ4;
+  }
+  if (code == "ZSTD") {
+    return CompressionCodec::ZSTD;
+  }
+  return CompressionCodec::NONE; // Exact match only: "lz4" etc. fall to NONE.
+}
 
-  def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
+std::string_view toString(CompressionCodec codec) {
+  switch (codec) {
+    case CompressionCodec::LZ4:
+      return "LZ4";
+    case CompressionCodec::ZSTD:
+      return "ZSTD";
+    case CompressionCodec::NONE:
+      return "NONE";
+    default:
+      return "UNKNOWN";
   }
 }
+} // namespace protocol
+} // namespace celeborn
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/protocol/CompressionCodec.h
similarity index 73%
copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/protocol/CompressionCodec.h
index 4aa5b0ed7..785167c1f 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/protocol/CompressionCodec.h
@@ -15,13 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
+#include <string_view>
 
-import org.apache.celeborn.common.protocol.CompressionCodec
+namespace celeborn {
+namespace protocol {
+enum class CompressionCodec {
+  LZ4,
+  ZSTD,
+  NONE,
+};
 
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+CompressionCodec toCompressionCodec(std::string_view code);
 
-  def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
-  }
-}
+std::string_view toString(CompressionCodec codec);
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/tests/DataSumWithReaderClient.cpp b/cpp/celeborn/tests/DataSumWithReaderClient.cpp
index d60166e92..8c060fa54 100644
--- a/cpp/celeborn/tests/DataSumWithReaderClient.cpp
+++ b/cpp/celeborn/tests/DataSumWithReaderClient.cpp
@@ -23,7 +23,7 @@
 
 int main(int argc, char** argv) {
   // Read the configs.
-  assert(argc == 8);
+  assert(argc == 9);
   std::string lifecycleManagerHost = argv[1];
   int lifecycleManagerPort = std::atoi(argv[2]);
   std::string appUniqueId = argv[3];
@@ -31,15 +31,19 @@ int main(int argc, char** argv) {
   int attemptId = std::atoi(argv[5]);
   int numPartitions = std::atoi(argv[6]);
   std::string resultFile = argv[7];
+  std::string compressCodec = argv[8];
   std::cout << "lifecycleManagerHost = " << lifecycleManagerHost
             << ", lifecycleManagerPort = " << lifecycleManagerPort
             << ", appUniqueId = " << appUniqueId
             << ", shuffleId = " << shuffleId << ", attemptId = " << attemptId
             << ", numPartitions = " << numPartitions
-            << ", resultFile = " << resultFile << std::endl;
+            << ", resultFile = " << resultFile
+            << ", compressCodec = " << compressCodec << std::endl;
 
   // Create shuffleClient and setup.
   auto conf = std::make_shared<celeborn::conf::CelebornConf>();
+  conf->registerProperty(
+      celeborn::conf::CelebornConf::kShuffleCompressionCodec, compressCodec);
   auto clientFactory =
       std::make_shared<celeborn::network::TransportClientFactory>(conf);
   auto shuffleClient = std::make_unique<celeborn::client::ShuffleClientImpl>(
@@ -63,6 +67,9 @@ int main(int argc, char** argv) {
         dataCnt++;
         continue;
       }
+      if (c == '+') { // '+' is padding the Java writer adds; not a digit.
+        continue;
+      }
       assert(c >= '0' && c <= '9');
       data *= 10;
       data += c - '0';
diff --git a/cpp/cmake/FindLZ4.cmake b/cpp/cmake/FindLZ4.cmake
new file mode 100644
index 000000000..918568b55
--- /dev/null
+++ b/cpp/cmake/FindLZ4.cmake
@@ -0,0 +1,41 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Finds liblz4.
+#
+# This module defines:
+# LZ4_FOUND
+# LZ4_INCLUDE_DIR
+# LZ4_LIBRARY
+#
+
+find_path(LZ4_INCLUDE_DIR NAMES lz4.h)
+
+find_library(LZ4_LIBRARY_DEBUG NAMES lz4d)
+find_library(LZ4_LIBRARY_RELEASE NAMES lz4)
+
+include(SelectLibraryConfigurations)
+SELECT_LIBRARY_CONFIGURATIONS(LZ4)
+
+include(FindPackageHandleStandardArgs)
+FIND_PACKAGE_HANDLE_STANDARD_ARGS(
+    LZ4 DEFAULT_MSG
+    LZ4_LIBRARY LZ4_INCLUDE_DIR
+)
+
+if (LZ4_FOUND)
+    message(STATUS "Found LZ4: ${LZ4_LIBRARY}")
+endif()
+
+mark_as_advanced(LZ4_INCLUDE_DIR LZ4_LIBRARY)
diff --git a/cpp/scripts/setup-ubuntu.sh b/cpp/scripts/setup-ubuntu.sh
index fe137a180..e26ffa4f3 100755
--- a/cpp/scripts/setup-ubuntu.sh
+++ b/cpp/scripts/setup-ubuntu.sh
@@ -126,6 +126,8 @@ function install_celeborn_cpp_deps_from_apt {
     libgmock-dev \
     libevent-dev \
     libsodium-dev \
+    libxxhash-dev \
+    liblz4-dev \
     libzstd-dev \
     libre2-dev
 }
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala
similarity index 89%
rename from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
rename to worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala
index a57ffebe8..a124cc590 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala
@@ -35,7 +35,7 @@ import org.apache.celeborn.common.protocol.CompressionCodec
 import org.apache.celeborn.common.util.Utils.runCommand
 import org.apache.celeborn.service.deploy.MiniClusterFeature
 
-trait JavaReadCppWriteTestBase extends AnyFunSuite
+trait JavaWriteCppReadTestBase extends AnyFunSuite
   with Logging with MiniClusterFeature with BeforeAndAfterAll {
 
   var masterPort = 0
@@ -51,16 +51,16 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
     shutdownMiniCluster()
   }
 
-  def testJavaReadCppWrite(codec: CompressionCodec): Unit = {
+  def testJavaWriteCppRead(codec: CompressionCodec): Unit = {
     beforeAll()
     try {
-      runJavaReadCppWrite(codec)
+      runJavaWriteCppRead(codec)
     } finally {
       afterAll()
     }
   }
 
-  def runJavaReadCppWrite(codec: CompressionCodec): Unit = {
+  def runJavaWriteCppRead(codec: CompressionCodec): Unit = {
     val appUniqueId = "test-app"
     val shuffleId = 0
     val attemptId = 0
@@ -87,13 +87,19 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
     val numData = 1000
     var sums = new util.ArrayList[Long](numPartitions)
     val rand = new Random()
+    var prefix = "-"
+    if (codec != CompressionCodec.NONE) {
+      // Add duplicate strings to make the compressed length shorter than the original length.
+      // Will be dropped in cpp test client.
+      prefix = prefix ++ "++++++++++"
+    }
     for (mapId <- 0 until numMappers) {
       for (partitionId <- 0 until numPartitions) {
         sums.add(0)
         for (i <- 0 until numData) {
           val data = rand.nextInt(maxData)
           sums.set(partitionId, sums.get(partitionId) + data)
-          val dataStr = "-" + data.toString
+          val dataStr = prefix + data.toString
           shuffleClient.pushOrMergeData(
             shuffleId,
             mapId,
@@ -105,7 +111,7 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
             numMappers,
             numPartitions,
             false,
-            true)
+            false)
         }
       }
       shuffleClient.pushMergedData(shuffleId, mapId, attemptId)
@@ -120,9 +126,10 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
     val cppBinRelativeDirectory = "cpp/build/celeborn/tests/"
     val cppBinFileName = "cppDataSumWithReaderClient"
     val cppBinFilePath = s"$projectDirectory/$cppBinRelativeDirectory/$cppBinFileName"
-    // Execution command: $exec lifecycleManagerHost lifecycleManagerPort appUniqueId shuffleId attemptId numPartitions cppResultFile
+    val cppCodec = codec.name()
+    // Execution command: $exec lifecycleManagerHost lifecycleManagerPort appUniqueId shuffleId attemptId numPartitions cppResultFile cppCodec
     val command = {
-      s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numPartitions $cppResultFile"
+      s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numPartitions $cppResultFile $cppCodec"
     }
     println(s"run command: $command")
     val commandOutput = runCommand(command)
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala
similarity index 88%
copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala
index 4aa5b0ed7..bc1961384 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala
@@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.cluster
 
 import org.apache.celeborn.common.protocol.CompressionCodec
 
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+object JavaWriteCppReadTestWithLZ4 extends JavaWriteCppReadTestBase {
 
   def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
+    testJavaWriteCppRead(CompressionCodec.LZ4)
   }
 }
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala
similarity index 89%
rename from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
rename to worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala
index 4aa5b0ed7..a649f8350 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala
@@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.cluster
 
 import org.apache.celeborn.common.protocol.CompressionCodec
 
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+object JavaWriteCppReadTestWithNONE extends JavaWriteCppReadTestBase {
 
   def main(args: Array[String]) = {
-    testJavaReadCppWrite(CompressionCodec.NONE)
+    testJavaWriteCppRead(CompressionCodec.NONE)
   }
 }


Reply via email to