marin-ma commented on code in PR #9278:
URL: https://github.com/apache/incubator-gluten/pull/9278#discussion_r2048983286


##########
cpp/core/shuffle/Utils.h:
##########
@@ -105,4 +108,396 @@ class MmapFileStream : public arrow::io::InputStream {
   int64_t posRetain_ = 0;
 };
 
+// Adopted from arrow::io::CompressedOutputStream. Rebuild compressor after 
each `Flush()`.
+class ShuffleCompressedOutputStream : public arrow::io::OutputStream {
+ public:
+  /// \brief Create a compressed output stream wrapping the given output 
stream.
+  static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>>
+  Make(arrow::util::Codec* codec, const std::shared_ptr<OutputStream>& raw, 
arrow::MemoryPool* pool) {
+    auto res = std::shared_ptr<ShuffleCompressedOutputStream>(new 
ShuffleCompressedOutputStream(codec, raw, pool));
+    RETURN_NOT_OK(res->Init(codec));
+    return res;
+  }
+
+  arrow::Result<int64_t> Tell() const override {
+    return totalPos_;
+  }
+
+  arrow::Status Write(const void* data, int64_t nbytes) override {
+    ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+    if (nbytes == 0) {
+      return arrow::Status::OK();
+    }
+
+    freshCompressor_ = false;
+
+    int64_t flushTime = 0;
+    {
+      ScopedTimer timer(&compressTime_);
+      auto input = static_cast<const uint8_t*>(data);
+      while (nbytes > 0) {
+        int64_t input_len = nbytes;
+        int64_t output_len = compressed_->size() - compressedPos_;
+        uint8_t* output = compressed_->mutable_data() + compressedPos_;
+        ARROW_ASSIGN_OR_RAISE(auto result, compressor_->Compress(input_len, 
input, output_len, output));
+        compressedPos_ += result.bytes_written;
+
+        if (result.bytes_read == 0) {
+          // Not enough output, try to flush it and retry
+          if (compressedPos_ > 0) {
+            RETURN_NOT_OK(FlushCompressed(flushTime));
+            output_len = compressed_->size() - compressedPos_;
+            output = compressed_->mutable_data() + compressedPos_;
+            ARROW_ASSIGN_OR_RAISE(result, compressor_->Compress(input_len, 
input, output_len, output));
+            compressedPos_ += result.bytes_written;
+          }
+        }
+        input += result.bytes_read;
+        nbytes -= result.bytes_read;
+        totalPos_ += result.bytes_read;
+        if (compressedPos_ == compressed_->size()) {
+          // Output buffer full, flush it
+          RETURN_NOT_OK(FlushCompressed(flushTime));
+        }
+        if (result.bytes_read == 0) {
+          // Need to enlarge output buffer
+          RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+        }
+      }
+    }
+    compressTime_ -= flushTime;
+    flushTime_ += flushTime;
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Flush() override {
+    ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+    if (freshCompressor_) {
+      // No data written, no need to flush
+      return arrow::Status::OK();
+    }
+
+    RETURN_NOT_OK(FinalizeCompression());
+    ARROW_ASSIGN_OR_RAISE(compressor_, codec_->MakeCompressor());

Review Comment:
   We cannot reuse the compressor outside of the stream. One output stream is 
held by one spiller, and the compressor is re-created each time the spiller 
receives a new partition to spill. Therefore, the compressor can be ended and 
then re-created multiple times while writing a single output stream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to