This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new aae2557e30 GH-39377: [C++] IO: Reuse same buffer in 
CompressedInputStream (#39807)
aae2557e30 is described below

commit aae2557e303601f89c4bb94ee669d9f2fb83b528
Author: mwish <[email protected]>
AuthorDate: Wed Mar 27 19:42:56 2024 +0800

    GH-39377: [C++] IO: Reuse same buffer in CompressedInputStream (#39807)
    
    
    
    ### Rationale for this change
    
    This patch reuses the same buffer in `CompressedInputStream`. It covers
    both the `decompressed_` and `compressed_` buffers.
    
    ### What changes are included in this PR?
    
    1. For `compressed_`, allocate a single buffer of `kChunkSize` (64KB)
    once and reuse it across reads
    2. For `decompressed_`, reuse the same buffer (mostly 1MB) instead of
    repeatedly calling `Reallocate`
    
    In the worst case, `decompress_` might hold a large buffer.
    
    ### Are these changes tested?
    
    Already
    
    ### Are there any user-facing changes?
    
    `CompressedInputStream` might hold a larger buffer
    
    * Closes: #39377
    
    Lead-authored-by: mwish <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/io/CMakeLists.txt          |   2 +
 cpp/src/arrow/io/compressed.cc           |  63 +++++++---
 cpp/src/arrow/io/compressed.h            |   6 +
 cpp/src/arrow/io/compressed_benchmark.cc | 200 +++++++++++++++++++++++++++++++
 4 files changed, 253 insertions(+), 18 deletions(-)

diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index 041d511083..f7afbca558 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -43,5 +43,7 @@ if(NOT (${ARROW_SIMD_LEVEL} STREQUAL "NONE") AND NOT 
(${ARROW_SIMD_LEVEL} STREQU
   add_arrow_benchmark(memory_benchmark PREFIX "arrow-io")
 endif()
 
+add_arrow_benchmark(compressed_benchmark PREFIX "arrow-io")
+
 # Headers: top level
 arrow_install_all_headers("arrow/io")
diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc
index 6c484242a4..d06101748d 100644
--- a/cpp/src/arrow/io/compressed.cc
+++ b/cpp/src/arrow/io/compressed.cc
@@ -201,7 +201,7 @@ Result<std::shared_ptr<CompressedOutputStream>> 
CompressedOutputStream::Make(
     util::Codec* codec, const std::shared_ptr<OutputStream>& raw, MemoryPool* 
pool) {
   // CAUTION: codec is not owned
   std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
-  res->impl_.reset(new Impl(pool, std::move(raw)));
+  res->impl_.reset(new Impl(pool, raw));
   RETURN_NOT_OK(res->impl_->Init(codec));
   return res;
 }
@@ -233,8 +233,10 @@ class CompressedInputStream::Impl {
       : pool_(pool),
         raw_(raw),
         is_open_(true),
+        supports_zero_copy_from_raw_(raw_->supports_zero_copy()),
         compressed_pos_(0),
         decompressed_pos_(0),
+        fresh_decompressor_(false),
         total_pos_(0) {}
 
   Status Init(Codec* codec) {
@@ -261,7 +263,7 @@ class CompressedInputStream::Impl {
     }
   }
 
-  bool closed() { return !is_open_; }
+  bool closed() const { return !is_open_; }
 
   Result<int64_t> Tell() const { return total_pos_; }
 
@@ -269,8 +271,27 @@ class CompressedInputStream::Impl {
   Status EnsureCompressedData() {
     int64_t compressed_avail = compressed_ ? compressed_->size() - 
compressed_pos_ : 0;
     if (compressed_avail == 0) {
-      // No compressed data available, read a full chunk
-      ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+      // Ensure compressed_ buffer is allocated with kChunkSize.
+      if (!supports_zero_copy_from_raw_) {
+        if (compressed_for_non_zero_copy_ == nullptr) {
+          ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_,
+                                AllocateResizableBuffer(kChunkSize, pool_));
+        } else if (compressed_for_non_zero_copy_->size() != kChunkSize) {
+          RETURN_NOT_OK(
+              compressed_for_non_zero_copy_->Resize(kChunkSize, 
/*shrink_to_fit=*/false));
+        }
+        ARROW_ASSIGN_OR_RAISE(
+            int64_t read_size,
+            raw_->Read(kChunkSize,
+                       
compressed_for_non_zero_copy_->mutable_data_as<void>()));
+        if (read_size != compressed_for_non_zero_copy_->size()) {
+          RETURN_NOT_OK(
+              compressed_for_non_zero_copy_->Resize(read_size, 
/*shrink_to_fit=*/false));
+        }
+        compressed_ = compressed_for_non_zero_copy_;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+      }
       compressed_pos_ = 0;
     }
     return Status::OK();
@@ -284,8 +305,13 @@ class CompressedInputStream::Impl {
     int64_t decompress_size = kDecompressSize;
 
     while (true) {
-      ARROW_ASSIGN_OR_RAISE(decompressed_,
-                            AllocateResizableBuffer(decompress_size, pool_));
+      if (decompressed_ == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(decompressed_,
+                              AllocateResizableBuffer(decompress_size, pool_));
+      } else {
+        // Shrinking the buffer if it's already large enough
+        RETURN_NOT_OK(decompressed_->Resize(decompress_size, 
/*shrink_to_fit=*/true));
+      }
       decompressed_pos_ = 0;
 
       int64_t input_len = compressed_->size() - compressed_pos_;
@@ -300,7 +326,9 @@ class CompressedInputStream::Impl {
         fresh_decompressor_ = false;
       }
       if (result.bytes_written > 0 || !result.need_more_output || input_len == 
0) {
-        RETURN_NOT_OK(decompressed_->Resize(result.bytes_written));
+        // Not calling shrink_to_fit here because we're likely to reusing the 
buffer.
+        RETURN_NOT_OK(
+            decompressed_->Resize(result.bytes_written, 
/*shrink_to_fit=*/false));
         break;
       }
       DCHECK_EQ(result.bytes_written, 0);
@@ -310,7 +338,7 @@ class CompressedInputStream::Impl {
     return Status::OK();
   }
 
-  // Read a given number of bytes from the decompressed_ buffer.
+  // Copying a given number of bytes from the decompressed_ buffer.
   int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) {
     int64_t readable = decompressed_ ? (decompressed_->size() - 
decompressed_pos_) : 0;
     int64_t read_bytes = std::min(readable, nbytes);
@@ -318,11 +346,6 @@ class CompressedInputStream::Impl {
     if (read_bytes > 0) {
       memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes);
       decompressed_pos_ += read_bytes;
-
-      if (decompressed_pos_ == decompressed_->size()) {
-        // Decompressed data is exhausted, release buffer
-        decompressed_.reset();
-      }
     }
 
     return read_bytes;
@@ -357,7 +380,7 @@ class CompressedInputStream::Impl {
   }
 
   Result<int64_t> Read(int64_t nbytes, void* out) {
-    auto out_data = reinterpret_cast<uint8_t*>(out);
+    auto* out_data = reinterpret_cast<uint8_t*>(out);
 
     int64_t total_read = 0;
     bool decompressor_has_data = true;
@@ -382,10 +405,10 @@ class CompressedInputStream::Impl {
     ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_));
     ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, 
buf->mutable_data()));
     RETURN_NOT_OK(buf->Resize(bytes_read));
-    return std::move(buf);
+    return buf;
   }
 
-  std::shared_ptr<InputStream> raw() const { return raw_; }
+  const std::shared_ptr<InputStream>& raw() const { return raw_; }
 
  private:
   // Read 64 KB compressed data at a time
@@ -396,7 +419,12 @@ class CompressedInputStream::Impl {
   MemoryPool* pool_;
   std::shared_ptr<InputStream> raw_;
   bool is_open_;
+  const bool supports_zero_copy_from_raw_;
   std::shared_ptr<Decompressor> decompressor_;
+  // If `raw_->supports_zero_copy()`, this buffer would not allocate memory.
+  // Otherwise, this buffer would allocate `kChunkSize` memory and read data 
from
+  // `raw_`.
+  std::shared_ptr<ResizableBuffer> compressed_for_non_zero_copy_;
   std::shared_ptr<Buffer> compressed_;
   // Position in compressed buffer
   int64_t compressed_pos_;
@@ -413,10 +441,9 @@ Result<std::shared_ptr<CompressedInputStream>> 
CompressedInputStream::Make(
     Codec* codec, const std::shared_ptr<InputStream>& raw, MemoryPool* pool) {
   // CAUTION: codec is not owned
   std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
-  res->impl_.reset(new Impl(pool, std::move(raw)));
+  res->impl_.reset(new Impl(pool, raw));
   RETURN_NOT_OK(res->impl_->Init(codec));
   return res;
-  return Status::OK();
 }
 
 CompressedInputStream::~CompressedInputStream() { 
internal::CloseFromDestructor(this); }
diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h
index cd1a7f673c..6b4e7ab4d7 100644
--- a/cpp/src/arrow/io/compressed.h
+++ b/cpp/src/arrow/io/compressed.h
@@ -44,6 +44,9 @@ class ARROW_EXPORT CompressedOutputStream : public 
OutputStream {
   ~CompressedOutputStream() override;
 
   /// \brief Create a compressed output stream wrapping the given output 
stream.
+  ///
+  /// The codec must be capable of streaming compression. Some codecs,
+  /// like Snappy, are not able to do so.
   static Result<std::shared_ptr<CompressedOutputStream>> Make(
       util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
       MemoryPool* pool = default_memory_pool());
@@ -82,6 +85,9 @@ class ARROW_EXPORT CompressedInputStream
   ~CompressedInputStream() override;
 
   /// \brief Create a compressed input stream wrapping the given input stream.
+  ///
+  /// The codec must be capable of streaming decompression. Some codecs,
+  /// like Snappy, are not able to do so.
   static Result<std::shared_ptr<CompressedInputStream>> Make(
       util::Codec* codec, const std::shared_ptr<InputStream>& raw,
       MemoryPool* pool = default_memory_pool());
diff --git a/cpp/src/arrow/io/compressed_benchmark.cc 
b/cpp/src/arrow/io/compressed_benchmark.cc
new file mode 100644
index 0000000000..52a30d8cb0
--- /dev/null
+++ b/cpp/src/arrow/io/compressed_benchmark.cc
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/compressed.h"
+#include "arrow/io/memory.h"
+#include "arrow/result.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/compression.h"
+#include "arrow/util/config.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+
+namespace arrow::io {
+
+using ::arrow::Compression;
+
+std::vector<uint8_t> MakeCompressibleData(int data_size) {
+  // XXX This isn't a real-world corpus so doesn't really represent the
+  // comparative qualities of the algorithms
+
+  // First make highly compressible data
+  std::string base_data =
+      "Apache Arrow is a cross-language development platform for in-memory 
data";
+  int nrepeats = static_cast<int>(1 + data_size / base_data.size());
+
+  std::vector<uint8_t> data(base_data.size() * nrepeats);
+  for (int i = 0; i < nrepeats; ++i) {
+    std::memcpy(data.data() + i * base_data.size(), base_data.data(), 
base_data.size());
+  }
+  data.resize(data_size);
+
+  // Then randomly mutate some bytes so as to make things harder
+  std::mt19937 engine(42);
+  std::exponential_distribution<> offsets(0.05);
+  std::uniform_int_distribution<> values(0, 255);
+
+  int64_t pos = 0;
+  while (pos < data_size) {
+    data[pos] = static_cast<uint8_t>(values(engine));
+    pos += static_cast<int64_t>(offsets(engine));
+  }
+
+  return data;
+}
+
+// Using a non-zero copy buffer reader to benchmark the non-zero copy path.
+class NonZeroCopyBufferReader final : public InputStream {
+ public:
+  NonZeroCopyBufferReader(std::shared_ptr<Buffer> buffer) : 
reader_(std::move(buffer)) {}
+
+  bool supports_zero_copy() const override { return false; }
+
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    return reader_.Read(nbytes, out);
+  }
+
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    // Testing the non-zero copy path like reading from local file or Object 
store,
+    // so we need to allocate a buffer and copy the data.
+    ARROW_ASSIGN_OR_RAISE(auto buf, ::arrow::AllocateResizableBuffer(nbytes));
+    ARROW_ASSIGN_OR_RAISE(int64_t size, Read(nbytes, buf->mutable_data()));
+    ARROW_RETURN_NOT_OK(buf->Resize(size));
+    return buf;
+  }
+  Status Close() override { return reader_.Close(); }
+  Result<int64_t> Tell() const override { return reader_.Tell(); }
+  bool closed() const override { return reader_.closed(); }
+
+ private:
+  ::arrow::io::BufferReader reader_;
+};
+
+enum class BufferReadMode { ProvidedByCaller, ReturnedByCallee };
+
+template <typename BufReader, BufferReadMode Mode>
+static void CompressedInputStreamBenchmark(::benchmark::State& state,
+                                           Compression::type compression) {
+  const int64_t input_size = state.range(0);
+  const int64_t batch_size = state.range(1);
+
+  const std::vector<uint8_t> data = 
MakeCompressibleData(static_cast<int>(input_size));
+  auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie();
+  int64_t max_compress_len =
+      codec->MaxCompressedLen(static_cast<int64_t>(data.size()), data.data());
+  std::shared_ptr<::arrow::ResizableBuffer> buf =
+      ::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie();
+  const int64_t compressed_length =
+      codec
+          ->Compress(static_cast<int64_t>(data.size()), data.data(), 
max_compress_len,
+                     buf->mutable_data())
+          .ValueOrDie();
+  ABORT_NOT_OK(buf->Resize(compressed_length));
+  for (auto _ : state) {
+    state.PauseTiming();
+    auto reader = std::make_shared<BufReader>(buf);
+    [[maybe_unused]] std::unique_ptr<Buffer> read_buffer;
+    if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
+      read_buffer = ::arrow::AllocateBuffer(batch_size).ValueOrDie();
+    }
+    state.ResumeTiming();
+    // Put `CompressedInputStream::Make` in timing.
+    auto input_stream =
+        ::arrow::io::CompressedInputStream::Make(codec.get(), 
reader).ValueOrDie();
+    auto remaining_size = input_size;
+    while (remaining_size > 0) {
+      if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
+        auto value = input_stream->Read(batch_size, 
read_buffer->mutable_data());
+        ABORT_NOT_OK(value);
+        remaining_size -= value.ValueOrDie();
+      } else {
+        auto value = input_stream->Read(batch_size);
+        ABORT_NOT_OK(value);
+        remaining_size -= value.ValueOrDie()->size();
+      }
+    }
+  }
+  state.SetBytesProcessed(input_size * state.iterations());
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamZeroCopyBufferProvidedByCaller(
+    ::benchmark::State& state) {
+  CompressedInputStreamBenchmark<::arrow::io::BufferReader,
+                                 BufferReadMode::ProvidedByCaller>(state, 
kCompression);
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamNonZeroCopyBufferProvidedByCaller(
+    ::benchmark::State& state) {
+  CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
+                                 BufferReadMode::ProvidedByCaller>(state, 
kCompression);
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamZeroCopyBufferReturnedByCallee(
+    ::benchmark::State& state) {
+  CompressedInputStreamBenchmark<::arrow::io::BufferReader,
+                                 BufferReadMode::ReturnedByCallee>(state, 
kCompression);
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamNonZeroCopyBufferReturnedByCallee(
+    ::benchmark::State& state) {
+  CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
+                                 BufferReadMode::ReturnedByCallee>(state, 
kCompression);
+}
+
+static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
+  b->ArgNames({"num_bytes", "batch_size"})
+      ->Args({8 * 1024, 8 * 1024})
+      ->Args({64 * 1024, 8 * 1024})
+      ->Args({64 * 1024, 64 * 1024})
+      ->Args({1024 * 1024, 8 * 1024})
+      ->Args({1024 * 1024, 64 * 1024})
+      ->Args({1024 * 1024, 1024 * 1024});
+}
+
+#ifdef ARROW_WITH_LZ4
+// Benchmark LZ4 because it's lightweight, which makes benchmarking focused on 
the
+// overhead of the compression input stream.
+BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferProvidedByCaller,
+                   Compression::LZ4_FRAME)
+    ->Apply(CompressedInputArguments);
+BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferProvidedByCaller,
+                   Compression::LZ4_FRAME)
+    ->Apply(CompressedInputArguments);
+BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferReturnedByCallee,
+                   Compression::LZ4_FRAME)
+    ->Apply(CompressedInputArguments);
+BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferReturnedByCallee,
+                   Compression::LZ4_FRAME)
+    ->Apply(CompressedInputArguments);
+#endif
+
+}  // namespace arrow::io

Reply via email to