This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 84583d6530 GH-37434: [C++] IO: Refactor BufferedInputStream::Read for small input (#37460)
84583d6530 is described below
commit 84583d65306c01897fae01ba587885f140054b0c
Author: mwish <[email protected]>
AuthorDate: Fri Sep 1 02:44:12 2023 +0800
GH-37434: [C++] IO: Refactor BufferedInputStream::Read for small input (#37460)
### Rationale for this change
Suppose the buffer size is 100k and each read requests 3k bytes. The 34th
read then covers the range (99k, 102k].
Previously, `Read` would copy the buffered range (99k, 100k] and then issue a
raw IO for only (100k, 102k], rather than refilling the buffer with (100k, 200k].
### What changes are included in this PR?
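Refactor `BufferedInputStream::Read` to optimize small reads. Already-buffered
bytes are consumed first. If the remaining request is smaller than the buffer
size, the buffer is refilled and the request is served from it, instead of
issuing a short raw read.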
### Are these changes tested?
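Yes. Existing tests cover `Read`, and new tests in `buffered_test.cc` check the
reads issued to the underlying raw stream.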
### Are there any user-facing changes?
Users may observe a different IO pattern on the underlying stream; depending on
the workload, this can be an improvement or a regression.
* Closes: #37434
Lead-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/io/buffered.cc | 88 +++++++-----
cpp/src/arrow/io/buffered_test.cc | 275 +++++++++++++++++++++++++++++++++++-
cpp/src/arrow/testing/gtest_util.cc | 2 +-
cpp/src/arrow/testing/gtest_util.h | 2 +-
4 files changed, 321 insertions(+), 46 deletions(-)
diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index e0e37c5802..21cce478d3 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -342,23 +342,28 @@ class BufferedInputStream::Impl : public BufferedBase {
buffer_pos_ = bytes_buffered_ = 0;
}
- Status BufferIfNeeded() {
- if (bytes_buffered_ == 0) {
- // Fill buffer
- if (!buffer_) {
- RETURN_NOT_OK(ResetBuffer());
- }
+ Status DoBuffer() {
+ // Fill buffer
+ if (!buffer_) {
+ RETURN_NOT_OK(ResetBuffer());
+ }
- int64_t bytes_to_buffer = buffer_size_;
- if (raw_read_bound_ >= 0) {
- bytes_to_buffer = std::min(buffer_size_, raw_read_bound_ - raw_read_total_);
- }
- ARROW_ASSIGN_OR_RAISE(bytes_buffered_, raw_->Read(bytes_to_buffer, buffer_data_));
- buffer_pos_ = 0;
- raw_read_total_ += bytes_buffered_;
+ int64_t bytes_to_buffer = buffer_size_;
+ if (raw_read_bound_ >= 0) {
+ bytes_to_buffer = std::min(buffer_size_, raw_read_bound_ - raw_read_total_);
+ }
+ ARROW_ASSIGN_OR_RAISE(bytes_buffered_, raw_->Read(bytes_to_buffer, buffer_data_));
+ buffer_pos_ = 0;
+ raw_read_total_ += bytes_buffered_;
- // Do not make assumptions about the raw stream position
- raw_pos_ = -1;
+ // Do not make assumptions about the raw stream position
+ raw_pos_ = -1;
+ return Status::OK();
+ }
+
+ Status BufferIfNeeded() {
+ if (bytes_buffered_ == 0) {
+ return DoBuffer();
}
return Status::OK();
}
@@ -373,33 +378,38 @@ class BufferedInputStream::Impl : public BufferedBase {
return Status::Invalid("Bytes to read must be positive. Received:", nbytes);
}
- if (nbytes < buffer_size_) {
- // Pre-buffer for small reads
- RETURN_NOT_OK(BufferIfNeeded());
+ // 1. First consume pre-buffered data.
+ int64_t pre_buffer_copy_bytes = std::min(nbytes, bytes_buffered_);
+ if (pre_buffer_copy_bytes > 0) {
+ memcpy(out, buffer_data_ + buffer_pos_, pre_buffer_copy_bytes);
+ ConsumeBuffer(pre_buffer_copy_bytes);
}
-
- if (nbytes > bytes_buffered_) {
- // Copy buffered bytes into out, then read rest
- memcpy(out, buffer_data_ + buffer_pos_, bytes_buffered_);
-
- int64_t bytes_to_read = nbytes - bytes_buffered_;
- if (raw_read_bound_ >= 0) {
- bytes_to_read = std::min(bytes_to_read, raw_read_bound_ - raw_read_total_);
- }
- ARROW_ASSIGN_OR_RAISE(
- int64_t bytes_read,
- raw_->Read(bytes_to_read, reinterpret_cast<uint8_t*>(out) + bytes_buffered_));
+ int64_t remaining_bytes = nbytes - pre_buffer_copy_bytes;
+ if (raw_read_bound_ >= 0) {
+ remaining_bytes = std::min(remaining_bytes, raw_read_bound_ - raw_read_total_);
+ }
+ if (remaining_bytes == 0) {
+ return pre_buffer_copy_bytes;
+ }
+ DCHECK_EQ(0, bytes_buffered_);
+
+ // 2. Read from storage.
+ if (remaining_bytes >= buffer_size_) {
+ // 2.1. If read is larger than buffer size, read directly from storage.
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+ raw_->Read(remaining_bytes, reinterpret_cast<uint8_t*>(out) +
+ pre_buffer_copy_bytes));
raw_read_total_ += bytes_read;
-
- // Do not make assumptions about the raw stream position
- raw_pos_ = -1;
- bytes_read += bytes_buffered_;
RewindBuffer();
- return bytes_read;
+ return pre_buffer_copy_bytes + bytes_read;
} else {
- memcpy(out, buffer_data_ + buffer_pos_, nbytes);
- ConsumeBuffer(nbytes);
- return nbytes;
+ // 2.2. If read is smaller than buffer size, fill buffer and copy from buffer.
+ RETURN_NOT_OK(DoBuffer());
+ int64_t bytes_copy_after_buffer = std::min(bytes_buffered_, remaining_bytes);
+ memcpy(reinterpret_cast<uint8_t*>(out) + pre_buffer_copy_bytes,
+ buffer_data_ + buffer_pos_, bytes_copy_after_buffer);
+ ConsumeBuffer(bytes_copy_after_buffer);
+ return pre_buffer_copy_bytes + bytes_copy_after_buffer;
}
}
@@ -432,7 +442,7 @@ class BufferedInputStream::Impl : public BufferedBase {
BufferedInputStream::BufferedInputStream(std::shared_ptr<InputStream> raw,
MemoryPool* pool,
int64_t raw_total_bytes_bound) {
- impl_.reset(new Impl(std::move(raw), pool, raw_total_bytes_bound));
+ impl_ = std::make_unique<Impl>(std::move(raw), pool, raw_total_bytes_bound);
}
BufferedInputStream::~BufferedInputStream() { internal::CloseFromDestructor(this); }
diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc
index 520eaaa935..82feeea051 100644
--- a/cpp/src/arrow/io/buffered_test.cc
+++ b/cpp/src/arrow/io/buffered_test.cc
@@ -46,8 +46,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
-namespace arrow {
-namespace io {
+namespace arrow::io {
using ::arrow::internal::TemporaryDir;
@@ -321,7 +320,7 @@ TEST_F(TestBufferedOutputStream, TruncatesFile) {
// ----------------------------------------------------------------------
// BufferedInputStream tests
-const char kExample1[] = "informaticacrobaticsimmolation";
+const std::string_view kExample1 = "informaticacrobaticsimmolation";
class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
public:
@@ -672,5 +671,271 @@ TEST_F(TestBufferedInputStreamBound, BufferExactlyExhausted) {
}
}
-} // namespace io
-} // namespace arrow
+// These tests exercise the buffering algorithm by checking the reads issued
+// to the underlying raw stream.
+class TestBufferedInputStreamChunk : public TestBufferedInputStream {
+ public:
+ void SetUp() { TestBufferedInputStream::SetUp(); }
+
+ void TearDown() {
+ buffered_ = nullptr;
+ tracked_ = nullptr;
+ raw_ = nullptr;
+ }
+
+ void MakeExample(int64_t buffer_size,
+ std::optional<int64_t> read_bound = std::nullopt) {
+ test_data_ = kExample1;
+ MemoryPool* pool = default_memory_pool();
+
+ ASSERT_OK_AND_ASSIGN(auto file_out, FileOutputStream::Open(path_));
+ ASSERT_OK(file_out->Write(test_data_));
+ ASSERT_OK(file_out->Close());
+
+ ASSERT_OK_AND_ASSIGN(auto file_in, ReadableFile::Open(path_));
+ raw_ = file_in;
+ tracked_ = TrackedRandomAccessFile::Make(dynamic_cast<RandomAccessFile*>(raw_.get()));
+ ASSERT_OK_AND_ASSIGN(buffered_,
+ BufferedInputStream::Create(buffer_size, pool, tracked_,
+ read_bound.value_or(-1)));
+ }
+
+ protected:
+ std::shared_ptr<TrackedRandomAccessFile> tracked_;
+};
+
+TEST_F(TestBufferedInputStreamChunk, NoRead) {
+ const int64_t kBufferSize = 5;
+ MakeExample(kBufferSize);
+
+ EXPECT_TRUE(tracked_->get_read_ranges().empty());
+}
+
+TEST_F(TestBufferedInputStreamChunk, LargeRead) {
+ const int64_t kBufferSize = 5;
+ MakeExample(kBufferSize);
+
+ // Read bytes greater than buffer_size would not buffer.
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(6));
+ AssertBufferEqual(*buf, kExample1.substr(0, 6));
+
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ std::vector<ReadRange> read_ranges = {ReadRange{0, 6}};
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, SmallReadThenLargeRead) {
+ const int64_t kBufferSize = 5;
+ MakeExample(kBufferSize);
+
+ // Small read would trigger buffer the whole chunk
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+ AssertBufferEqual(*buf, kExample1.substr(0, 1));
+ EXPECT_EQ(4, buffered_->bytes_buffered());
+ std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // Large read with pre-buffered will copy the
+ // pre-buffered data first, then read the remaining without filling
+ // the buffer.
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(20));
+ AssertBufferEqual(*buf, kExample1.substr(1, 20));
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ read_ranges.push_back(ReadRange{5, 16});
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // Small read again
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(2));
+ AssertBufferEqual(*buf, kExample1.substr(21, 2));
+ EXPECT_EQ(kBufferSize - 2, buffered_->bytes_buffered());
+ read_ranges.push_back(ReadRange{21, 5});
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferWholeChunk) {
+ const int64_t kBufferSize = 5;
+ MakeExample(kBufferSize);
+
+ // Small read
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+ AssertBufferEqual(*buf, kExample1.substr(0, 1));
+ EXPECT_EQ(kBufferSize - 1, buffered_->bytes_buffered());
+ std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // Whole read size is larger than buffer size, but raw read size would be
+ // smaller than buffer size => buffer is filled anew
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(6));
+ AssertBufferEqual(*buf, kExample1.substr(1, 6));
+ EXPECT_EQ(kBufferSize * 2 - 1 - 6, buffered_->bytes_buffered());
+ read_ranges.push_back(ReadRange{5, 5});
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferLargerThanFileSize) {
+ const int64_t kFileSize = static_cast<int64_t>(kExample1.size());
+ const int64_t kBufferSize = kFileSize + 10;
+ MakeExample(kBufferSize);
+
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+ AssertBufferEqual(*buf, kExample1.substr(0, 1));
+ EXPECT_EQ(kFileSize - 1, buffered_->bytes_buffered());
+ std::vector<ReadRange> read_ranges = {ReadRange{0, kBufferSize}};
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(kFileSize - 3));
+ AssertBufferEqual(*buf, kExample1.substr(1, kFileSize - 3));
+ EXPECT_EQ(2, buffered_->bytes_buffered());
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // Short read up to EOF
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(20));
+ AssertBufferEqual(*buf, kExample1.substr(kFileSize - 2, 2));
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ read_ranges.push_back(ReadRange{kFileSize, kBufferSize});
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // EOF
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(20));
+ AssertBufferEqual(*buf, "");
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ // (BufferedInputStream still tries to fetch more bytes)
+ read_ranges.push_back(ReadRange{kFileSize, kBufferSize});
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferLargerThanReadBound) {
+ const int64_t kBufferSize = 6;
+ const int64_t kReadBound = 5;
+ MakeExample(kBufferSize, kReadBound);
+
+ // Small read
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+ AssertBufferEqual(*buf, kExample1.substr(0, 1));
+ EXPECT_EQ(kReadBound - 1, buffered_->bytes_buffered());
+ std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // Longer read, but truncated due to the read bound
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(10));
+ AssertBufferEqual(*buf, kExample1.substr(1, 4));
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ // Due to the read bound, no more row reads were issued
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // EOF
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(10));
+ AssertBufferEqual(*buf, "");
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferSmallerThanReadBound) {
+ const int64_t kBufferSize = 3;
+ const int64_t kReadBound = 5;
+ MakeExample(kBufferSize, kReadBound);
+
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(6));
+ AssertBufferEqual(*buf, kExample1.substr(0, 5));
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+ // EOF
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(10));
+ AssertBufferEqual(*buf, "");
+ EXPECT_EQ(0, buffered_->bytes_buffered());
+ EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+class TestBufferedInputStreamRandom : public ::testing::Test {
+ public:
+ void MakeBuffered(int64_t data_size, int64_t buffer_size,
+ std::optional<int64_t> read_bound = std::nullopt) {
+ buffer_size_ = buffer_size;
+ data_.clear();
+ data_.reserve(data_size);
+ while (data_.size() < static_cast<size_t>(data_size)) {
+ data_.append(kExample1, /*pos*/ 0,
+ /*count*/ static_cast<size_t>(data_size) - data_.size());
+ }
+ EXPECT_EQ(data_.size(), data_size);
+
+ // Clear dependent streams in reverse order
+ buffered_.reset();
+ tracked_.reset();
+ // Read from a copy of data, so that data_.substr() below doesn't invalidate it
+ raw_ = BufferReader::FromString(data_);
+ tracked_ = TrackedRandomAccessFile::Make(raw_.get());
+ EXPECT_OK_AND_ASSIGN(buffered_,
+ BufferedInputStream::Create(buffer_size, default_memory_pool(),
+ tracked_, read_bound.value_or(-1)));
+ if (read_bound) {
+ data_ = data_.substr(0, *read_bound);
+ }
+ }
+
+ void TestReads() {
+ std::default_random_engine gen(/*seed*/ 42);
+ std::uniform_int_distribution<int64_t> read_len_dist(1, 80);
+
+ int64_t pos = 0;
+ const int64_t size = static_cast<int64_t>(data_.size());
+ while (pos < size) {
+ const int64_t read_len = read_len_dist(gen);
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(read_len));
+ AssertBufferEqual(*buf, std::string_view(data_).substr(pos, read_len));
+ pos += buf->size();
+ }
+
+ // EOF was reached
+ ASSERT_EQ(pos, size);
+
+ // Number of reads should not be excessive given the buffer size
+ int64_t max_reads = (size + buffer_size_ - 1) / buffer_size_;
+ EXPECT_LE(tracked_->num_reads(), max_reads);
+ const auto& read_ranges = tracked_->get_read_ranges();
+ pos = 0;
+ for (size_t i = 0; i < read_ranges.size(); ++i) {
+ const auto cur = read_ranges[i];
+ ASSERT_EQ(cur.offset, pos);
+ // Never read less than buffer size bytes, except perhaps if fewer bytes remain
+ ASSERT_GE(cur.length, std::min(buffer_size_, size - pos));
+ // Bump actual position in the file
+ pos += std::min(cur.length, size - pos);
+ }
+
+ // Further reads return an empty buffer
+ ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(buffer_size_ - 1));
+ AssertBufferEqual(*buf, "");
+ ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(buffer_size_ + 1));
+ AssertBufferEqual(*buf, "");
+ }
+
+ protected:
+ std::string data_;
+ int64_t buffer_size_;
+ std::shared_ptr<RandomAccessFile> raw_;
+ std::shared_ptr<TrackedRandomAccessFile> tracked_;
+ std::shared_ptr<BufferedInputStream> buffered_;
+};
+
+TEST_F(TestBufferedInputStreamRandom, ReadsWithoutReadBound) {
+ constexpr int kNumIters = 10;
+
+ for (int i = 0; i < kNumIters; ++i) {
+ MakeBuffered(/*data_size=*/3000, /*buffer_size=*/11);
+ TestReads();
+ }
+}
+
+TEST_F(TestBufferedInputStreamRandom, ReadsWithReadBound) {
+ constexpr int kNumIters = 10;
+
+ for (int i = 0; i < kNumIters; ++i) {
+ MakeBuffered(/*data_size=*/3000, /*buffer_size=*/11, /*read_bound=*/2000);
+ TestReads();
+ }
+}
+
+} // namespace arrow::io
diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc
index c6de6b02fc..a6dc1d59c6 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -214,7 +214,7 @@ void AssertBufferEqual(const Buffer& buffer, const std::vector<uint8_t>& expecte
}
}
-void AssertBufferEqual(const Buffer& buffer, const std::string& expected) {
+void AssertBufferEqual(const Buffer& buffer, std::string_view expected) {
ASSERT_EQ(static_cast<size_t>(buffer.size()), expected.length())
<< "Mismatching buffer size";
const uint8_t* buffer_data = buffer.data();
diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h
index e3838bfe4b..bb462af86a 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -232,7 +232,7 @@ ARROW_TESTING_EXPORT void AssertChunkedApproxEquivalent(
ARROW_TESTING_EXPORT void AssertBufferEqual(const Buffer& buffer,
const std::vector<uint8_t>& expected);
ARROW_TESTING_EXPORT void AssertBufferEqual(const Buffer& buffer,
- const std::string& expected);
+ std::string_view expected);
ARROW_TESTING_EXPORT void AssertBufferEqual(const Buffer& buffer, const Buffer& expected);
ARROW_TESTING_EXPORT void AssertTypeEqual(const DataType& lhs, const DataType&
rhs,