marin-ma commented on code in PR #9278:
URL: https://github.com/apache/incubator-gluten/pull/9278#discussion_r2048989486


##########
cpp/core/shuffle/Utils.h:
##########
@@ -105,4 +108,396 @@ class MmapFileStream : public arrow::io::InputStream {
   int64_t posRetain_ = 0;
 };
 
+// Adapted from arrow::io::CompressedOutputStream. Rebuilds the compressor after each `Flush()`.
+class ShuffleCompressedOutputStream : public arrow::io::OutputStream {
+ public:
+  /// \brief Create a compressed output stream wrapping the given output 
stream.
+  static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>>
+  Make(arrow::util::Codec* codec, const std::shared_ptr<OutputStream>& raw, 
arrow::MemoryPool* pool) {
+    auto res = std::shared_ptr<ShuffleCompressedOutputStream>(new 
ShuffleCompressedOutputStream(codec, raw, pool));
+    RETURN_NOT_OK(res->Init(codec));
+    return res;
+  }
+
  // Reports the stream position as the total number of *uncompressed* bytes
  // accepted by Write() so far, not the number of bytes emitted to the
  // underlying raw stream.
  arrow::Result<int64_t> Tell() const override {
    return totalPos_;
  }
+
  // Compresses `nbytes` bytes starting at `data` into the in-memory staging
  // buffer `compressed_`, spilling the buffer to the raw stream whenever it
  // fills. Advances totalPos_ by the number of uncompressed bytes consumed.
  // Time spent inside the codec and time spent writing to the raw stream are
  // accounted separately (compressTime_ / flushTime_).
  arrow::Status Write(const void* data, int64_t nbytes) override {
    ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));

    if (nbytes == 0) {
      return arrow::Status::OK();
    }

    // The compressor now holds state for the current frame; the next Flush()
    // must finalize it.
    freshCompressor_ = false;

    int64_t flushTime = 0;
    {
      ScopedTimer timer(&compressTime_);
      auto input = static_cast<const uint8_t*>(data);
      while (nbytes > 0) {
        int64_t input_len = nbytes;
        int64_t output_len = compressed_->size() - compressedPos_;
        uint8_t* output = compressed_->mutable_data() + compressedPos_;
        ARROW_ASSIGN_OR_RAISE(auto result, compressor_->Compress(input_len, input, output_len, output));
        compressedPos_ += result.bytes_written;

        if (result.bytes_read == 0) {
          // Not enough output room: flush buffered bytes and retry once.
          if (compressedPos_ > 0) {
            RETURN_NOT_OK(FlushCompressed(flushTime));
            output_len = compressed_->size() - compressedPos_;
            output = compressed_->mutable_data() + compressedPos_;
            ARROW_ASSIGN_OR_RAISE(result, compressor_->Compress(input_len, input, output_len, output));
            compressedPos_ += result.bytes_written;
          }
        }
        input += result.bytes_read;
        nbytes -= result.bytes_read;
        totalPos_ += result.bytes_read;
        if (compressedPos_ == compressed_->size()) {
          // Output buffer full, flush it
          RETURN_NOT_OK(FlushCompressed(flushTime));
        }
        if (result.bytes_read == 0) {
          // Still no input consumed even with an empty buffer: the buffer is
          // too small for the compressor to make progress, so enlarge it.
          RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
        }
      }
    }
    // The ScopedTimer above also counted time spent inside FlushCompressed();
    // move that portion from compressTime_ into flushTime_.
    compressTime_ -= flushTime;
    flushTime_ += flushTime;
    return arrow::Status::OK();
  }
+
  // Finishes the current compression frame (FinalizeCompression() writes all
  // pending compressed bytes to the raw stream) and rebuilds the compressor so
  // subsequent writes start a brand-new frame. No-op when nothing has been
  // written since construction or since the previous Flush().
  arrow::Status Flush() override {
    ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));

    if (freshCompressor_) {
      // No data written, no need to flush
      return arrow::Status::OK();
    }

    RETURN_NOT_OK(FinalizeCompression());
    ARROW_ASSIGN_OR_RAISE(compressor_, codec_->MakeCompressor());
    freshCompressor_ = true;
    return arrow::Status::OK();
  }
