afterincomparableyum commented on code in PR #3575:
URL: https://github.com/apache/celeborn/pull/3575#discussion_r2750572424
##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -154,23 +160,37 @@ int ShuffleClientImpl::pushData(
auto pushState = getPushState(mapKey);
const int nextBatchId = pushState->nextBatchId();
- // TODO: compression in writing is not supported.
+ // Compression support: compress data if compression is enabled
+ const uint8_t* dataToWrite = data + offset;
+ size_t lengthToWrite = length;
+ std::unique_ptr<uint8_t[]> compressedBuffer;
+
+ if (shuffleCompressionEnabled_ && compressor_) {
+ // Allocate buffer for compressed data
+ const size_t compressedCapacity = compressor_->getDstCapacity(length);
+ compressedBuffer = std::make_unique<uint8_t[]>(compressedCapacity);
+
+ // Compress the data
+ lengthToWrite = compressor_->compress(
+ dataToWrite, 0, length, compressedBuffer.get(), 0);
+ dataToWrite = compressedBuffer.get();
+ }
auto writeBuffer =
- memory::ByteBuffer::createWriteOnly(kBatchHeaderSize + length);
+ memory::ByteBuffer::createWriteOnly(kBatchHeaderSize + lengthToWrite);
// TODO: the java side uses Platform to write the data. We simply assume
// littleEndian here.
writeBuffer->writeLE<int>(mapId);
writeBuffer->writeLE<int>(attemptId);
writeBuffer->writeLE<int>(nextBatchId);
- writeBuffer->writeLE<int>(length);
- writeBuffer->writeFromBuffer(data, offset, length);
+ writeBuffer->writeLE<int>(lengthToWrite);
+ writeBuffer->writeFromBuffer(dataToWrite, 0, lengthToWrite);
Review Comment:
Constructor Initialization List Order:
appUniqueId_
conf_
clientFactory_
pushDataRetryPool_
shuffleCompressionEnabled_
compressorFactory_
The initialization order now matches the declaration order.
The compression members (shuffleCompressionEnabled_ and compressorFactory_)
are declared after conf_ in the header (lines 271 and 274) as of the latest
commit I added, and the constructor initializes them after conf_ as well.
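
For anyone following along, here is a minimal sketch of why the order matters. C++ initializes non-static members in declaration order, regardless of how the constructor's initializer list is written, and GCC/Clang flag a mismatch with -Wreorder. The Tracer helper, the ClientSketch class, and initializationOrder() are hypothetical stand-ins, not the actual ShuffleClientImpl code; only the member names are taken from the list above, and the member types are placeholders.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: appends its name to a log when it is constructed,
// so the log ends up recording the order in which members are initialized.
struct Tracer {
  Tracer(std::vector<std::string>& log, const std::string& name) {
    log.push_back(name);
  }
};

// Hypothetical stand-in for the client class. The member names mirror the
// review comment; the real members have different types.
class ClientSketch {
 public:
  // The initializer list is written in the same order as the declarations
  // below, so what you read here is what actually happens at runtime.
  explicit ClientSketch(std::vector<std::string>& log)
      : appUniqueId_(log, "appUniqueId_"),
        conf_(log, "conf_"),
        clientFactory_(log, "clientFactory_"),
        pushDataRetryPool_(log, "pushDataRetryPool_"),
        shuffleCompressionEnabled_(log, "shuffleCompressionEnabled_"),
        compressorFactory_(log, "compressorFactory_") {}

 private:
  // Declaration order is what the language uses for initialization.
  Tracer appUniqueId_;
  Tracer conf_;
  Tracer clientFactory_;
  Tracer pushDataRetryPool_;
  Tracer shuffleCompressionEnabled_;
  Tracer compressorFactory_;
};

// Constructs a ClientSketch and returns the recorded initialization order.
std::vector<std::string> initializationOrder() {
  std::vector<std::string> log;
  ClientSketch client(log);
  return log;
}
```

Calling initializationOrder() returns the six member names in declaration order. If the initializer list were reordered, the returned sequence would stay the same, which is why a member that reads conf_ during its own initialization has to be declared after it.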