This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 16b12053c [CELEBORN-2090] Support Lz4 Decompression in CppClient
16b12053c is described below
commit 16b12053cad69e5137896b82b82feb2318bcf1b5
Author: Jray <[email protected]>
AuthorDate: Fri Aug 8 18:19:48 2025 +0800
[CELEBORN-2090] Support Lz4 Decompression in CppClient
### What changes were proposed in this pull request?
This PR adds support for lz4 decompression in CppClient.
### Why are the changes needed?
To allow CppClient to read LZ4-compressed shuffle data from Celeborn.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes #3402 from Jraaay/feat/cpp_client_lz4_decompression.
Authored-by: Jray <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit cfb490c9380cd5e8e903042de9350b3eea8ce9c8)
Signed-off-by: SteNicholas <[email protected]>
---
.github/workflows/cpp_integration.yml | 15 +++-
LICENSE | 3 +
cpp/CMakeLists.txt | 6 ++
cpp/README.md | 2 +-
cpp/celeborn/client/CMakeLists.txt | 5 +-
cpp/celeborn/client/ShuffleClient.cpp | 14 +++-
cpp/celeborn/client/ShuffleClient.h | 16 ++++
.../celeborn/client/compress/Decompressor.cpp | 25 ++++--
.../celeborn/client/compress/Decompressor.h | 33 ++++++--
cpp/celeborn/client/compress/Lz4Decompressor.cpp | 97 ++++++++++++++++++++++
.../celeborn/client/compress/Lz4Decompressor.h | 32 +++++--
.../celeborn/client/compress/Lz4Trait.h | 25 ++++--
cpp/celeborn/client/reader/CelebornInputStream.cpp | 41 +++++++--
cpp/celeborn/client/reader/CelebornInputStream.h | 8 +-
cpp/celeborn/client/tests/CMakeLists.txt | 3 +-
cpp/celeborn/client/tests/Lz4DecompressorTest.cpp | 70 ++++++++++++++++
cpp/celeborn/conf/CMakeLists.txt | 1 +
cpp/celeborn/conf/CelebornConf.cpp | 8 ++
cpp/celeborn/conf/CelebornConf.h | 6 ++
cpp/celeborn/memory/ByteBuffer.cpp | 22 +++++
cpp/celeborn/memory/ByteBuffer.h | 13 ++-
cpp/celeborn/memory/tests/ByteBufferTest.cpp | 76 +++++++++++++++++
cpp/celeborn/protocol/CMakeLists.txt | 3 +-
.../celeborn/protocol/CompressionCodec.cpp | 30 +++++--
.../celeborn/protocol/CompressionCodec.h | 20 +++--
cpp/celeborn/tests/DataSumWithReaderClient.cpp | 11 ++-
cpp/cmake/FindLZ4.cmake | 41 +++++++++
cpp/scripts/setup-ubuntu.sh | 2 +
...stBase.scala => JavaWriteCppReadTestBase.scala} | 23 +++--
...ONE.scala => JavaWriteCppReadTestWithLZ4.scala} | 4 +-
...NE.scala => JavaWriteCppReadTestWithNONE.scala} | 4 +-
31 files changed, 587 insertions(+), 72 deletions(-)
diff --git a/.github/workflows/cpp_integration.yml
b/.github/workflows/cpp_integration.yml
index 0af4c61e5..0b221aeb9 100644
--- a/.github/workflows/cpp_integration.yml
+++ b/.github/workflows/cpp_integration.yml
@@ -30,7 +30,7 @@ on:
jobs:
celeborn_cpp_check_lint:
runs-on: ubuntu-22.04
- container: holylow/celeborn-cpp-dev:0.3
+ container: jraaaay/celeborn-cpp-dev:0.4
steps:
- uses: actions/checkout@v4
with:
@@ -43,7 +43,7 @@ jobs:
xargs clang-format-15 -style=file:./.clang-format -n --Werror
celeborn_cpp_unit_test:
runs-on: ubuntu-22.04
- container: holylow/celeborn-cpp-dev:0.3
+ container: jraaaay/celeborn-cpp-dev:0.4
steps:
- uses: actions/checkout@v4
with:
@@ -59,7 +59,7 @@ jobs:
run: ctest
celeborn_cpp_integration_test:
runs-on: ubuntu-22.04
- container: holylow/celeborn-cpp-dev:0.3
+ container: jraaaay/celeborn-cpp-dev:0.4
steps:
- uses: actions/checkout@v4
with:
@@ -90,5 +90,12 @@ jobs:
build/mvn -pl worker \
test-compile exec:java \
-Dexec.classpathScope="test" \
-
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaReadCppWriteTestWithNONE"
\
+
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithNONE"
\
+ -Dexec.args="-XX:MaxDirectMemorySize=2G"
+ - name: Run Java-Cpp Hybrid Integration Test (LZ4 Decompression)
+ run: |
+ build/mvn -pl worker \
+ test-compile exec:java \
+ -Dexec.classpathScope="test" \
+
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithLZ4"
\
-Dexec.args="-XX:MaxDirectMemorySize=2G"
diff --git a/LICENSE b/LICENSE
index f757490f9..ee0a3f890 100644
--- a/LICENSE
+++ b/LICENSE
@@ -274,6 +274,9 @@ Meta Velox
./cpp/celeborn/conf/BaseConf.h
./cpp/celeborn/conf/BaseConf.cpp
+Meta Folly
+./cpp/cmake/FindLZ4.cmake
+
------------------------------------------------------------------------------------
This product bundles various third-party components under the CC0 license.
This section summarizes those components.
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 8552b7ef1..f24a8abff 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -129,6 +129,12 @@ find_package(Sodium REQUIRED)
find_library(FIZZ fizz REQUIRED)
find_library(WANGLE wangle REQUIRED)
+find_package(LZ4 REQUIRED)
+# Lz4Decompressor verifies block checksums with the XXH32 API from xxhash.
+# Locate the library explicitly (like FIZZ/WANGLE above) so a missing xxhash
+# fails at configure time instead of surfacing as a link error.
+find_library(XXHASH xxhash REQUIRED)
+set(LZ4_WITH_DEPENDENCIES
+  ${LZ4_LIBRARY}
+  ${XXHASH}
+)
+
find_library(RE2 re2)
find_package(fizz CONFIG REQUIRED)
diff --git a/cpp/README.md b/cpp/README.md
index 2aa07b4e1..d2c794228 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -21,7 +21,7 @@ docker run \
-w /celeborn \
-it --rm \
--name celeborn-cpp-dev-container \
- holylow/celeborn-cpp-dev:0.3 \
+ jraaaay/celeborn-cpp-dev:0.4 \
/bin/bash
```
diff --git a/cpp/celeborn/client/CMakeLists.txt
b/cpp/celeborn/client/CMakeLists.txt
index 2491b7d7f..94e58795d 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -16,7 +16,9 @@ add_library(
client
reader/WorkerPartitionReader.cpp
reader/CelebornInputStream.cpp
- ShuffleClient.cpp)
+ ShuffleClient.cpp
+ compress/Decompressor.cpp
+ compress/Lz4Decompressor.cpp)
target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
@@ -31,6 +33,7 @@ target_link_libraries(
${FIZZ}
${LIBSODIUM_LIBRARY}
${FOLLY_WITH_DEPENDENCIES}
+ ${LZ4_WITH_DEPENDENCIES}
${GLOG}
${GFLAGS_LIBRARIES}
)
diff --git a/cpp/celeborn/client/ShuffleClient.cpp
b/cpp/celeborn/client/ShuffleClient.cpp
index 8e8ca2121..ccf7c6dcb 100644
--- a/cpp/celeborn/client/ShuffleClient.cpp
+++ b/cpp/celeborn/client/ShuffleClient.cpp
@@ -48,6 +48,17 @@ std::unique_ptr<CelebornInputStream>
ShuffleClientImpl::readPartition(
int attemptNumber,
int startMapIndex,
int endMapIndex) {
+ return ShuffleClientImpl::readPartition(
+ shuffleId, partitionId, attemptNumber, startMapIndex, endMapIndex, true);
+}
+
+std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
+ int shuffleId,
+ int partitionId,
+ int attemptNumber,
+ int startMapIndex,
+ int endMapIndex,
+ bool needCompression) {
const auto& reducerFileGroupInfo = getReducerFileGroupInfo(shuffleId);
std::string shuffleKey = utils::makeShuffleKey(appUniqueId_, shuffleId);
std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations;
@@ -64,7 +75,8 @@ std::unique_ptr<CelebornInputStream>
ShuffleClientImpl::readPartition(
reducerFileGroupInfo.attempts,
attemptNumber,
startMapIndex,
- endMapIndex);
+ endMapIndex,
+ needCompression);
}
void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
diff --git a/cpp/celeborn/client/ShuffleClient.h
b/cpp/celeborn/client/ShuffleClient.h
index 288e52e9e..284c7ade9 100644
--- a/cpp/celeborn/client/ShuffleClient.h
+++ b/cpp/celeborn/client/ShuffleClient.h
@@ -38,6 +38,14 @@ class ShuffleClient {
int startMapIndex,
int endMapIndex) = 0;
+ // Variant of readPartition that lets the caller control decompression.
+ // ShuffleClientImpl's overload without this flag forwards `true`; in
+ // ShuffleClientImpl the returned CelebornInputStream only decompresses when
+ // the flag is true AND the configured shuffle compression codec is not NONE.
+ virtual std::unique_ptr<CelebornInputStream> readPartition(
+ int shuffleId,
+ int partitionId,
+ int attemptNumber,
+ int startMapIndex,
+ int endMapIndex,
+ bool needCompression) = 0;
+
virtual bool cleanupShuffle(int shuffleId) = 0;
virtual void shutdown() = 0;
@@ -62,6 +70,14 @@ class ShuffleClientImpl : public ShuffleClient {
int startMapIndex,
int endMapIndex) override;
+ // `needCompression` is forwarded to CelebornInputStream, which decompresses
+ // only when it is true and the configured shuffle compression codec is not
+ // NONE.
+ std::unique_ptr<CelebornInputStream> readPartition(
+ int shuffleId,
+ int partitionId,
+ int attemptNumber,
+ int startMapIndex,
+ int endMapIndex,
+ bool needCompression) override;
+
void updateReducerFileGroup(int shuffleId) override;
bool cleanupShuffle(int shuffleId) override;
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/cpp/celeborn/client/compress/Decompressor.cpp
similarity index 55%
copy from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Decompressor.cpp
index 4aa5b0ed7..999d01822 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Decompressor.cpp
@@ -15,13 +15,28 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.cluster
+#include <stdexcept>
-import org.apache.celeborn.common.protocol.CompressionCodec
+#include "celeborn/client/compress/Lz4Decompressor.h"
+#include "celeborn/utils/Exceptions.h"
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+namespace celeborn {
+namespace client {
+namespace compress {
- def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
+// Factory for codec-specific decompressors. Only LZ4 is implemented; ZSTD and
+// every other value (NONE included, through `default`) are rejected with
+// CELEBORN_FAIL.
+std::unique_ptr<Decompressor> Decompressor::createDecompressor(
+ protocol::CompressionCodec codec) {
+ switch (codec) {
+ case protocol::CompressionCodec::LZ4:
+ return std::make_unique<Lz4Decompressor>();
+ case protocol::CompressionCodec::ZSTD:
+ // TODO: impl zstd
+ CELEBORN_FAIL("Compression codec ZSTD is not supported.");
+ default:
+ CELEBORN_FAIL("Unknown compression codec.");
}
}
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/cpp/celeborn/client/compress/Decompressor.h
similarity index 53%
copy from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Decompressor.h
index 4aa5b0ed7..694c07d42 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Decompressor.h
@@ -15,13 +15,34 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
-import org.apache.celeborn.common.protocol.CompressionCodec
+#include <memory>
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+#include "celeborn/protocol/CompressionCodec.h"
- def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
+namespace celeborn {
+namespace client {
+namespace compress {
+
+// Interface for decoding one compressed shuffle block into a caller-provided
+// buffer. Instances are obtained through createDecompressor().
+class Decompressor {
+ public:
+  virtual ~Decompressor() = default;
+
+  // Decodes the block at `src` into `dst + dstOff` and returns the number of
+  // decompressed bytes written. `dst` must have room for getOriginalLen(src)
+  // bytes starting at `dstOff`.
+  virtual int decompress(const uint8_t* src, uint8_t* dst, int dstOff) = 0;
+
+  // Returns the uncompressed length recorded in the block header at `src`.
+  virtual int getOriginalLen(const uint8_t* src) = 0;
+
+  // Returns a decompressor for `codec`; unsupported codecs are reported
+  // through CELEBORN_FAIL.
+  static std::unique_ptr<Decompressor> createDecompressor(
+      protocol::CompressionCodec codec);
+
+ protected:
+  // Reads a 32-bit little-endian integer from buf[i..i+3].
+  static int32_t readIntLE(const uint8_t* buf, const int i) {
+    // Assemble in uint32_t: shifting a byte >= 0x80 (promoted to int) left by
+    // 24 overflows a signed int, which is undefined behaviour before C++20.
+    const uint32_t value = static_cast<uint32_t>(buf[i]) |
+        (static_cast<uint32_t>(buf[i + 1]) << 8) |
+        (static_cast<uint32_t>(buf[i + 2]) << 16) |
+        (static_cast<uint32_t>(buf[i + 3]) << 24);
+    return static_cast<int32_t>(value);
}
-}
+};
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.cpp
b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
new file mode 100644
index 000000000..f85ced0d2
--- /dev/null
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/client/compress/Lz4Decompressor.h"
+#include <lz4.h>
+#include <cstring>
+#include <iostream>
+#include "celeborn/utils/Exceptions.h"
+
+namespace celeborn {
+namespace client {
+namespace compress {
+
+namespace {
+// Only the low 28 bits of the XXH32 digest are compared with the header
+// checksum. NOTE(review): presumably this mirrors the Java writer (lz4-java
+// masks its XXH32 checksum with 0xFFFFFFF); both vectors in
+// Lz4DecompressorTest carry a zero top nibble, consistent with that.
+constexpr uint32_t kChecksumMask = 0x0FFFFFFFu;
+} // namespace
+
+Lz4Decompressor::Lz4Decompressor() {
+  xxhash_state_ = XXH32_createState();
+  if (!xxhash_state_) {
+    CELEBORN_FAIL("Failed to create XXH32 state.");
+  }
+  XXH32_reset(xxhash_state_, kDefaultSeed);
+}
+
+Lz4Decompressor::~Lz4Decompressor() {
+  if (xxhash_state_) {
+    XXH32_freeState(xxhash_state_);
+  }
+}
+
+// Returns the originalLen field of the block header at `src`.
+int Lz4Decompressor::getOriginalLen(const uint8_t* src) {
+  return readIntLE(src, kMagicLength + 5);
+}
+
+// Decodes one block (header + payload) at `src` into `dst + dstOff`, verifies
+// the checksum of the decoded bytes and returns the decoded length. Corrupt
+// headers, unknown methods, LZ4 errors and checksum mismatches are reported
+// through CELEBORN_FAIL.
+int Lz4Decompressor::decompress(
+    const uint8_t* src,
+    uint8_t* dst,
+    const int dstOff) {
+  // Header: magic | method (1 byte) | compressedLen | originalLen | checksum,
+  // with the three integers stored little-endian.
+  const int compressionMethod = static_cast<unsigned char>(src[kMagicLength]);
+  const int compressedLen = readIntLE(src, kMagicLength + 1);
+  const int originalLen = readIntLE(src, kMagicLength + 5);
+  const int expectedCheck = readIntLE(src, kMagicLength + 9);
+
+  // Reject corrupt headers before the lengths reach memcpy / LZ4.
+  if (compressedLen < 0 || originalLen < 0) {
+    CELEBORN_FAIL(
+        std::string("Corrupt LZ4 block header! compressedLen: ") +
+        std::to_string(compressedLen) +
+        ", originalLen: " + std::to_string(originalLen));
+  }
+
+  const uint8_t* compressedDataPtr = src + kHeaderLength;
+  uint8_t* dstPtr = dst + dstOff;
+
+  switch (compressionMethod) {
+    case kCompressionMethodRaw: {
+      // Raw blocks carry the payload uncompressed, so both lengths must agree.
+      if (compressedLen != originalLen) {
+        CELEBORN_FAIL(
+            std::string("Corrupt raw LZ4 block! compressedLen: ") +
+            std::to_string(compressedLen) +
+            ", originalLen: " + std::to_string(originalLen));
+      }
+      std::memcpy(dstPtr, compressedDataPtr, originalLen);
+      break;
+    }
+    case kCompressionMethodLZ4: {
+      const int decompressedBytes = LZ4_decompress_safe(
+          reinterpret_cast<const char*>(compressedDataPtr),
+          reinterpret_cast<char*>(dstPtr),
+          compressedLen,
+          originalLen);
+
+      // LZ4_decompress_safe returns a negative value on malformed input.
+      if (decompressedBytes != originalLen) {
+        CELEBORN_FAIL(
+            std::string("Decompression failed! LZ4 error or size mismatch. ") +
+            "Expected: " + std::to_string(originalLen) +
+            ", Got: " + std::to_string(decompressedBytes));
+      }
+      break;
+    }
+    default:
+      CELEBORN_FAIL(
+          std::string("Unsupported compression method: ") +
+          std::to_string(compressionMethod));
+  }
+
+  XXH32_reset(xxhash_state_, kDefaultSeed);
+  XXH32_update(xxhash_state_, dstPtr, originalLen);
+  const uint32_t actualCheck = XXH32_digest(xxhash_state_) & kChecksumMask;
+  const uint32_t expected = static_cast<uint32_t>(expectedCheck);
+
+  if (expected != actualCheck) {
+    CELEBORN_FAIL(
+        std::string("Checksum mismatch! Expected: ") +
+        std::to_string(expected) +
+        ", Actual: " + std::to_string(actualCheck));
+  }
+
+  return originalLen;
+}
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/cpp/celeborn/client/compress/Lz4Decompressor.h
similarity index 54%
copy from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Lz4Decompressor.h
index 4aa5b0ed7..3d34a6876 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.h
@@ -15,13 +15,31 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
-import org.apache.celeborn.common.protocol.CompressionCodec
+#include <xxhash.h>
+#include "celeborn/client/compress/Decompressor.h"
+#include "celeborn/client/compress/Lz4Trait.h"
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+namespace celeborn {
+namespace client {
+namespace compress {
- def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
- }
-}
+// Decompressor for blocks framed with the Lz4Trait header (magic, method byte,
+// compressed/original lengths, checksum). Lz4Trait is a private base (class
+// default) supplying the format constants. The XXH32 state used to verify
+// each block is freed in the destructor, so copying is disabled.
+class Lz4Decompressor final : public Decompressor, Lz4Trait {
+ public:
+ Lz4Decompressor();
+ ~Lz4Decompressor() override;
+
+ int getOriginalLen(const uint8_t* src) override;
+ int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
+
+ Lz4Decompressor(const Lz4Decompressor&) = delete;
+ Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
+
+ private:
+ // Created in the constructor and reset before each checksum computation.
+ XXH32_state_t* xxhash_state_;
+};
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/cpp/celeborn/client/compress/Lz4Trait.h
similarity index 60%
copy from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/client/compress/Lz4Trait.h
index 4aa5b0ed7..fc236a1c5 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/client/compress/Lz4Trait.h
@@ -15,13 +15,24 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
-import org.apache.celeborn.common.protocol.CompressionCodec
+namespace celeborn {
+namespace client {
+namespace compress {
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+// Constants describing the LZ4 block format read by Lz4Decompressor.
+struct Lz4Trait {
+  // Blocks are expected to begin with this 8-byte magic ("LZ4Block").
+  static constexpr char kMagic[] = {'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k'};
+  static constexpr int kMagicLength = sizeof(kMagic);
- def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
- }
-}
+  // Header = magic | method (1 byte) | compressedLen (4) | originalLen (4) |
+  // checksum (4); the three integers are little-endian.
+  static constexpr int kHeaderLength = kMagicLength + 1 + 4 + 4 + 4;
+
+  static constexpr int kCompressionMethodRaw = 0x10;
+  static constexpr int kCompressionMethodLZ4 = 0x20;
+
+  // XXH32 seed for block checksums. 0x9747b28c does not fit in a signed int,
+  // so keep it unsigned (XXH32 takes an unsigned seed) rather than relying on
+  // an implementation-defined narrowing conversion.
+  static constexpr unsigned int kDefaultSeed = 0x9747b28cu;
+};
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.cpp
b/cpp/celeborn/client/reader/CelebornInputStream.cpp
index a3023d6f3..d75b705e5 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.cpp
+++ b/cpp/celeborn/client/reader/CelebornInputStream.cpp
@@ -16,6 +16,8 @@
*/
#include "celeborn/client/reader/CelebornInputStream.h"
+#include <lz4.h>
+#include "celeborn/client/compress/Decompressor.h"
namespace celeborn {
namespace client {
@@ -27,7 +29,8 @@ CelebornInputStream::CelebornInputStream(
const std::vector<int>& attempts,
int attemptNumber,
int startMapIndex,
- int endMapIndex)
+ int endMapIndex,
+ bool needCompression)
: shuffleKey_(shuffleKey),
conf_(conf),
clientFactory_(clientFactory),
@@ -38,7 +41,15 @@ CelebornInputStream::CelebornInputStream(
endMapIndex_(endMapIndex),
currLocationIndex_(0),
currBatchPos_(0),
- currBatchSize_(0) {
+ currBatchSize_(0),
+ shouldDecompress_(
+ conf_->shuffleCompressionCodec() !=
+ protocol::CompressionCodec::NONE &&
+ needCompression) {
+ if (shouldDecompress_) {
+ decompressor_ = compress::Decompressor::createDecompressor(
+ conf_->shuffleCompressionCodec());
+ }
moveToNextReader();
}
@@ -57,8 +68,8 @@ int CelebornInputStream::read(uint8_t* buffer, size_t offset,
size_t len) {
}
size_t batchRemainingSize = currBatchSize_ - currBatchPos_;
size_t toReadBytes = std::min(len - readBytes, batchRemainingSize);
- CELEBORN_CHECK_GE(currChunk_->remainingSize(), toReadBytes);
- auto size = currChunk_->readToBuffer(&buf[readBytes], toReadBytes);
+ CELEBORN_CHECK_GE(decompressedChunk_->remainingSize(), toReadBytes);
+ auto size = decompressedChunk_->readToBuffer(&buf[readBytes], toReadBytes);
CELEBORN_CHECK_EQ(toReadBytes, size);
readBytes += toReadBytes;
currBatchPos_ += toReadBytes;
@@ -83,13 +94,31 @@ bool CelebornInputStream::fillBuffer() {
CELEBORN_CHECK_GE(currChunk_->remainingSize(), size);
CELEBORN_CHECK_LT(mapId, attempts_.size());
- // TODO: compression is not supported yet!
+ if (shouldDecompress_) {
+ if (size > compressedBuf_.size()) {
+ compressedBuf_.resize(size);
+ }
+ currChunk_->readToBuffer(compressedBuf_.data(), size);
+ }
if (attemptId == attempts_[mapId]) {
auto& batchRecord = getBatchRecord(mapId);
if (batchRecord.count(batchId) <= 0) {
batchRecord.insert(batchId);
- currBatchSize_ = size;
+ if (shouldDecompress_) {
+ const auto originalLength =
+ decompressor_->getOriginalLen(compressedBuf_.data());
+ std::unique_ptr<folly::IOBuf> decompressedBuf_ =
+ folly::IOBuf::createCombined(originalLength);
+ decompressedBuf_->append(originalLength);
+ currBatchSize_ = decompressor_->decompress(
+ compressedBuf_.data(), decompressedBuf_->writableData(), 0);
+ decompressedChunk_ = memory::ByteBuffer::createReadOnly(
+ std::move(decompressedBuf_), false);
+ } else {
+ currBatchSize_ = size;
+ decompressedChunk_ = currChunk_->readToReadOnlyBuffer(size);
+ }
currBatchPos_ = 0;
hasData = true;
break;
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.h
b/cpp/celeborn/client/reader/CelebornInputStream.h
index af143321c..5dd9c4f76 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.h
+++ b/cpp/celeborn/client/reader/CelebornInputStream.h
@@ -17,6 +17,7 @@
#pragma once
+#include "celeborn/client/compress/Decompressor.h"
#include "celeborn/client/reader/WorkerPartitionReader.h"
#include "celeborn/conf/CelebornConf.h"
@@ -33,7 +34,8 @@ class CelebornInputStream {
const std::vector<int>& attempts,
int attemptNumber,
int startMapIndex,
- int endMapIndex);
+ int endMapIndex,
+ bool needCompression);
int read(uint8_t* buffer, size_t offset, size_t len);
@@ -68,9 +70,13 @@ class CelebornInputStream {
int attemptNumber_;
int startMapIndex_;
int endMapIndex_;
+ bool shouldDecompress_;
+ std::unique_ptr<compress::Decompressor> decompressor_;
+ std::vector<uint8_t> compressedBuf_;
int currLocationIndex_;
std::unique_ptr<memory::ReadOnlyByteBuffer> currChunk_;
+ std::unique_ptr<memory::ReadOnlyByteBuffer> decompressedChunk_;
size_t currBatchPos_;
size_t currBatchSize_;
std::shared_ptr<PartitionReader> currReader_;
diff --git a/cpp/celeborn/client/tests/CMakeLists.txt
b/cpp/celeborn/client/tests/CMakeLists.txt
index 6543ca28d..ab1dea923 100644
--- a/cpp/celeborn/client/tests/CMakeLists.txt
+++ b/cpp/celeborn/client/tests/CMakeLists.txt
@@ -15,7 +15,8 @@
+# Unit tests for the client library: partition reader and LZ4 decompressor.
add_executable(
celeborn_client_test
- WorkerPartitionReaderTest.cpp)
+ WorkerPartitionReaderTest.cpp
+ Lz4DecompressorTest.cpp)
add_test(NAME celeborn_client_test COMMAND celeborn_client_test)
diff --git a/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp
b/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp
new file mode 100644
index 000000000..6cd786367
--- /dev/null
+++ b/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include "celeborn/client/compress/Lz4Decompressor.h"
+
+using namespace celeborn;
+using namespace celeborn::client;
+using namespace celeborn::protocol;
+
+// Decodes an LZ4-compressed block (method 0x20, compressedLen 29,
+// originalLen 31) and checks the returned length and decoded text.
+TEST(Lz4DecompressorTest, DecompressWithLz4) {
+  compress::Lz4Decompressor decompressor;
+
+  std::vector<uint8_t> compressedData = {
+      76, 90, 52, 66, 108, 111, 99, 107, 32, 29, 0, 0, 0,
+      31, 0, 0, 0, 116, 18, 177, 8, 83, 72, 101, 108, 108,
+      111, 1, 0, 240, 4, 32, 67, 101, 108, 101, 98, 111, 114,
+      110, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33};
+
+  const int originalLen = decompressor.getOriginalLen(compressedData.data());
+  ASSERT_GT(originalLen, 0);
+
+  // A vector owns the output, so nothing leaks even when an assertion fails.
+  std::vector<uint8_t> decompressedData(originalLen);
+  const int decompressedLen =
+      decompressor.decompress(compressedData.data(), decompressedData.data(), 0);
+
+  // decompress() returns the decoded length; compare it exactly instead of
+  // collapsing it to bool.
+  EXPECT_EQ(decompressedLen, originalLen);
+  EXPECT_EQ(
+      std::string(decompressedData.begin(), decompressedData.end()),
+      std::string("Helloooooooo Celeborn!!!!!!!!!!"));
+}
+
+// Decodes a raw (method 0x10) block of 15 bytes; the two bytes trailing the
+// payload must be ignored.
+TEST(Lz4DecompressorTest, DecompressWithRaw) {
+  compress::Lz4Decompressor decompressor;
+
+  std::vector<uint8_t> compressedData = {
+      76, 90, 52, 66, 108, 111, 99, 107, 16, 15, 0, 0, 0,
+      15, 0, 0, 0, 188, 66, 58, 13, 72, 101, 108, 108, 111,
+      32, 67, 101, 108, 101, 98, 111, 114, 110, 33, 110, 33};
+
+  const int originalLen = decompressor.getOriginalLen(compressedData.data());
+  ASSERT_GT(originalLen, 0);
+
+  // A vector owns the output, so nothing leaks even when an assertion fails.
+  std::vector<uint8_t> decompressedData(originalLen);
+  const int decompressedLen =
+      decompressor.decompress(compressedData.data(), decompressedData.data(), 0);
+
+  // decompress() returns the decoded length; compare it exactly instead of
+  // collapsing it to bool.
+  EXPECT_EQ(decompressedLen, originalLen);
+  EXPECT_EQ(
+      std::string(decompressedData.begin(), decompressedData.end()),
+      std::string("Hello Celeborn!"));
+}
diff --git a/cpp/celeborn/conf/CMakeLists.txt b/cpp/celeborn/conf/CMakeLists.txt
index f9fbca886..86dfc7e8d 100644
--- a/cpp/celeborn/conf/CMakeLists.txt
+++ b/cpp/celeborn/conf/CMakeLists.txt
@@ -20,6 +20,7 @@ add_library(
target_link_libraries(
conf
+ protocol
utils
memory
${FOLLY_WITH_DEPENDENCIES}
diff --git a/cpp/celeborn/conf/CelebornConf.cpp
b/cpp/celeborn/conf/CelebornConf.cpp
index f8d34862e..4f85c19a0 100644
--- a/cpp/celeborn/conf/CelebornConf.cpp
+++ b/cpp/celeborn/conf/CelebornConf.cpp
@@ -140,6 +140,9 @@ const std::unordered_map<std::string,
folly::Optional<std::string>>
NUM_PROP(kNetworkIoNumConnectionsPerPeer, "1"),
NUM_PROP(kNetworkIoClientThreads, 0),
NUM_PROP(kClientFetchMaxReqsInFlight, 3),
+ STR_PROP(
+ kShuffleCompressionCodec,
+ protocol::toString(protocol::CompressionCodec::NONE)),
// NUM_PROP(kNumExample, 50'000),
// BOOL_PROP(kBoolExample, false),
};
@@ -202,5 +205,10 @@ int CelebornConf::networkIoClientThreads() const {
int CelebornConf::clientFetchMaxReqsInFlight() const {
return std::stoi(optionalProperty(kClientFetchMaxReqsInFlight).value());
}
+
+// Parses the configured celeborn.client.shuffle.compression.codec value; the
+// property is registered with a NONE default in the property table above.
+protocol::CompressionCodec CelebornConf::shuffleCompressionCodec() const {
+  const auto codecName = optionalProperty(kShuffleCompressionCodec).value();
+  return protocol::toCompressionCodec(codecName);
+}
} // namespace conf
} // namespace celeborn
diff --git a/cpp/celeborn/conf/CelebornConf.h b/cpp/celeborn/conf/CelebornConf.h
index 7f36823fa..783bc96e1 100644
--- a/cpp/celeborn/conf/CelebornConf.h
+++ b/cpp/celeborn/conf/CelebornConf.h
@@ -18,6 +18,7 @@
#pragma once
#include "celeborn/conf/BaseConf.h"
+#include "celeborn/protocol/CompressionCodec.h"
#include "celeborn/utils/CelebornUtils.h"
namespace celeborn {
@@ -60,6 +61,9 @@ class CelebornConf : public BaseConf {
static constexpr std::string_view kClientFetchMaxReqsInFlight{
"celeborn.client.fetch.maxReqsInFlight"};
+ // Codec of the shuffle data being read; parsed by
+ // protocol::toCompressionCodec and defaults to NONE.
+ static constexpr std::string_view kShuffleCompressionCodec{
+ "celeborn.client.shuffle.compression.codec"};
+
CelebornConf();
CelebornConf(const std::string& filename);
@@ -83,6 +87,8 @@ class CelebornConf : public BaseConf {
int networkIoClientThreads() const;
int clientFetchMaxReqsInFlight() const;
+
+ // Returns the codec configured under kShuffleCompressionCodec.
+ protocol::CompressionCodec shuffleCompressionCodec() const;
};
} // namespace conf
} // namespace celeborn
diff --git a/cpp/celeborn/memory/ByteBuffer.cpp
b/cpp/celeborn/memory/ByteBuffer.cpp
index 061ca2edf..a44ba92bd 100644
--- a/cpp/celeborn/memory/ByteBuffer.cpp
+++ b/cpp/celeborn/memory/ByteBuffer.cpp
@@ -74,5 +74,27 @@ std::unique_ptr<folly::IOBuf> ByteBuffer::trimBuffer(
}
return std::move(data);
}
+
+// Returns the next `len` bytes (fewer if the buffer runs out first) as a new
+// read-only buffer with this buffer's endianness, and advances this buffer's
+// cursor past them. Pieces are IOBuf clones, which share the underlying memory
+// instead of copying bytes.
+std::unique_ptr<ReadOnlyByteBuffer> ReadOnlyByteBuffer::readToReadOnlyBuffer(
+    const size_t len) const {
+  std::unique_ptr<folly::IOBuf> leftData = folly::IOBuf::create(0);
+  // size_t (not int) so large lengths neither truncate nor trigger
+  // signed/unsigned comparisons.
+  size_t cnt = 0;
+  while (cnt < len && this->remainingSize() > 0) {
+    // cloneOne() clones only the cursor's current IOBuf, not the rest of the
+    // chain.
+    std::unique_ptr<folly::IOBuf> newBlock =
+        this->cursor_->currentBuffer()->cloneOne();
+    newBlock->trimStart(this->cursor_->getPositionInCurrentBuffer());
+    if (newBlock->length() > len - cnt) {
+      newBlock->trimEnd(newBlock->length() - (len - cnt));
+    }
+    this->cursor_->skip(newBlock->length());
+    cnt += newBlock->length();
+    leftData->appendToChain(std::move(newBlock));
+  }
+  return createReadOnly(std::move(leftData), isBigEndian_);
+}
} // namespace memory
} // namespace celeborn
diff --git a/cpp/celeborn/memory/ByteBuffer.h b/cpp/celeborn/memory/ByteBuffer.h
index 763582d32..0c1027fe2 100644
--- a/cpp/celeborn/memory/ByteBuffer.h
+++ b/cpp/celeborn/memory/ByteBuffer.h
@@ -48,12 +48,11 @@ class ByteBuffer {
assert(data_);
}
- std::unique_ptr<folly::IOBuf> data_;
- bool isBigEndian_;
-
- private:
static std::unique_ptr<folly::IOBuf> trimBuffer(
const ReadOnlyByteBuffer& buffer);
+
+ std::unique_ptr<folly::IOBuf> data_;
+ bool isBigEndian_;
};
class ReadOnlyByteBuffer : public ByteBuffer {
@@ -126,6 +125,8 @@ class ReadOnlyByteBuffer : public ByteBuffer {
return cursor_->pullAtMost(buf, len);
}
+ // Returns the next `len` bytes (fewer if fewer remain) as a new read-only
+ // buffer and advances this buffer's cursor past them.
+ std::unique_ptr<ReadOnlyByteBuffer> readToReadOnlyBuffer(size_t len) const;
+
std::unique_ptr<folly::IOBuf> getData() const {
return data_->clone();
}
@@ -171,6 +172,10 @@ class WriteOnlyByteBuffer : public ByteBuffer {
appender_->push(reinterpret_cast<const uint8_t*>(ptr), data.size());
}
+  // Appends `len` raw bytes starting at `data` to this buffer.
+  void writeFromBuffer(const void* data, const size_t len) const {
+    const auto* bytes = static_cast<const uint8_t*>(data);
+    appender_->push(bytes, len);
+  }
+
size_t size() const {
return data_->computeChainDataLength();
}
diff --git a/cpp/celeborn/memory/tests/ByteBufferTest.cpp
b/cpp/celeborn/memory/tests/ByteBufferTest.cpp
index 350b019a6..f6c4c6711 100644
--- a/cpp/celeborn/memory/tests/ByteBufferTest.cpp
+++ b/cpp/celeborn/memory/tests/ByteBufferTest.cpp
@@ -144,6 +144,75 @@ void testReadData(ReadOnlyByteBuffer* readBuffer, size_t
size) {
EXPECT_THROW(readBuffer->readLE<int64_t>(), std::exception);
}
+// Reads the test payload back through readToReadOnlyBuffer(), decoding each
+// value from the returned sub-buffer and checking the parent cursor moves.
+void testReadOnlyBufferReadData(ReadOnlyByteBuffer* readBuffer, size_t size) {
+  EXPECT_EQ(size, testSize);
+  size_t expectedRemaining = size;
+  EXPECT_EQ(readBuffer->size(), size);
+  EXPECT_EQ(readBuffer->remainingSize(), expectedRemaining);
+
+  // Slices the next `bytes` bytes off readBuffer as a standalone buffer.
+  const auto take = [readBuffer](size_t bytes) {
+    return readBuffer->readToReadOnlyBuffer(bytes);
+  };
+  // Records that `bytes` were consumed and checks the cursor agrees.
+  const auto expectConsumed = [&](size_t bytes) {
+    expectedRemaining -= bytes;
+    EXPECT_EQ(readBuffer->remainingSize(), expectedRemaining);
+  };
+
+  // Test read string.
+  EXPECT_EQ(take(strPayload.size())->readToString(), strPayload);
+  expectConsumed(strPayload.size());
+
+  // Test read BigEndian.
+  EXPECT_EQ(take(sizeof(int16_t))->readBE<int16_t>(), int16Payload);
+  expectConsumed(sizeof(int16_t));
+  EXPECT_EQ(take(sizeof(int32_t))->readBE<int32_t>(), int32Payload);
+  expectConsumed(sizeof(int32_t));
+  EXPECT_EQ(take(sizeof(int64_t))->readBE<int64_t>(), int64Payload);
+  expectConsumed(sizeof(int64_t));
+
+  // Test read LittleEndian.
+  EXPECT_EQ(take(sizeof(int16_t))->readLE<int16_t>(), int16Payload);
+  expectConsumed(sizeof(int16_t));
+  EXPECT_EQ(take(sizeof(int32_t))->readLE<int32_t>(), int32Payload);
+  expectConsumed(sizeof(int32_t));
+  EXPECT_EQ(take(sizeof(int64_t))->readLE<int64_t>(), int64Payload);
+  expectConsumed(sizeof(int64_t));
+
+  // Test retreat and skip.
+  const size_t retreatSize = sizeof(int32_t) + sizeof(int64_t);
+  readBuffer->retreat(retreatSize);
+  expectedRemaining += retreatSize;
+  EXPECT_EQ(readBuffer->remainingSize(), expectedRemaining);
+  EXPECT_EQ(take(sizeof(int32_t))->readLE<int32_t>(), int32Payload);
+  expectConsumed(sizeof(int32_t));
+  readBuffer->skip(sizeof(int64_t));
+  expectConsumed(sizeof(int64_t));
+
+  // Test read end.
+  EXPECT_EQ(readBuffer->size(), size);
+  EXPECT_EQ(readBuffer->remainingSize(), 0);
+  EXPECT_THROW(take(sizeof(int64_t))->readLE<int64_t>(), std::exception);
+}
+
TEST(ByteBufferTest, continuousBufferRead) {
size_t size = 0;
auto data = createRawData(size);
@@ -189,6 +258,13 @@ TEST(ByteBufferTest, writeBufferAndRead) {
testReadData(readBuffer.get(), size);
}
+TEST(ByteBufferTest, readOnlyBufferRead) {
+  size_t totalSize = 0; // filled in by createWriteOnlyBuffer()
+  auto readOnly =
+      WriteOnlyByteBuffer::toReadOnly(createWriteOnlyBuffer(totalSize));
+  testReadOnlyBufferReadData(readOnly.get(), totalSize);
+}
+
TEST(ByteBufferTest, concatReadBuffer) {
size_t size = 0;
auto writeBuffer1 = createWriteOnlyBuffer(size);
diff --git a/cpp/celeborn/protocol/CMakeLists.txt
b/cpp/celeborn/protocol/CMakeLists.txt
index a1dc054d3..f0b0bea78 100644
--- a/cpp/celeborn/protocol/CMakeLists.txt
+++ b/cpp/celeborn/protocol/CMakeLists.txt
@@ -17,7 +17,8 @@ add_library(
STATIC
PartitionLocation.cpp
TransportMessage.cpp
- ControlMessages.cpp)
+ ControlMessages.cpp
+ CompressionCodec.cpp)
target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/cpp/celeborn/protocol/CompressionCodec.cpp
similarity index 56%
copy from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/protocol/CompressionCodec.cpp
index 4aa5b0ed7..93f90ebd1 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/protocol/CompressionCodec.cpp
@@ -15,13 +15,31 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.cluster
+#include "celeborn/protocol/CompressionCodec.h"
-import org.apache.celeborn.common.protocol.CompressionCodec
-
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+namespace celeborn {
+namespace protocol {
+namespace {
+// Upper-cases a single ASCII letter; any other byte is returned unchanged.
+char toUpperAscii(char c) {
+  return (c >= 'a' && c <= 'z') ? static_cast<char>(c - 'a' + 'A') : c;
+}
+
+// Returns true when `lhs` and `rhs` are equal ignoring ASCII letter case.
+bool equalsIgnoreCaseAscii(std::string_view lhs, std::string_view rhs) {
+  if (lhs.size() != rhs.size()) {
+    return false;
+  }
+  for (std::string_view::size_type i = 0; i < lhs.size(); ++i) {
+    if (toUpperAscii(lhs[i]) != toUpperAscii(rhs[i])) {
+      return false;
+    }
+  }
+  return true;
+}
+} // namespace
+
+// Parses a codec name into a CompressionCodec. Matching is ASCII
+// case-insensitive, so "lz4" and "LZ4" both select LZ4 instead of silently
+// disabling decompression. Any unrecognized name falls back to NONE.
+CompressionCodec toCompressionCodec(std::string_view code) {
+  if (equalsIgnoreCaseAscii(code, "LZ4")) {
+    return CompressionCodec::LZ4;
+  }
+  if (equalsIgnoreCaseAscii(code, "ZSTD")) {
+    return CompressionCodec::ZSTD;
+  }
+  return CompressionCodec::NONE;
+}
- def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
+// Returns the upper-case name of `codec` ("LZ4", "ZSTD" or "NONE").
+std::string_view toString(CompressionCodec codec) {
+ switch (codec) {
+ case CompressionCodec::LZ4:
+ return "LZ4";
+ case CompressionCodec::ZSTD:
+ return "ZSTD";
+ case CompressionCodec::NONE:
+ return "NONE";
+ // Only reached for values outside the enumerators handled above.
+ default:
+ return "UNKNOWN";
}
}
+} // namespace protocol
+} // namespace celeborn
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/cpp/celeborn/protocol/CompressionCodec.h
similarity index 73%
copy from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to cpp/celeborn/protocol/CompressionCodec.h
index 4aa5b0ed7..785167c1f 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++ b/cpp/celeborn/protocol/CompressionCodec.h
@@ -15,13 +15,19 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.cluster
+#pragma once
+#include <string_view>
-import org.apache.celeborn.common.protocol.CompressionCodec
+namespace celeborn {
+namespace protocol {
+// Shuffle data compression codecs; NONE means the data is not compressed.
+enum class CompressionCodec {
+ LZ4,
+ ZSTD,
+ NONE,
+};
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+// Maps a codec name such as "LZ4" or "ZSTD" to the enum; any name it does
+// not recognize maps to NONE.
+CompressionCodec toCompressionCodec(std::string_view code);
- def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
- }
-}
+// Returns the codec's name, or "UNKNOWN" for a value outside the enum.
+std::string_view toString(CompressionCodec codec);
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/tests/DataSumWithReaderClient.cpp
b/cpp/celeborn/tests/DataSumWithReaderClient.cpp
index d60166e92..8c060fa54 100644
--- a/cpp/celeborn/tests/DataSumWithReaderClient.cpp
+++ b/cpp/celeborn/tests/DataSumWithReaderClient.cpp
@@ -23,7 +23,7 @@
int main(int argc, char** argv) {
// Read the configs.
- assert(argc == 8);
+ assert(argc == 9);
std::string lifecycleManagerHost = argv[1];
int lifecycleManagerPort = std::atoi(argv[2]);
std::string appUniqueId = argv[3];
@@ -31,15 +31,19 @@ int main(int argc, char** argv) {
int attemptId = std::atoi(argv[5]);
int numPartitions = std::atoi(argv[6]);
std::string resultFile = argv[7];
+ std::string compressCodec = argv[8];
std::cout << "lifecycleManagerHost = " << lifecycleManagerHost
<< ", lifecycleManagerPort = " << lifecycleManagerPort
<< ", appUniqueId = " << appUniqueId
<< ", shuffleId = " << shuffleId << ", attemptId = " << attemptId
<< ", numPartitions = " << numPartitions
- << ", resultFile = " << resultFile << std::endl;
+ << ", resultFile = " << resultFile
+ << ", compressCodec = " << compressCodec << std::endl;
// Create shuffleClient and setup.
auto conf = std::make_shared<celeborn::conf::CelebornConf>();
+ conf->registerProperty(
+ celeborn::conf::CelebornConf::kShuffleCompressionCodec, compressCodec);
auto clientFactory =
std::make_shared<celeborn::network::TransportClientFactory>(conf);
auto shuffleClient = std::make_unique<celeborn::client::ShuffleClientImpl>(
@@ -63,6 +67,10 @@ int main(int argc, char** argv) {
dataCnt++;
continue;
}
+ // Skip the '+' padding the Java writer adds when compression is enabled.
+ if (c == '+') {
+ continue;
+ }
assert(c >= '0' && c <= '9');
data *= 10;
data += c - '0';
diff --git a/cpp/cmake/FindLZ4.cmake b/cpp/cmake/FindLZ4.cmake
new file mode 100644
index 000000000..918568b55
--- /dev/null
+++ b/cpp/cmake/FindLZ4.cmake
@@ -0,0 +1,65 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Finds liblz4.
+#
+# This module defines:
+# LZ4_FOUND - TRUE if both lz4.h and the lz4 library were found
+# LZ4_INCLUDE_DIR - directory containing lz4.h
+# LZ4_INCLUDE_DIRS - same as LZ4_INCLUDE_DIR
+# LZ4_LIBRARY - library to link (may be an optimized/debug keyword list
+# when both variants are found)
+# LZ4_LIBRARIES - same as LZ4_LIBRARY
+# LZ4::lz4 - imported target carrying the library and include directory
+#
+
+find_path(LZ4_INCLUDE_DIR NAMES lz4.h)
+
+find_library(LZ4_LIBRARY_DEBUG NAMES lz4d)
+find_library(LZ4_LIBRARY_RELEASE NAMES lz4)
+
+# Combines the debug/release results into LZ4_LIBRARY and LZ4_LIBRARIES.
+include(SelectLibraryConfigurations)
+select_library_configurations(LZ4)
+
+# Sets LZ4_FOUND and prints the standard "Found LZ4: ..." status line, so no
+# extra message() is emitted here.
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(
+  LZ4 REQUIRED_VARS LZ4_LIBRARY LZ4_INCLUDE_DIR)
+
+if(LZ4_FOUND)
+  set(LZ4_INCLUDE_DIRS "${LZ4_INCLUDE_DIR}")
+  if(NOT TARGET LZ4::lz4)
+    add_library(LZ4::lz4 UNKNOWN IMPORTED)
+    set_target_properties(
+      LZ4::lz4 PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${LZ4_INCLUDE_DIR}")
+    # RELEASE is registered first so configurations without their own
+    # location (e.g. RelWithDebInfo) should resolve to the release library.
+    if(LZ4_LIBRARY_RELEASE)
+      set_property(
+        TARGET LZ4::lz4 APPEND PROPERTY IMPORTED_CONFIGURATIONS RELEASE)
+      set_target_properties(
+        LZ4::lz4 PROPERTIES IMPORTED_LOCATION_RELEASE "${LZ4_LIBRARY_RELEASE}")
+    endif()
+    if(LZ4_LIBRARY_DEBUG)
+      set_property(
+        TARGET LZ4::lz4 APPEND PROPERTY IMPORTED_CONFIGURATIONS DEBUG)
+      set_target_properties(
+        LZ4::lz4 PROPERTIES IMPORTED_LOCATION_DEBUG "${LZ4_LIBRARY_DEBUG}")
+    endif()
+  endif()
+endif()
+
+mark_as_advanced(LZ4_INCLUDE_DIR LZ4_LIBRARY)
diff --git a/cpp/scripts/setup-ubuntu.sh b/cpp/scripts/setup-ubuntu.sh
index fe137a180..e26ffa4f3 100755
--- a/cpp/scripts/setup-ubuntu.sh
+++ b/cpp/scripts/setup-ubuntu.sh
@@ -126,6 +126,8 @@ function install_celeborn_cpp_deps_from_apt {
libgmock-dev \
libevent-dev \
libsodium-dev \
+ libxxhash-dev \
+ liblz4-dev \
libzstd-dev \
libre2-dev
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala
similarity index 89%
rename from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
rename to
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala
index a57ffebe8..a124cc590 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala
@@ -35,7 +35,7 @@ import org.apache.celeborn.common.protocol.CompressionCodec
import org.apache.celeborn.common.util.Utils.runCommand
import org.apache.celeborn.service.deploy.MiniClusterFeature
-trait JavaReadCppWriteTestBase extends AnyFunSuite
+trait JavaWriteCppReadTestBase extends AnyFunSuite
with Logging with MiniClusterFeature with BeforeAndAfterAll {
var masterPort = 0
@@ -51,16 +51,16 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
shutdownMiniCluster()
}
- def testJavaReadCppWrite(codec: CompressionCodec): Unit = {
+ def testJavaWriteCppRead(codec: CompressionCodec): Unit = {
beforeAll()
try {
- runJavaReadCppWrite(codec)
+ runJavaWriteCppRead(codec)
} finally {
afterAll()
}
}
- def runJavaReadCppWrite(codec: CompressionCodec): Unit = {
+ def runJavaWriteCppRead(codec: CompressionCodec): Unit = {
val appUniqueId = "test-app"
val shuffleId = 0
val attemptId = 0
@@ -87,13 +87,19 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
val numData = 1000
var sums = new util.ArrayList[Long](numPartitions)
val rand = new Random()
+ var prefix = "-"
+ if (codec != CompressionCodec.NONE) {
+ // Add duplicate strings to make the compressed length shorter than the
original length.
+ // Will be dropped in cpp test client.
+ prefix = prefix ++ "++++++++++"
+ }
for (mapId <- 0 until numMappers) {
for (partitionId <- 0 until numPartitions) {
sums.add(0)
for (i <- 0 until numData) {
val data = rand.nextInt(maxData)
sums.set(partitionId, sums.get(partitionId) + data)
- val dataStr = "-" + data.toString
+ val dataStr = prefix + data.toString
shuffleClient.pushOrMergeData(
shuffleId,
mapId,
@@ -105,7 +111,7 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
numMappers,
numPartitions,
false,
- true)
+ false)
}
}
shuffleClient.pushMergedData(shuffleId, mapId, attemptId)
@@ -120,9 +126,10 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite
val cppBinRelativeDirectory = "cpp/build/celeborn/tests/"
val cppBinFileName = "cppDataSumWithReaderClient"
val cppBinFilePath =
s"$projectDirectory/$cppBinRelativeDirectory/$cppBinFileName"
- // Execution command: $exec lifecycleManagerHost lifecycleManagerPort
appUniqueId shuffleId attemptId numPartitions cppResultFile
+ val cppCodec = codec.name()
+ // Execution command: $exec lifecycleManagerHost lifecycleManagerPort
appUniqueId shuffleId attemptId numPartitions cppResultFile cppCodec
val command = {
- s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort
$appUniqueId $shuffleId $attemptId $numPartitions $cppResultFile"
+ s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort
$appUniqueId $shuffleId $attemptId $numPartitions $cppResultFile $cppCodec"
}
println(s"run command: $command")
val commandOutput = runCommand(command)
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala
similarity index 88%
copy from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
copy to
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala
index 4aa5b0ed7..bc1961384 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala
@@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.cluster
import org.apache.celeborn.common.protocol.CompressionCodec
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+/**
+ * Runs the Java-write / C++-read integration test with LZ4 compression.
+ */
+object JavaWriteCppReadTestWithLZ4 extends JavaWriteCppReadTestBase {
def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
+ testJavaWriteCppRead(CompressionCodec.LZ4)
}
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala
similarity index 89%
rename from
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
rename to
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala
index 4aa5b0ed7..a649f8350 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala
@@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.cluster
import org.apache.celeborn.common.protocol.CompressionCodec
-object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase {
+/**
+ * Runs the Java-write / C++-read integration test without compression.
+ */
+object JavaWriteCppReadTestWithNONE extends JavaWriteCppReadTestBase {
def main(args: Array[String]) = {
- testJavaReadCppWrite(CompressionCodec.NONE)
+ testJavaWriteCppRead(CompressionCodec.NONE)
}
}