This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 84583d6530 GH-37434: [C++] IO: Refactor BufferedInputStream::Read for 
small input (#37460)
84583d6530 is described below

commit 84583d65306c01897fae01ba587885f140054b0c
Author: mwish <[email protected]>
AuthorDate: Fri Sep 1 02:44:12 2023 +0800

    GH-37434: [C++] IO: Refactor BufferedInputStream::Read for small input 
(#37460)
    
    
    
    ### Rationale for this change
    
    If we set BufferSize == 100k and read 3k bytes per IO, then the 34th read
    covers the range (99k, 102k], which straddles the end of the first buffer.
    
    Before this change, Read copies the buffered bytes (99k, 100k] and then issues
    a raw IO for just (100k, 102k], rather than refilling the buffer with
    (100k, 200k].
    
    ### What changes are included in this PR?
    
    Refactor `BufferedInputStream::Read` to optimize small IO.
    
    ### Are these changes tested?
    
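    Yes. The existing `BufferedInputStream` tests still apply, and this PR adds
    tests that check the reads issued to the underlying raw stream.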
    
    ### Are there any user-facing changes?
    
    Users may see a different I/O pattern on the raw stream. Depending on the
    workload, this can be either an improvement or a regression.
    
    * Closes: #37434
    
    Lead-authored-by: mwish <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/io/buffered.cc        |  88 +++++++-----
 cpp/src/arrow/io/buffered_test.cc   | 275 +++++++++++++++++++++++++++++++++++-
 cpp/src/arrow/testing/gtest_util.cc |   2 +-
 cpp/src/arrow/testing/gtest_util.h  |   2 +-
 4 files changed, 321 insertions(+), 46 deletions(-)

diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index e0e37c5802..21cce478d3 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -342,23 +342,28 @@ class BufferedInputStream::Impl : public BufferedBase {
     buffer_pos_ = bytes_buffered_ = 0;
   }
 
-  Status BufferIfNeeded() {
-    if (bytes_buffered_ == 0) {
-      // Fill buffer
-      if (!buffer_) {
-        RETURN_NOT_OK(ResetBuffer());
-      }
+  Status DoBuffer() {
+    // Fill buffer
+    if (!buffer_) {
+      RETURN_NOT_OK(ResetBuffer());
+    }
 
-      int64_t bytes_to_buffer = buffer_size_;
-      if (raw_read_bound_ >= 0) {
-        bytes_to_buffer = std::min(buffer_size_, raw_read_bound_ - 
raw_read_total_);
-      }
-      ARROW_ASSIGN_OR_RAISE(bytes_buffered_, raw_->Read(bytes_to_buffer, 
buffer_data_));
-      buffer_pos_ = 0;
-      raw_read_total_ += bytes_buffered_;
+    int64_t bytes_to_buffer = buffer_size_;
+    if (raw_read_bound_ >= 0) {
+      bytes_to_buffer = std::min(buffer_size_, raw_read_bound_ - 
raw_read_total_);
+    }
+    ARROW_ASSIGN_OR_RAISE(bytes_buffered_, raw_->Read(bytes_to_buffer, 
buffer_data_));
+    buffer_pos_ = 0;
+    raw_read_total_ += bytes_buffered_;
 
-      // Do not make assumptions about the raw stream position
-      raw_pos_ = -1;
+    // Do not make assumptions about the raw stream position
+    raw_pos_ = -1;
+    return Status::OK();
+  }
+
+  Status BufferIfNeeded() {
+    if (bytes_buffered_ == 0) {
+      return DoBuffer();
     }
     return Status::OK();
   }
@@ -373,33 +378,38 @@ class BufferedInputStream::Impl : public BufferedBase {
       return Status::Invalid("Bytes to read must be positive. Received:", 
nbytes);
     }
 
-    if (nbytes < buffer_size_) {
-      // Pre-buffer for small reads
-      RETURN_NOT_OK(BufferIfNeeded());
+    // 1. First consume pre-buffered data.
+    int64_t pre_buffer_copy_bytes = std::min(nbytes, bytes_buffered_);
+    if (pre_buffer_copy_bytes > 0) {
+      memcpy(out, buffer_data_ + buffer_pos_, pre_buffer_copy_bytes);
+      ConsumeBuffer(pre_buffer_copy_bytes);
     }
-
-    if (nbytes > bytes_buffered_) {
-      // Copy buffered bytes into out, then read rest
-      memcpy(out, buffer_data_ + buffer_pos_, bytes_buffered_);
-
-      int64_t bytes_to_read = nbytes - bytes_buffered_;
-      if (raw_read_bound_ >= 0) {
-        bytes_to_read = std::min(bytes_to_read, raw_read_bound_ - 
raw_read_total_);
-      }
-      ARROW_ASSIGN_OR_RAISE(
-          int64_t bytes_read,
-          raw_->Read(bytes_to_read, reinterpret_cast<uint8_t*>(out) + 
bytes_buffered_));
+    int64_t remaining_bytes = nbytes - pre_buffer_copy_bytes;
+    if (raw_read_bound_ >= 0) {
+      remaining_bytes = std::min(remaining_bytes, raw_read_bound_ - 
raw_read_total_);
+    }
+    if (remaining_bytes == 0) {
+      return pre_buffer_copy_bytes;
+    }
+    DCHECK_EQ(0, bytes_buffered_);
+
+    // 2. Read from storage.
+    if (remaining_bytes >= buffer_size_) {
+      // 2.1. If read is larger than buffer size, read directly from storage.
+      ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+                            raw_->Read(remaining_bytes, 
reinterpret_cast<uint8_t*>(out) +
+                                                            
pre_buffer_copy_bytes));
       raw_read_total_ += bytes_read;
-
-      // Do not make assumptions about the raw stream position
-      raw_pos_ = -1;
-      bytes_read += bytes_buffered_;
       RewindBuffer();
-      return bytes_read;
+      return pre_buffer_copy_bytes + bytes_read;
     } else {
-      memcpy(out, buffer_data_ + buffer_pos_, nbytes);
-      ConsumeBuffer(nbytes);
-      return nbytes;
+      // 2.2. If read is smaller than buffer size, fill buffer and copy from 
buffer.
+      RETURN_NOT_OK(DoBuffer());
+      int64_t bytes_copy_after_buffer = std::min(bytes_buffered_, 
remaining_bytes);
+      memcpy(reinterpret_cast<uint8_t*>(out) + pre_buffer_copy_bytes,
+             buffer_data_ + buffer_pos_, bytes_copy_after_buffer);
+      ConsumeBuffer(bytes_copy_after_buffer);
+      return pre_buffer_copy_bytes + bytes_copy_after_buffer;
     }
   }
 
@@ -432,7 +442,7 @@ class BufferedInputStream::Impl : public BufferedBase {
 BufferedInputStream::BufferedInputStream(std::shared_ptr<InputStream> raw,
                                          MemoryPool* pool,
                                          int64_t raw_total_bytes_bound) {
-  impl_.reset(new Impl(std::move(raw), pool, raw_total_bytes_bound));
+  impl_ = std::make_unique<Impl>(std::move(raw), pool, raw_total_bytes_bound);
 }
 
 BufferedInputStream::~BufferedInputStream() { 
internal::CloseFromDestructor(this); }
diff --git a/cpp/src/arrow/io/buffered_test.cc 
b/cpp/src/arrow/io/buffered_test.cc
index 520eaaa935..82feeea051 100644
--- a/cpp/src/arrow/io/buffered_test.cc
+++ b/cpp/src/arrow/io/buffered_test.cc
@@ -46,8 +46,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/io_util.h"
 
-namespace arrow {
-namespace io {
+namespace arrow::io {
 
 using ::arrow::internal::TemporaryDir;
 
@@ -321,7 +320,7 @@ TEST_F(TestBufferedOutputStream, TruncatesFile) {
 // ----------------------------------------------------------------------
 // BufferedInputStream tests
 
-const char kExample1[] = "informaticacrobaticsimmolation";
+const std::string_view kExample1 = "informaticacrobaticsimmolation";
 
 class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
  public:
@@ -672,5 +671,271 @@ TEST_F(TestBufferedInputStreamBound, 
BufferExactlyExhausted) {
   }
 }
 
-}  // namespace io
-}  // namespace arrow
+// These tests exercise the buffering algorithm by checking the reads issued
+// to the underlying raw stream.
+class TestBufferedInputStreamChunk : public TestBufferedInputStream {
+ public:
+  void SetUp() { TestBufferedInputStream::SetUp(); }
+
+  void TearDown() {
+    buffered_ = nullptr;
+    tracked_ = nullptr;
+    raw_ = nullptr;
+  }
+
+  void MakeExample(int64_t buffer_size,
+                   std::optional<int64_t> read_bound = std::nullopt) {
+    test_data_ = kExample1;
+    MemoryPool* pool = default_memory_pool();
+
+    ASSERT_OK_AND_ASSIGN(auto file_out, FileOutputStream::Open(path_));
+    ASSERT_OK(file_out->Write(test_data_));
+    ASSERT_OK(file_out->Close());
+
+    ASSERT_OK_AND_ASSIGN(auto file_in, ReadableFile::Open(path_));
+    raw_ = file_in;
+    tracked_ = 
TrackedRandomAccessFile::Make(dynamic_cast<RandomAccessFile*>(raw_.get()));
+    ASSERT_OK_AND_ASSIGN(buffered_,
+                         BufferedInputStream::Create(buffer_size, pool, 
tracked_,
+                                                     read_bound.value_or(-1)));
+  }
+
+ protected:
+  std::shared_ptr<TrackedRandomAccessFile> tracked_;
+};
+
+TEST_F(TestBufferedInputStreamChunk, NoRead) {
+  const int64_t kBufferSize = 5;
+  MakeExample(kBufferSize);
+
+  EXPECT_TRUE(tracked_->get_read_ranges().empty());
+}
+
+TEST_F(TestBufferedInputStreamChunk, LargeRead) {
+  const int64_t kBufferSize = 5;
+  MakeExample(kBufferSize);
+
+  // Read bytes greater than buffer_size would not buffer.
+  ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(6));
+  AssertBufferEqual(*buf, kExample1.substr(0, 6));
+
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  std::vector<ReadRange> read_ranges = {ReadRange{0, 6}};
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, SmallReadThenLargeRead) {
+  const int64_t kBufferSize = 5;
+  MakeExample(kBufferSize);
+
+  // Small read would trigger buffer the whole chunk
+  ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+  AssertBufferEqual(*buf, kExample1.substr(0, 1));
+  EXPECT_EQ(4, buffered_->bytes_buffered());
+  std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // Large read with pre-buffered will copy the
+  // pre-buffered data first, then read the remaining without filling
+  // the buffer.
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(20));
+  AssertBufferEqual(*buf, kExample1.substr(1, 20));
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  read_ranges.push_back(ReadRange{5, 16});
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // Small read again
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(2));
+  AssertBufferEqual(*buf, kExample1.substr(21, 2));
+  EXPECT_EQ(kBufferSize - 2, buffered_->bytes_buffered());
+  read_ranges.push_back(ReadRange{21, 5});
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferWholeChunk) {
+  const int64_t kBufferSize = 5;
+  MakeExample(kBufferSize);
+
+  // Small read
+  ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+  AssertBufferEqual(*buf, kExample1.substr(0, 1));
+  EXPECT_EQ(kBufferSize - 1, buffered_->bytes_buffered());
+  std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // Whole read size is larger than buffer size, but raw read size would be
+  // smaller than buffer size => buffer is filled anew
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(6));
+  AssertBufferEqual(*buf, kExample1.substr(1, 6));
+  EXPECT_EQ(kBufferSize * 2 - 1 - 6, buffered_->bytes_buffered());
+  read_ranges.push_back(ReadRange{5, 5});
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferLargerThanFileSize) {
+  const int64_t kFileSize = static_cast<int64_t>(kExample1.size());
+  const int64_t kBufferSize = kFileSize + 10;
+  MakeExample(kBufferSize);
+
+  ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+  AssertBufferEqual(*buf, kExample1.substr(0, 1));
+  EXPECT_EQ(kFileSize - 1, buffered_->bytes_buffered());
+  std::vector<ReadRange> read_ranges = {ReadRange{0, kBufferSize}};
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(kFileSize - 3));
+  AssertBufferEqual(*buf, kExample1.substr(1, kFileSize - 3));
+  EXPECT_EQ(2, buffered_->bytes_buffered());
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // Short read up to EOF
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(20));
+  AssertBufferEqual(*buf, kExample1.substr(kFileSize - 2, 2));
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  read_ranges.push_back(ReadRange{kFileSize, kBufferSize});
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // EOF
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(20));
+  AssertBufferEqual(*buf, "");
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  // (BufferedInputStream still tries to fetch more bytes)
+  read_ranges.push_back(ReadRange{kFileSize, kBufferSize});
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferLargerThanReadBound) {
+  const int64_t kBufferSize = 6;
+  const int64_t kReadBound = 5;
+  MakeExample(kBufferSize, kReadBound);
+
+  // Small read
+  ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(1));
+  AssertBufferEqual(*buf, kExample1.substr(0, 1));
+  EXPECT_EQ(kReadBound - 1, buffered_->bytes_buffered());
+  std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // Longer read, but truncated due to the read bound
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(10));
+  AssertBufferEqual(*buf, kExample1.substr(1, 4));
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  // Due to the read bound, no more row reads were issued
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // EOF
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(10));
+  AssertBufferEqual(*buf, "");
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+TEST_F(TestBufferedInputStreamChunk, BufferSmallerThanReadBound) {
+  const int64_t kBufferSize = 3;
+  const int64_t kReadBound = 5;
+  MakeExample(kBufferSize, kReadBound);
+
+  ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(6));
+  AssertBufferEqual(*buf, kExample1.substr(0, 5));
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  std::vector<ReadRange> read_ranges = {ReadRange{0, 5}};
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+
+  // EOF
+  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(10));
+  AssertBufferEqual(*buf, "");
+  EXPECT_EQ(0, buffered_->bytes_buffered());
+  EXPECT_EQ(tracked_->get_read_ranges(), read_ranges);
+}
+
+class TestBufferedInputStreamRandom : public ::testing::Test {
+ public:
+  void MakeBuffered(int64_t data_size, int64_t buffer_size,
+                    std::optional<int64_t> read_bound = std::nullopt) {
+    buffer_size_ = buffer_size;
+    data_.clear();
+    data_.reserve(data_size);
+    while (data_.size() < static_cast<size_t>(data_size)) {
+      data_.append(kExample1, /*pos*/ 0,
+                   /*count*/ static_cast<size_t>(data_size) - data_.size());
+    }
+    EXPECT_EQ(data_.size(), data_size);
+
+    // Clear dependent streams in reverse order
+    buffered_.reset();
+    tracked_.reset();
+    // Read from a copy of data, so that data_.substr() below doesn't 
invalidate it
+    raw_ = BufferReader::FromString(data_);
+    tracked_ = TrackedRandomAccessFile::Make(raw_.get());
+    EXPECT_OK_AND_ASSIGN(buffered_,
+                         BufferedInputStream::Create(buffer_size, 
default_memory_pool(),
+                                                     tracked_, 
read_bound.value_or(-1)));
+    if (read_bound) {
+      data_ = data_.substr(0, *read_bound);
+    }
+  }
+
+  void TestReads() {
+    std::default_random_engine gen(/*seed*/ 42);
+    std::uniform_int_distribution<int64_t> read_len_dist(1, 80);
+
+    int64_t pos = 0;
+    const int64_t size = static_cast<int64_t>(data_.size());
+    while (pos < size) {
+      const int64_t read_len = read_len_dist(gen);
+      ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(read_len));
+      AssertBufferEqual(*buf, std::string_view(data_).substr(pos, read_len));
+      pos += buf->size();
+    }
+
+    // EOF was reached
+    ASSERT_EQ(pos, size);
+
+    // Number of reads should not be excessive given the buffer size
+    int64_t max_reads = (size + buffer_size_ - 1) / buffer_size_;
+    EXPECT_LE(tracked_->num_reads(), max_reads);
+    const auto& read_ranges = tracked_->get_read_ranges();
+    pos = 0;
+    for (size_t i = 0; i < read_ranges.size(); ++i) {
+      const auto cur = read_ranges[i];
+      ASSERT_EQ(cur.offset, pos);
+      // Never read less than buffer size bytes, except perhaps if fewer bytes 
remain
+      ASSERT_GE(cur.length, std::min(buffer_size_, size - pos));
+      // Bump actual position in the file
+      pos += std::min(cur.length, size - pos);
+    }
+
+    // Further reads return an empty buffer
+    ASSERT_OK_AND_ASSIGN(auto buf, buffered_->Read(buffer_size_ - 1));
+    AssertBufferEqual(*buf, "");
+    ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(buffer_size_ + 1));
+    AssertBufferEqual(*buf, "");
+  }
+
+ protected:
+  std::string data_;
+  int64_t buffer_size_;
+  std::shared_ptr<RandomAccessFile> raw_;
+  std::shared_ptr<TrackedRandomAccessFile> tracked_;
+  std::shared_ptr<BufferedInputStream> buffered_;
+};
+
+TEST_F(TestBufferedInputStreamRandom, ReadsWithoutReadBound) {
+  constexpr int kNumIters = 10;
+
+  for (int i = 0; i < kNumIters; ++i) {
+    MakeBuffered(/*data_size=*/3000, /*buffer_size=*/11);
+    TestReads();
+  }
+}
+
+TEST_F(TestBufferedInputStreamRandom, ReadsWithReadBound) {
+  constexpr int kNumIters = 10;
+
+  for (int i = 0; i < kNumIters; ++i) {
+    MakeBuffered(/*data_size=*/3000, /*buffer_size=*/11, /*read_bound=*/2000);
+    TestReads();
+  }
+}
+
+}  // namespace arrow::io
diff --git a/cpp/src/arrow/testing/gtest_util.cc 
b/cpp/src/arrow/testing/gtest_util.cc
index c6de6b02fc..a6dc1d59c6 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -214,7 +214,7 @@ void AssertBufferEqual(const Buffer& buffer, const 
std::vector<uint8_t>& expecte
   }
 }
 
-void AssertBufferEqual(const Buffer& buffer, const std::string& expected) {
+void AssertBufferEqual(const Buffer& buffer, std::string_view expected) {
   ASSERT_EQ(static_cast<size_t>(buffer.size()), expected.length())
       << "Mismatching buffer size";
   const uint8_t* buffer_data = buffer.data();
diff --git a/cpp/src/arrow/testing/gtest_util.h 
b/cpp/src/arrow/testing/gtest_util.h
index e3838bfe4b..bb462af86a 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -232,7 +232,7 @@ ARROW_TESTING_EXPORT void AssertChunkedApproxEquivalent(
 ARROW_TESTING_EXPORT void AssertBufferEqual(const Buffer& buffer,
                                             const std::vector<uint8_t>& 
expected);
 ARROW_TESTING_EXPORT void AssertBufferEqual(const Buffer& buffer,
-                                            const std::string& expected);
+                                            std::string_view expected);
 ARROW_TESTING_EXPORT void AssertBufferEqual(const Buffer& buffer, const 
Buffer& expected);
 
 ARROW_TESTING_EXPORT void AssertTypeEqual(const DataType& lhs, const DataType& 
rhs,

Reply via email to