This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new aae2557e30 GH-39377: [C++] IO: Reuse same buffer in
CompressedInputStream (#39807)
aae2557e30 is described below
commit aae2557e303601f89c4bb94ee669d9f2fb83b528
Author: mwish <[email protected]>
AuthorDate: Wed Mar 27 19:42:56 2024 +0800
GH-39377: [C++] IO: Reuse same buffer in CompressedInputStream (#39807)
### Rationale for this change
This patch reuses the same buffer in `CompressedInputStream` rather than
allocating a new one on every read. It covers both the `decompress_` and `compress_` buffers.
### What changes are included in this PR?
1. For `compress_`, allocate a single buffer of `kChunkSize`
(64 KB) once and reuse it for subsequent reads
2. For `decompress_`, reuse the same buffer (mostly 1 MB) instead of repeated
`Reallocate` calls
In the worst case, `decompress_` might hold a large buffer.
### Are these changes tested?
Yes, covered by the existing tests.
### Are there any user-facing changes?
`CompressedInputStream` might hold a larger buffer than before.
* Closes: #39377
Lead-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: mwish <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/io/CMakeLists.txt | 2 +
cpp/src/arrow/io/compressed.cc | 63 +++++++---
cpp/src/arrow/io/compressed.h | 6 +
cpp/src/arrow/io/compressed_benchmark.cc | 200 +++++++++++++++++++++++++++++++
4 files changed, 253 insertions(+), 18 deletions(-)
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index 041d511083..f7afbca558 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -43,5 +43,7 @@ if(NOT (${ARROW_SIMD_LEVEL} STREQUAL "NONE") AND NOT
(${ARROW_SIMD_LEVEL} STREQU
add_arrow_benchmark(memory_benchmark PREFIX "arrow-io")
endif()
+add_arrow_benchmark(compressed_benchmark PREFIX "arrow-io")
+
# Headers: top level
arrow_install_all_headers("arrow/io")
diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc
index 6c484242a4..d06101748d 100644
--- a/cpp/src/arrow/io/compressed.cc
+++ b/cpp/src/arrow/io/compressed.cc
@@ -201,7 +201,7 @@ Result<std::shared_ptr<CompressedOutputStream>>
CompressedOutputStream::Make(
util::Codec* codec, const std::shared_ptr<OutputStream>& raw, MemoryPool*
pool) {
// CAUTION: codec is not owned
std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
- res->impl_.reset(new Impl(pool, std::move(raw)));
+ res->impl_.reset(new Impl(pool, raw));
RETURN_NOT_OK(res->impl_->Init(codec));
return res;
}
@@ -233,8 +233,10 @@ class CompressedInputStream::Impl {
: pool_(pool),
raw_(raw),
is_open_(true),
+ supports_zero_copy_from_raw_(raw_->supports_zero_copy()),
compressed_pos_(0),
decompressed_pos_(0),
+ fresh_decompressor_(false),
total_pos_(0) {}
Status Init(Codec* codec) {
@@ -261,7 +263,7 @@ class CompressedInputStream::Impl {
}
}
- bool closed() { return !is_open_; }
+ bool closed() const { return !is_open_; }
Result<int64_t> Tell() const { return total_pos_; }
@@ -269,8 +271,27 @@ class CompressedInputStream::Impl {
Status EnsureCompressedData() {
int64_t compressed_avail = compressed_ ? compressed_->size() -
compressed_pos_ : 0;
if (compressed_avail == 0) {
- // No compressed data available, read a full chunk
- ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+ // Ensure compressed_ buffer is allocated with kChunkSize.
+ if (!supports_zero_copy_from_raw_) {
+ if (compressed_for_non_zero_copy_ == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_,
+ AllocateResizableBuffer(kChunkSize, pool_));
+ } else if (compressed_for_non_zero_copy_->size() != kChunkSize) {
+ RETURN_NOT_OK(
+ compressed_for_non_zero_copy_->Resize(kChunkSize,
/*shrink_to_fit=*/false));
+ }
+ ARROW_ASSIGN_OR_RAISE(
+ int64_t read_size,
+ raw_->Read(kChunkSize,
+
compressed_for_non_zero_copy_->mutable_data_as<void>()));
+ if (read_size != compressed_for_non_zero_copy_->size()) {
+ RETURN_NOT_OK(
+ compressed_for_non_zero_copy_->Resize(read_size,
/*shrink_to_fit=*/false));
+ }
+ compressed_ = compressed_for_non_zero_copy_;
+ } else {
+ ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+ }
compressed_pos_ = 0;
}
return Status::OK();
@@ -284,8 +305,13 @@ class CompressedInputStream::Impl {
int64_t decompress_size = kDecompressSize;
while (true) {
- ARROW_ASSIGN_OR_RAISE(decompressed_,
- AllocateResizableBuffer(decompress_size, pool_));
+ if (decompressed_ == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(decompressed_,
+ AllocateResizableBuffer(decompress_size, pool_));
+ } else {
+ // Shrinking the buffer if it's already large enough
+ RETURN_NOT_OK(decompressed_->Resize(decompress_size,
/*shrink_to_fit=*/true));
+ }
decompressed_pos_ = 0;
int64_t input_len = compressed_->size() - compressed_pos_;
@@ -300,7 +326,9 @@ class CompressedInputStream::Impl {
fresh_decompressor_ = false;
}
if (result.bytes_written > 0 || !result.need_more_output || input_len ==
0) {
- RETURN_NOT_OK(decompressed_->Resize(result.bytes_written));
+ // Not calling shrink_to_fit here because we're likely to reusing the
buffer.
+ RETURN_NOT_OK(
+ decompressed_->Resize(result.bytes_written,
/*shrink_to_fit=*/false));
break;
}
DCHECK_EQ(result.bytes_written, 0);
@@ -310,7 +338,7 @@ class CompressedInputStream::Impl {
return Status::OK();
}
- // Read a given number of bytes from the decompressed_ buffer.
+ // Copying a given number of bytes from the decompressed_ buffer.
int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) {
int64_t readable = decompressed_ ? (decompressed_->size() -
decompressed_pos_) : 0;
int64_t read_bytes = std::min(readable, nbytes);
@@ -318,11 +346,6 @@ class CompressedInputStream::Impl {
if (read_bytes > 0) {
memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes);
decompressed_pos_ += read_bytes;
-
- if (decompressed_pos_ == decompressed_->size()) {
- // Decompressed data is exhausted, release buffer
- decompressed_.reset();
- }
}
return read_bytes;
@@ -357,7 +380,7 @@ class CompressedInputStream::Impl {
}
Result<int64_t> Read(int64_t nbytes, void* out) {
- auto out_data = reinterpret_cast<uint8_t*>(out);
+ auto* out_data = reinterpret_cast<uint8_t*>(out);
int64_t total_read = 0;
bool decompressor_has_data = true;
@@ -382,10 +405,10 @@ class CompressedInputStream::Impl {
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes,
buf->mutable_data()));
RETURN_NOT_OK(buf->Resize(bytes_read));
- return std::move(buf);
+ return buf;
}
- std::shared_ptr<InputStream> raw() const { return raw_; }
+ const std::shared_ptr<InputStream>& raw() const { return raw_; }
private:
// Read 64 KB compressed data at a time
@@ -396,7 +419,12 @@ class CompressedInputStream::Impl {
MemoryPool* pool_;
std::shared_ptr<InputStream> raw_;
bool is_open_;
+ const bool supports_zero_copy_from_raw_;
std::shared_ptr<Decompressor> decompressor_;
+ // If `raw_->supports_zero_copy()`, this buffer would not allocate memory.
+ // Otherwise, this buffer would allocate `kChunkSize` memory and read data
from
+ // `raw_`.
+ std::shared_ptr<ResizableBuffer> compressed_for_non_zero_copy_;
std::shared_ptr<Buffer> compressed_;
// Position in compressed buffer
int64_t compressed_pos_;
@@ -413,10 +441,9 @@ Result<std::shared_ptr<CompressedInputStream>>
CompressedInputStream::Make(
Codec* codec, const std::shared_ptr<InputStream>& raw, MemoryPool* pool) {
// CAUTION: codec is not owned
std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
- res->impl_.reset(new Impl(pool, std::move(raw)));
+ res->impl_.reset(new Impl(pool, raw));
RETURN_NOT_OK(res->impl_->Init(codec));
return res;
- return Status::OK();
}
CompressedInputStream::~CompressedInputStream() {
internal::CloseFromDestructor(this); }
diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h
index cd1a7f673c..6b4e7ab4d7 100644
--- a/cpp/src/arrow/io/compressed.h
+++ b/cpp/src/arrow/io/compressed.h
@@ -44,6 +44,9 @@ class ARROW_EXPORT CompressedOutputStream : public
OutputStream {
~CompressedOutputStream() override;
/// \brief Create a compressed output stream wrapping the given output
stream.
+ ///
+ /// The codec must be capable of streaming compression. Some codecs,
+ /// like Snappy, are not able to do so.
static Result<std::shared_ptr<CompressedOutputStream>> Make(
util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
MemoryPool* pool = default_memory_pool());
@@ -82,6 +85,9 @@ class ARROW_EXPORT CompressedInputStream
~CompressedInputStream() override;
/// \brief Create a compressed input stream wrapping the given input stream.
+ ///
+ /// The codec must be capable of streaming decompression. Some codecs,
+ /// like Snappy, are not able to do so.
static Result<std::shared_ptr<CompressedInputStream>> Make(
util::Codec* codec, const std::shared_ptr<InputStream>& raw,
MemoryPool* pool = default_memory_pool());
diff --git a/cpp/src/arrow/io/compressed_benchmark.cc
b/cpp/src/arrow/io/compressed_benchmark.cc
new file mode 100644
index 0000000000..52a30d8cb0
--- /dev/null
+++ b/cpp/src/arrow/io/compressed_benchmark.cc
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/compressed.h"
+#include "arrow/io/memory.h"
+#include "arrow/result.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/compression.h"
+#include "arrow/util/config.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+
+namespace arrow::io {
+
+using ::arrow::Compression;
+
+std::vector<uint8_t> MakeCompressibleData(int data_size) {
+ // XXX This isn't a real-world corpus so doesn't really represent the
+ // comparative qualities of the algorithms
+
+ // First make highly compressible data
+ std::string base_data =
+ "Apache Arrow is a cross-language development platform for in-memory
data";
+ int nrepeats = static_cast<int>(1 + data_size / base_data.size());
+
+ std::vector<uint8_t> data(base_data.size() * nrepeats);
+ for (int i = 0; i < nrepeats; ++i) {
+ std::memcpy(data.data() + i * base_data.size(), base_data.data(),
base_data.size());
+ }
+ data.resize(data_size);
+
+ // Then randomly mutate some bytes so as to make things harder
+ std::mt19937 engine(42);
+ std::exponential_distribution<> offsets(0.05);
+ std::uniform_int_distribution<> values(0, 255);
+
+ int64_t pos = 0;
+ while (pos < data_size) {
+ data[pos] = static_cast<uint8_t>(values(engine));
+ pos += static_cast<int64_t>(offsets(engine));
+ }
+
+ return data;
+}
+
+// Using a non-zero copy buffer reader to benchmark the non-zero copy path.
+class NonZeroCopyBufferReader final : public InputStream {
+ public:
+ NonZeroCopyBufferReader(std::shared_ptr<Buffer> buffer) :
reader_(std::move(buffer)) {}
+
+ bool supports_zero_copy() const override { return false; }
+
+ Result<int64_t> Read(int64_t nbytes, void* out) override {
+ return reader_.Read(nbytes, out);
+ }
+
+ Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+ // Testing the non-zero copy path like reading from local file or Object
store,
+ // so we need to allocate a buffer and copy the data.
+ ARROW_ASSIGN_OR_RAISE(auto buf, ::arrow::AllocateResizableBuffer(nbytes));
+ ARROW_ASSIGN_OR_RAISE(int64_t size, Read(nbytes, buf->mutable_data()));
+ ARROW_RETURN_NOT_OK(buf->Resize(size));
+ return buf;
+ }
+ Status Close() override { return reader_.Close(); }
+ Result<int64_t> Tell() const override { return reader_.Tell(); }
+ bool closed() const override { return reader_.closed(); }
+
+ private:
+ ::arrow::io::BufferReader reader_;
+};
+
+enum class BufferReadMode { ProvidedByCaller, ReturnedByCallee };
+
+template <typename BufReader, BufferReadMode Mode>
+static void CompressedInputStreamBenchmark(::benchmark::State& state,
+ Compression::type compression) {
+ const int64_t input_size = state.range(0);
+ const int64_t batch_size = state.range(1);
+
+ const std::vector<uint8_t> data =
MakeCompressibleData(static_cast<int>(input_size));
+ auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie();
+ int64_t max_compress_len =
+ codec->MaxCompressedLen(static_cast<int64_t>(data.size()), data.data());
+ std::shared_ptr<::arrow::ResizableBuffer> buf =
+ ::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie();
+ const int64_t compressed_length =
+ codec
+ ->Compress(static_cast<int64_t>(data.size()), data.data(),
max_compress_len,
+ buf->mutable_data())
+ .ValueOrDie();
+ ABORT_NOT_OK(buf->Resize(compressed_length));
+ for (auto _ : state) {
+ state.PauseTiming();
+ auto reader = std::make_shared<BufReader>(buf);
+ [[maybe_unused]] std::unique_ptr<Buffer> read_buffer;
+ if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
+ read_buffer = ::arrow::AllocateBuffer(batch_size).ValueOrDie();
+ }
+ state.ResumeTiming();
+ // Put `CompressedInputStream::Make` in timing.
+ auto input_stream =
+ ::arrow::io::CompressedInputStream::Make(codec.get(),
reader).ValueOrDie();
+ auto remaining_size = input_size;
+ while (remaining_size > 0) {
+ if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
+ auto value = input_stream->Read(batch_size,
read_buffer->mutable_data());
+ ABORT_NOT_OK(value);
+ remaining_size -= value.ValueOrDie();
+ } else {
+ auto value = input_stream->Read(batch_size);
+ ABORT_NOT_OK(value);
+ remaining_size -= value.ValueOrDie()->size();
+ }
+ }
+ }
+ state.SetBytesProcessed(input_size * state.iterations());
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamZeroCopyBufferProvidedByCaller(
+ ::benchmark::State& state) {
+ CompressedInputStreamBenchmark<::arrow::io::BufferReader,
+ BufferReadMode::ProvidedByCaller>(state,
kCompression);
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamNonZeroCopyBufferProvidedByCaller(
+ ::benchmark::State& state) {
+ CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
+ BufferReadMode::ProvidedByCaller>(state,
kCompression);
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamZeroCopyBufferReturnedByCallee(
+ ::benchmark::State& state) {
+ CompressedInputStreamBenchmark<::arrow::io::BufferReader,
+ BufferReadMode::ReturnedByCallee>(state,
kCompression);
+}
+
+template <Compression::type kCompression>
+static void CompressedInputStreamNonZeroCopyBufferReturnedByCallee(
+ ::benchmark::State& state) {
+ CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
+ BufferReadMode::ReturnedByCallee>(state,
kCompression);
+}
+
+static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
+ b->ArgNames({"num_bytes", "batch_size"})
+ ->Args({8 * 1024, 8 * 1024})
+ ->Args({64 * 1024, 8 * 1024})
+ ->Args({64 * 1024, 64 * 1024})
+ ->Args({1024 * 1024, 8 * 1024})
+ ->Args({1024 * 1024, 64 * 1024})
+ ->Args({1024 * 1024, 1024 * 1024});
+}
+
+#ifdef ARROW_WITH_LZ4
+// Benchmark LZ4 because it's lightweight, which makes benchmarking focused on
the
+// overhead of the compression input stream.
+BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferProvidedByCaller,
+ Compression::LZ4_FRAME)
+ ->Apply(CompressedInputArguments);
+BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferProvidedByCaller,
+ Compression::LZ4_FRAME)
+ ->Apply(CompressedInputArguments);
+BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferReturnedByCallee,
+ Compression::LZ4_FRAME)
+ ->Apply(CompressedInputArguments);
+BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferReturnedByCallee,
+ Compression::LZ4_FRAME)
+ ->Apply(CompressedInputArguments);
+#endif
+
+} // namespace arrow::io