+
+  arrow::Status Close() override {
+    if (isOpen_) {
+      isOpen_ = false;
+      if (!freshCompressor_) {
+        RETURN_NOT_OK(FinalizeCompression());
+      }
+      // Do not close the underlying stream, it is the caller's responsibility.
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Abort() override {
+    if (isOpen_) {
+      isOpen_ = false;
+      return raw_->Abort();
+    }
+    return arrow::Status::OK();
+  }
+
  // True once Close() or Abort() has been called.
  bool closed() const override {
    return !isOpen_;
  }

  // Accumulated time spent inside the codec (raw-stream write time excluded).
  int64_t compressTime() const {
    return compressTime_;
  }

  // Accumulated time spent writing compressed bytes to the wrapped stream.
  int64_t flushTime() const {
    return flushTime_;
  }
+
 private:
  ARROW_DISALLOW_COPY_AND_ASSIGN(ShuffleCompressedOutputStream);

  // Private: construction goes through Make(). Stores the borrowed codec
  // pointer (it is dereferenced later in Flush(), so it must stay valid for
  // the stream's lifetime), the wrapped stream, and the allocation pool;
  // Init() performs the fallible second phase of construction.
  ShuffleCompressedOutputStream(
      arrow::util::Codec* codec,
      const std::shared_ptr<OutputStream>& raw,
      arrow::MemoryPool* pool)
      : codec_(codec), raw_(raw), pool_(pool) {}
+
  // Second-phase construction: creates the compressor and allocates the
  // kChunkSize-byte resizable staging buffer from pool_, then marks the
  // stream open.
  arrow::Status Init(arrow::util::Codec* codec) {
    ARROW_ASSIGN_OR_RAISE(compressor_, codec->MakeCompressor());
    ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize, pool_));
    compressedPos_ = 0;
    isOpen_ = true;
    return arrow::Status::OK();
  }
+
+  arrow::Status FlushCompressed(int64_t& flushTime) {
+    if (compressedPos_ > 0) {
+      ScopedTimer timer(&flushTime);
+      RETURN_NOT_OK(raw_->Write(compressed_->data(), compressedPos_));
+      compressedPos_ = 0;
+    }
+    return arrow::Status::OK();
+  }
+
  // Ends the current compression frame: repeatedly asks the compressor to
  // emit its trailing bytes, flushing the staging buffer to the raw stream
  // each round and doubling the buffer whenever the compressor signals it
  // needs more room. As in Write(), the flush portion of the elapsed time is
  // moved from compressTime_ into flushTime_ afterwards.
  arrow::Status FinalizeCompression() {
    int64_t flushTime = 0;
    {
      ScopedTimer timer(&compressTime_);
      while (true) {
        // Try to end compressor
        int64_t output_len = compressed_->size() - compressedPos_;
        uint8_t* output = compressed_->mutable_data() + compressedPos_;
        ARROW_ASSIGN_OR_RAISE(auto result, compressor_->End(output_len, output));
        compressedPos_ += result.bytes_written;

        // Flush compressed output
        RETURN_NOT_OK(FlushCompressed(flushTime));

        if (result.should_retry) {
          // Need to enlarge output buffer
          RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
        } else {
          // Done
          break;
        }
      }
    }
    compressTime_ -= flushTime;
    flushTime_ += flushTime;
    return arrow::Status::OK();
  }
+
  // TODO: Support setting chunk size
  // Initial size of the compressed staging buffer allocated in Init(); the
  // buffer is doubled on demand when the compressor cannot make progress.
  // Write 64 KB compressed data at a time.
  static const int64_t kChunkSize = 64 * 1024;

Review Comment:
   We should respect `spark.io.compression.lz4.blockSize` and 
`spark.io.compression.zstd.bufferSize` when creating this buffer. Both have a 
default value of 32k. I will create a followup pr to do some refactoring and 
make this configurable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to