zhouyuan commented on code in PR #9278:
URL: https://github.com/apache/incubator-gluten/pull/9278#discussion_r2048535521
##########
cpp/core/shuffle/Payload.cc:
##########
@@ -462,6 +472,10 @@ int64_t InMemoryPayload::rawSize() {
return getBufferSize(buffers_);
}
+bool InMemoryPayload::mergable() const {
Review Comment:
nice refactor! typo: should this be `mergeable`?
##########
cpp/core/shuffle/Utils.h:
##########
@@ -105,4 +108,396 @@ class MmapFileStream : public arrow::io::InputStream {
int64_t posRetain_ = 0;
};
+// Adapted from arrow::io::CompressedOutputStream. Rebuild compressor after each `Flush()`.
+class ShuffleCompressedOutputStream : public arrow::io::OutputStream {
+ public:
+ /// \brief Create a compressed output stream wrapping the given output
stream.
+ static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>>
+ Make(arrow::util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
arrow::MemoryPool* pool) {
+ auto res = std::shared_ptr<ShuffleCompressedOutputStream>(new
ShuffleCompressedOutputStream(codec, raw, pool));
+ RETURN_NOT_OK(res->Init(codec));
+ return res;
+ }
+
+ arrow::Result<int64_t> Tell() const override {
+ return totalPos_;
+ }
+
+ arrow::Status Write(const void* data, int64_t nbytes) override {
+ ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+ if (nbytes == 0) {
+ return arrow::Status::OK();
+ }
+
+ freshCompressor_ = false;
+
+ int64_t flushTime = 0;
+ {
+ ScopedTimer timer(&compressTime_);
+ auto input = static_cast<const uint8_t*>(data);
+ while (nbytes > 0) {
+ int64_t input_len = nbytes;
+ int64_t output_len = compressed_->size() - compressedPos_;
+ uint8_t* output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(auto result, compressor_->Compress(input_len,
input, output_len, output));
+ compressedPos_ += result.bytes_written;
+
+ if (result.bytes_read == 0) {
+ // Not enough output, try to flush it and retry
+ if (compressedPos_ > 0) {
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+ output_len = compressed_->size() - compressedPos_;
+ output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(result, compressor_->Compress(input_len,
input, output_len, output));
+ compressedPos_ += result.bytes_written;
+ }
+ }
+ input += result.bytes_read;
+ nbytes -= result.bytes_read;
+ totalPos_ += result.bytes_read;
+ if (compressedPos_ == compressed_->size()) {
+ // Output buffer full, flush it
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+ }
+ if (result.bytes_read == 0) {
+ // Need to enlarge output buffer
+ RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+ }
+ }
+ }
+ compressTime_ -= flushTime;
+ flushTime_ += flushTime;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Flush() override {
+ ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+ if (freshCompressor_) {
+ // No data written, no need to flush
+ return arrow::Status::OK();
+ }
+
+ RETURN_NOT_OK(FinalizeCompression());
+ ARROW_ASSIGN_OR_RAISE(compressor_, codec_->MakeCompressor());
+ freshCompressor_ = true;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Close() override {
+ if (isOpen_) {
+ isOpen_ = false;
+ if (!freshCompressor_) {
+ RETURN_NOT_OK(FinalizeCompression());
+ }
+ // Do not close the underlying stream, it is the caller's responsibility.
+ }
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Abort() override {
+ if (isOpen_) {
+ isOpen_ = false;
+ return raw_->Abort();
+ }
+ return arrow::Status::OK();
+ }
+
+ bool closed() const override {
+ return !isOpen_;
+ }
+
+ int64_t compressTime() const {
+ return compressTime_;
+ }
+
+ int64_t flushTime() const {
+ return flushTime_;
+ }
+
+ private:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(ShuffleCompressedOutputStream);
+
+ ShuffleCompressedOutputStream(
+ arrow::util::Codec* codec,
+ const std::shared_ptr<OutputStream>& raw,
+ arrow::MemoryPool* pool)
+ : codec_(codec), raw_(raw), pool_(pool) {}
+
+ arrow::Status Init(arrow::util::Codec* codec) {
+ ARROW_ASSIGN_OR_RAISE(compressor_, codec->MakeCompressor());
+ ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize,
pool_));
+ compressedPos_ = 0;
+ isOpen_ = true;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status FlushCompressed(int64_t& flushTime) {
+ if (compressedPos_ > 0) {
+ ScopedTimer timer(&flushTime);
+ RETURN_NOT_OK(raw_->Write(compressed_->data(), compressedPos_));
+ compressedPos_ = 0;
+ }
+ return arrow::Status::OK();
+ }
+
+ arrow::Status FinalizeCompression() {
+ int64_t flushTime = 0;
+ {
+ ScopedTimer timer(&compressTime_);
+ while (true) {
+ // Try to end compressor
+ int64_t output_len = compressed_->size() - compressedPos_;
+ uint8_t* output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(auto result, compressor_->End(output_len,
output));
+ compressedPos_ += result.bytes_written;
+
+ // Flush compressed output
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+
+ if (result.should_retry) {
+ // Need to enlarge output buffer
+ RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+ } else {
+ // Done
+ break;
+ }
+ }
+ }
+ compressTime_ -= flushTime;
+ flushTime_ += flushTime;
+ return arrow::Status::OK();
+ }
+
+ // TODO: Support setting chunk size
+ // Write 64 KB compressed data at a time
+ static const int64_t kChunkSize = 64 * 1024;
Review Comment:
Compared with the original method, will this bring a higher memory footprint?
##########
cpp/core/shuffle/Utils.h:
##########
@@ -105,4 +108,396 @@ class MmapFileStream : public arrow::io::InputStream {
int64_t posRetain_ = 0;
};
+// Adapted from arrow::io::CompressedOutputStream. Rebuild compressor after each `Flush()`.
+class ShuffleCompressedOutputStream : public arrow::io::OutputStream {
+ public:
+ /// \brief Create a compressed output stream wrapping the given output
stream.
+ static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>>
+ Make(arrow::util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
arrow::MemoryPool* pool) {
+ auto res = std::shared_ptr<ShuffleCompressedOutputStream>(new
ShuffleCompressedOutputStream(codec, raw, pool));
+ RETURN_NOT_OK(res->Init(codec));
+ return res;
+ }
+
+ arrow::Result<int64_t> Tell() const override {
+ return totalPos_;
+ }
+
+ arrow::Status Write(const void* data, int64_t nbytes) override {
+ ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+ if (nbytes == 0) {
+ return arrow::Status::OK();
+ }
+
+ freshCompressor_ = false;
+
+ int64_t flushTime = 0;
+ {
+ ScopedTimer timer(&compressTime_);
+ auto input = static_cast<const uint8_t*>(data);
+ while (nbytes > 0) {
+ int64_t input_len = nbytes;
+ int64_t output_len = compressed_->size() - compressedPos_;
+ uint8_t* output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(auto result, compressor_->Compress(input_len,
input, output_len, output));
+ compressedPos_ += result.bytes_written;
+
+ if (result.bytes_read == 0) {
+ // Not enough output, try to flush it and retry
+ if (compressedPos_ > 0) {
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+ output_len = compressed_->size() - compressedPos_;
+ output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(result, compressor_->Compress(input_len,
input, output_len, output));
+ compressedPos_ += result.bytes_written;
+ }
+ }
+ input += result.bytes_read;
+ nbytes -= result.bytes_read;
+ totalPos_ += result.bytes_read;
+ if (compressedPos_ == compressed_->size()) {
+ // Output buffer full, flush it
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+ }
+ if (result.bytes_read == 0) {
+ // Need to enlarge output buffer
+ RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+ }
+ }
+ }
+ compressTime_ -= flushTime;
+ flushTime_ += flushTime;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Flush() override {
+ ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+ if (freshCompressor_) {
+ // No data written, no need to flush
+ return arrow::Status::OK();
+ }
+
+ RETURN_NOT_OK(FinalizeCompression());
+ ARROW_ASSIGN_OR_RAISE(compressor_, codec_->MakeCompressor());
Review Comment:
Can we move this compressor out of the shuffle stream and reuse it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]