Copilot commented on code in PR #3575:
URL: https://github.com/apache/celeborn/pull/3575#discussion_r2731560232
##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -154,23 +160,37 @@ int ShuffleClientImpl::pushData(
auto pushState = getPushState(mapKey);
const int nextBatchId = pushState->nextBatchId();
- // TODO: compression in writing is not supported.
+ // Compression support: compress data if compression is enabled
+ const uint8_t* dataToWrite = data + offset;
+ size_t lengthToWrite = length;
+ std::unique_ptr<uint8_t[]> compressedBuffer;
+
+ if (shuffleCompressionEnabled_ && compressor_) {
+ // Allocate buffer for compressed data
+ const size_t compressedCapacity = compressor_->getDstCapacity(length);
+ compressedBuffer = std::make_unique<uint8_t[]>(compressedCapacity);
+
+ // Compress the data
+ lengthToWrite = compressor_->compress(
+ dataToWrite, 0, length, compressedBuffer.get(), 0);
+ dataToWrite = compressedBuffer.get();
+ }
auto writeBuffer =
- memory::ByteBuffer::createWriteOnly(kBatchHeaderSize + length);
+ memory::ByteBuffer::createWriteOnly(kBatchHeaderSize + lengthToWrite);
// TODO: the java side uses Platform to write the data. We simply assume
// littleEndian here.
writeBuffer->writeLE<int>(mapId);
writeBuffer->writeLE<int>(attemptId);
writeBuffer->writeLE<int>(nextBatchId);
- writeBuffer->writeLE<int>(length);
- writeBuffer->writeFromBuffer(data, offset, length);
+ writeBuffer->writeLE<int>(lengthToWrite);
+ writeBuffer->writeFromBuffer(dataToWrite, 0, lengthToWrite);
Review Comment:
`length` and `offset` are `size_t`, but the compressor interface and the
on-wire header use 32-bit `int`s (`compress(..., int srcLength, ...)` and
`writeLE<int>(lengthToWrite)`). Right now there are several implicit
`size_t` -> `int` narrowings that silently truncate (possibly to a negative
value) for buffers larger than `INT32_MAX`, and `lengthToWrite` can also be
slightly larger than `length` because of compression framing. Add an explicit
bounds check and cast (or change the local types to `int`/`int32_t` after
validation) before calling `getDstCapacity`/`compress` and before writing the
length/batchBytesSize.
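A minimal sketch of the suggested check, assuming the length field stays a signed 32-bit `int` on the wire as the hunk shows. `checkedInt32` is a hypothetical helper, not an existing Celeborn function; the commented usage reuses names from the hunk (`length`, `lengthToWrite`, `kBatchHeaderSize`) and applies the check both to the input before compression and to the possibly larger compressed length before it is written.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of Celeborn): narrows a size_t to int32_t,
// throwing std::overflow_error instead of silently truncating.
inline int32_t checkedInt32(size_t value, const char* what) {
  constexpr auto kMax =
      static_cast<size_t>(std::numeric_limits<int32_t>::max());
  if (value > kMax) {
    throw std::overflow_error(
        std::string(what) + " does not fit in int32_t: " +
        std::to_string(value));
  }
  return static_cast<int32_t>(value);
}

// Sketch of how pushData() might use it (names follow the hunk above):
//
//   const int32_t srcLength = checkedInt32(length, "length");
//   const size_t capacity = compressor_->getDstCapacity(srcLength);
//   ...
//   const int32_t wireLength = checkedInt32(lengthToWrite, "lengthToWrite");
//   checkedInt32(kBatchHeaderSize + lengthToWrite, "batch size");
//   writeBuffer->writeLE<int>(wireLength);
```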
##########
cpp/celeborn/client/tests/CompressorFactoryTest.cpp:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celeborn/client/compress/Compressor.h"
+#include "celeborn/conf/CelebornConf.h"
+
+using namespace celeborn;
+using namespace celeborn::client;
+using namespace celeborn::conf;
+using namespace celeborn::protocol;
+
+TEST(CompressorFactoryTest, CreateLz4CompressorFromConf) {
+ CelebornConf conf;
+ conf.registerProperty(CelebornConf::kShuffleCompressionCodec, "LZ4");
+
+ auto compressor = compress::Compressor::createCompressor(conf);
+ ASSERT_NE(compressor, nullptr);
+
+ // Verify it's an LZ4 compressor
+ EXPECT_GT(compressor->getDstCapacity(100), 0);
+}
Review Comment:
This test doesn’t actually verify that the factory produced an LZ4
compressor: `getDstCapacity()` will be >0 for any codec. To make this test
meaningful, assert on codec-specific behavior (e.g., compress a small payload
and check the codec magic/header, or dynamic_cast to `Lz4Compressor` if RTTI is
available).
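For example, the test could compress a short payload and check the codec's leading magic bytes, which needs no RTTI. The sketch below assumes the C++ compressors mirror the Java client's block formats, where LZ4 blocks begin with the ASCII magic `LZ4Block` and ZSTD blocks with `ZSTDBlock`; confirm those constants against the C++ `Lz4Compressor` and `ZstdCompressor` before relying on them. `hasMagicPrefix` is a hypothetical test helper.

```cpp
#include <cstddef>
#include <cstdint>
#include <string_view>
#include <vector>

// ASSUMED magic values, mirroring the Java client's Lz4Trait/ZstdTrait.
constexpr std::string_view kLz4Magic = "LZ4Block";
constexpr std::string_view kZstdMagic = "ZSTDBlock";

// Hypothetical helper: true if the first compressedSize bytes of `out`
// start with `magic`.
inline bool hasMagicPrefix(const std::vector<uint8_t>& out,
                           size_t compressedSize,
                           std::string_view magic) {
  if (compressedSize > out.size() || compressedSize < magic.size()) {
    return false;
  }
  for (size_t i = 0; i < magic.size(); ++i) {
    if (out[i] != static_cast<uint8_t>(magic[i])) {
      return false;
    }
  }
  return true;
}

// Usage inside CreateLz4CompressorFromConf (sketch):
//   std::vector<uint8_t> out(compressor->getDstCapacity(payload.size()));
//   const auto n = compressor->compress(src, 0, payload.size(), out.data(), 0);
//   EXPECT_TRUE(hasMagicPrefix(out, n, kLz4Magic));
//   EXPECT_FALSE(hasMagicPrefix(out, n, kZstdMagic));
```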
##########
cpp/celeborn/client/tests/CompressorFactoryTest.cpp:
##########
@@ -0,0 +1,139 @@
+TEST(CompressorFactoryTest, CreateZstdCompressorFromConf) {
+ CelebornConf conf;
+ conf.registerProperty(CelebornConf::kShuffleCompressionCodec, "ZSTD");
+ conf.registerProperty(
+ CelebornConf::kShuffleCompressionZstdCompressLevel, "3");
+
+ auto compressor = compress::Compressor::createCompressor(conf);
+ ASSERT_NE(compressor, nullptr);
+
+ // Verify it's a ZSTD compressor
+ EXPECT_GT(compressor->getDstCapacity(100), 0);
+}
+
+TEST(CompressorFactoryTest, CompressionCodecNoneDisablesCompression) {
+ CelebornConf conf;
+ // Verify default is NONE
+ EXPECT_EQ(conf.shuffleCompressionCodec(), CompressionCodec::NONE);
+}
+
+TEST(CompressorFactoryTest, ZstdCompressionLevelFromConf) {
+ // Test that configuration correctly reads ZSTD compression levels
+ for (int level = -5; level <= 10; level++) {
+ CelebornConf conf;
+ conf.registerProperty(CelebornConf::kShuffleCompressionCodec, "ZSTD");
+ conf.registerProperty(
+ CelebornConf::kShuffleCompressionZstdCompressLevel,
+ std::to_string(level));
+
+ // Verify the compression level is set correctly
+ EXPECT_EQ(conf.shuffleCompressionZstdCompressLevel(), level);
+
+ // Verify the compressor is created correctly
+ auto compressor = compress::Compressor::createCompressor(conf);
+ ASSERT_NE(compressor, nullptr);
+ EXPECT_GT(compressor->getDstCapacity(100), 0);
+ }
+}
+
+TEST(CompressorFactoryTest, CompressWithOffsetLz4) {
+ CelebornConf conf;
+ conf.registerProperty(CelebornConf::kShuffleCompressionCodec, "LZ4");
+
+ auto compressor = compress::Compressor::createCompressor(conf);
+ ASSERT_NE(compressor, nullptr);
+
+ const std::string prefix = "SKIP_THIS_PREFIX";
+ const std::string testData =
+ "Celeborn compression offset test with structured data: "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4 "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4 "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4";
+ std::string fullData = prefix + testData;
+
+ const auto maxLength = compressor->getDstCapacity(testData.size());
+ std::vector<uint8_t> compressedData(maxLength);
+
+ // Compress with offset (simulating pushData usage pattern)
+ const size_t compressedSize = compressor->compress(
+ reinterpret_cast<const uint8_t*>(fullData.data()),
+ prefix.size(),
+ testData.size(),
+ compressedData.data(),
+ 0);
+
+ // Verify compression succeeded with offset
+ EXPECT_GT(compressedSize, 0);
+ EXPECT_LE(compressedSize, maxLength);
+}
+
+TEST(CompressorFactoryTest, CompressWithOffsetZstd) {
+ CelebornConf conf;
+ conf.registerProperty(CelebornConf::kShuffleCompressionCodec, "ZSTD");
+ conf.registerProperty(
+ CelebornConf::kShuffleCompressionZstdCompressLevel, "3");
+
+ auto compressor = compress::Compressor::createCompressor(conf);
+ ASSERT_NE(compressor, nullptr);
+
+ const std::string prefix = "SKIP_THIS_PREFIX";
+ const std::string testData =
+ "Celeborn compression offset test with structured data: "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4 "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4 "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4";
+ std::string fullData = prefix + testData;
+
+ const auto maxLength = compressor->getDstCapacity(testData.size());
+ std::vector<uint8_t> compressedData(maxLength);
+
+ // Compress with offset (simulating pushData usage pattern)
+ const size_t compressedSize = compressor->compress(
+ reinterpret_cast<const uint8_t*>(fullData.data()),
+ prefix.size(),
+ testData.size(),
+ compressedData.data(),
+ 0);
+
+ // Verify compression succeeded with offset
+ EXPECT_GT(compressedSize, 0);
+ EXPECT_LE(compressedSize, maxLength);
+}
Review Comment:
Same issue as the LZ4 offset test: this only asserts that `compressedSize` is
positive and no larger than `maxLength`, which won't catch bugs where
`srcOffset` is ignored. Decompress the output and compare it to `testData` to
ensure the offset is respected.
##########
cpp/celeborn/client/tests/CompressorFactoryTest.cpp:
##########
@@ -0,0 +1,139 @@
+TEST(CompressorFactoryTest, CompressWithOffsetLz4) {
+ CelebornConf conf;
+ conf.registerProperty(CelebornConf::kShuffleCompressionCodec, "LZ4");
+
+ auto compressor = compress::Compressor::createCompressor(conf);
+ ASSERT_NE(compressor, nullptr);
+
+ const std::string prefix = "SKIP_THIS_PREFIX";
+ const std::string testData =
+ "Celeborn compression offset test with structured data: "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4 "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4 "
+ "partition_0:shuffle_1:map_2:attempt_0:batch_3:data_block_4";
+ std::string fullData = prefix + testData;
+
+ const auto maxLength = compressor->getDstCapacity(testData.size());
+ std::vector<uint8_t> compressedData(maxLength);
+
+ // Compress with offset (simulating pushData usage pattern)
+ const size_t compressedSize = compressor->compress(
+ reinterpret_cast<const uint8_t*>(fullData.data()),
+ prefix.size(),
+ testData.size(),
+ compressedData.data(),
+ 0);
+
+ // Verify compression succeeded with offset
+ EXPECT_GT(compressedSize, 0);
+ EXPECT_LE(compressedSize, maxLength);
+}
Review Comment:
The “offset” coverage here is weak: the test only checks that the compressed
size is non-zero and within `maxLength`, which would still pass if `srcOffset`
were ignored and the wrong bytes were compressed. To validate offset handling,
decompress the output and assert that the decompressed content equals
`testData` (and does not include `prefix`).
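A sketch of the stronger assertion: compress `testData` through the offset, decompress, and compare. The decompressor API isn't shown in this diff, so the sketch uses a hypothetical `ToyCodec` (a length prefix followed by the raw bytes) whose `compress` takes arguments in the same order as the PR's `Compressor::compress`. A deliberately buggy variant that ignores `srcOffset` still satisfies the existing size-only expectations but fails the round-trip.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in codec (NOT Celeborn's LZ4/ZSTD): a 4-byte
// little-endian length followed by the raw bytes. Only the argument order of
// compress() mirrors Compressor::compress from this PR.
struct ToyCodec {
  bool ignoreSrcOffset = false;  // simulates the bug the review describes

  size_t getDstCapacity(size_t srcLength) const { return 4 + srcLength; }

  size_t compress(const uint8_t* src, size_t srcOffset, size_t srcLength,
                  uint8_t* dst, size_t dstOffset) const {
    const uint8_t* in = src + (ignoreSrcOffset ? 0 : srcOffset);
    const auto n = static_cast<uint32_t>(srcLength);
    for (size_t i = 0; i < 4; ++i) {
      dst[dstOffset + i] = static_cast<uint8_t>(n >> (8 * i));
    }
    std::memcpy(dst + dstOffset + 4, in, srcLength);
    return 4 + srcLength;
  }

  std::string decompress(const uint8_t* src, size_t size) const {
    if (size < 4) {
      return {};
    }
    uint32_t n = 0;
    for (size_t i = 0; i < 4; ++i) {
      n |= static_cast<uint32_t>(src[i]) << (8 * i);
    }
    if (n > size - 4) {
      return {};
    }
    return std::string(reinterpret_cast<const char*>(src + 4), n);
  }
};

// The stronger assertion: compress the slice that starts at prefix.size(),
// decompress it, and require an exact match with testData. Comparing against
// testData also catches prefix bytes being compressed in its place.
inline bool roundTripRespectsOffset(const ToyCodec& codec,
                                    const std::string& prefix,
                                    const std::string& testData) {
  const std::string fullData = prefix + testData;
  std::vector<uint8_t> out(codec.getDstCapacity(testData.size()));
  const size_t n = codec.compress(
      reinterpret_cast<const uint8_t*>(fullData.data()), prefix.size(),
      testData.size(), out.data(), 0);
  return codec.decompress(out.data(), n) == testData;
}
```

In `CompressWithOffsetLz4` and `CompressWithOffsetZstd`, the same comparison would go through each codec's real decompressor, with an `EXPECT_EQ` on the decompressed bytes supplementing the size checks.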
##########
cpp/celeborn/client/ShuffleClient.h:
##########
@@ -249,6 +250,8 @@ class ShuffleClientImpl
static constexpr size_t kBatchHeaderSize = 4 * 4;
const std::string appUniqueId_;
+ const bool shuffleCompressionEnabled_;
+ std::unique_ptr<compress::Compressor> compressor_;
Review Comment:
`compressor_` is stored as a single shared instance on `ShuffleClientImpl`,
but `pushData()` appears designed to be callable concurrently (e.g.,
`PushState` uses atomics/concurrent maps). `Lz4Compressor` has mutable internal
state (`XXH32_state_t*`) and is not thread-safe, so concurrent
`compressor_->compress()` calls can corrupt output/checksums. Consider making
the compressor thread-safe (internal locking), using a per-call/per-thread
compressor instance, or switching LZ4 checksum computation to a stateless API.
```suggestion
// Factory for creating compressor instances on demand to avoid sharing a
// single non-thread-safe compressor across concurrent operations.
std::function<std::unique_ptr<compress::Compressor>()> compressorFactory_;
```
##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -57,7 +57,13 @@ ShuffleClientImpl::ShuffleClientImpl(
: appUniqueId_(appUniqueId),
conf_(conf),
clientFactory_(clientEndpoint.clientFactory()),
- pushDataRetryPool_(clientEndpoint.pushDataRetryPool()) {
+ pushDataRetryPool_(clientEndpoint.pushDataRetryPool()),
+ shuffleCompressionEnabled_(
+ conf->shuffleCompressionCodec() != protocol::CompressionCodec::NONE),
+ compressor_(
+ shuffleCompressionEnabled_
+ ? compress::Compressor::createCompressor(*conf)
+ : nullptr) {
Review Comment:
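The constructor initializer list order doesn't match the member declaration
order in `ShuffleClientImpl`: the new `shuffleCompressionEnabled_` and
`compressor_` members are declared before `conf_`, but `conf_` appears earlier
in the init list. C++ always initializes non-static members in declaration
order, regardless of the init-list order, so this can trigger `-Wreorder`
warnings (and build failures under `-Werror`). As far as this hunk shows, it
is not a correctness bug: both new initializers read the constructor parameter
`conf` rather than the member `conf_`, and `shuffleCompressionEnabled_` is
declared before `compressor_`, which reads it. Reorder the initializer list to
match the header's member order (or reorder the member declarations).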
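A small sketch of the rule, using hypothetical stand-ins for `CelebornConf` and `ShuffleClientImpl` (not the real classes): the init list follows declaration order, and the first initializer reads the constructor parameter rather than a member that has not been constructed yet.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for CelebornConf.
struct Conf {
  std::string codec = "NONE";
};

// Hypothetical stand-in for ShuffleClientImpl.
class Client {
 public:
  explicit Client(std::shared_ptr<const Conf> conf)
      // Listed in declaration order, so -Wreorder has nothing to flag.
      // Reading the *parameter* `conf` here is safe only because
      // compressionEnabled_ is declared (and thus initialized) before conf_;
      // if conf_ were declared first, `conf` would already be moved-from.
      : compressionEnabled_(conf->codec != "NONE"),
        conf_(std::move(conf)) {}

  bool compressionEnabled() const { return compressionEnabled_; }
  const Conf& conf() const { return *conf_; }

 private:
  // Members are initialized in this order, whatever the init list says.
  const bool compressionEnabled_;
  const std::shared_ptr<const Conf> conf_;
};
```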
--
This is an automated message from the Apache Git Service.