This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f9563f  ARROW-3170: [C++] Experimental readahead spooler
9f9563f is described below

commit 9f9563f1680522d4fc4939da9bd153687d6eb763
Author: Antoine Pitrou <[email protected]>
AuthorDate: Mon Sep 10 18:42:45 2018 +0200

    ARROW-3170: [C++] Experimental readahead spooler
    
    Author: Antoine Pitrou <[email protected]>
    
    Closes #2492 from pitrou/ARROW-501-readahead and squashes the following commits:
    
    cbbd3db8 <Antoine Pitrou> Improve paddings description
    4f8f70ff <Antoine Pitrou> ARROW-501:  Experimental readahead spooler
---
 cpp/src/arrow/CMakeLists.txt          |   1 +
 cpp/src/arrow/io/CMakeLists.txt       |   2 +
 cpp/src/arrow/io/io-readahead-test.cc | 234 ++++++++++++++++++++++++++++++++++
 cpp/src/arrow/io/readahead.cc         | 221 ++++++++++++++++++++++++++++++++
 cpp/src/arrow/io/readahead.h          |  96 ++++++++++++++
 cpp/src/arrow/test-util.h             |  15 ++-
 cpp/src/plasma/test/client_tests.cc   |   2 +-
 7 files changed, 569 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index b46f35c..4937378 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -34,6 +34,7 @@ set(ARROW_SRCS
   io/file.cc
   io/interfaces.cc
   io/memory.cc
+  io/readahead.cc
 
   util/bit-util.cc
   util/compression.cc
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index 65221d0..ff6b854 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -26,6 +26,7 @@ if (ARROW_HDFS AND NOT ARROW_BOOST_HEADER_ONLY)
 endif()
 
 ADD_ARROW_TEST(io-memory-test)
+ADD_ARROW_TEST(io-readahead-test)
 
 ADD_ARROW_BENCHMARK(io-file-benchmark)
 ADD_ARROW_BENCHMARK(io-memory-benchmark)
@@ -38,4 +39,5 @@ install(FILES
   hdfs.h
   interfaces.h
   memory.h
+  readahead.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/io")
diff --git a/cpp/src/arrow/io/io-readahead-test.cc b/cpp/src/arrow/io/io-readahead-test.cc
new file mode 100644
index 0000000..fa0a138
--- /dev/null
+++ b/cpp/src/arrow/io/io-readahead-test.cc
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <chrono>
+#include <cstdint>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/io/readahead.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+#include "arrow/util/checked_cast.h"
+
+namespace arrow {
+namespace io {
+namespace internal {
+
+// Sleep for a fractional number of seconds (truncated to whole nanoseconds).
+static void sleep_for(double seconds) {
+  const auto nanos = static_cast<int64_t>(seconds * 1e9);
+  std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
+}
+
+// Poll `predicate` once per 1 ms period until it returns true or the number of
+// elapsed periods reaches `seconds`; the predicate is always evaluated at least once.
+static void busy_wait(double seconds, std::function<bool()> predicate) {
+  const double period = 0.001;
+  int iteration = 0;
+  while (!predicate() && iteration * period < seconds) {
+    sleep_for(period);
+    ++iteration;
+  }
+}
+
+// Copy `data` into a freshly allocated Arrow buffer and expose it as a
+// BufferReader positioned at offset 0. Aborts if allocation fails.
+std::shared_ptr<BufferReader> DataReader(const std::string& data) {
+  const auto nbytes = data.length();
+  std::shared_ptr<Buffer> buffer;
+  ABORT_NOT_OK(AllocateBuffer(nbytes, &buffer));
+  memcpy(buffer->mutable_data(), data.data(), nbytes);
+  return std::make_shared<BufferReader>(std::move(buffer));
+}
+
+// Poll `file`'s position until it reaches at least `expected` or the polling
+// budget runs out; returns the last observed position. The budget is generous
+// because polling stops as soon as the position is reached, so it only matters
+// on slow (e.g. valgrind) runs where the background reader starts late.
+static int64_t WaitForPosition(const RandomAccessFile& file, int64_t expected,
+                               double seconds = 1.0) {
+  int64_t pos = -1;
+  busy_wait(seconds, [&]() -> bool {
+    ABORT_NOT_OK(file.Tell(&pos));
+    return pos >= expected;
+  });
+  return pos;
+}
+
+// Assert that `file` eventually lands exactly on `expected` (overshooting fails).
+static void AssertEventualPosition(const RandomAccessFile& file, int64_t expected) {
+  int64_t pos = WaitForPosition(file, expected);
+  ASSERT_EQ(pos, expected) << "File didn't reach expected position";
+}
+
+// Assert that `file` is at `expected` right now, without waiting.
+static void AssertPosition(const RandomAccessFile& file, int64_t expected) {
+  int64_t pos = -1;
+  ABORT_NOT_OK(file.Tell(&pos));
+  ASSERT_EQ(pos, expected) << "File isn't at expected position";
+}
+
+// Check a non-EOF ReadaheadBuffer: its paddings must be among the accepted
+// values (a set, because padding changes race with the background reader) and
+// the bytes between the paddings must equal `expected_data`.
+template <typename Expected>
+static void AssertReadaheadBuffer(const ReadaheadBuffer& buf,
+                                  const std::set<int64_t>& left_paddings,
+                                  const std::set<int64_t>& right_paddings,
+                                  const Expected& expected_data) {
+  // Report a premature EOF as a failure instead of dereferencing null below
+  ASSERT_NE(buf.buffer.get(), nullptr) << "Expected data, got EOF (null buffer)";
+  ASSERT_TRUE(left_paddings.count(buf.left_padding))
+      << "Left padding (" << buf.left_padding << ") not amongst expected values";
+  ASSERT_TRUE(right_paddings.count(buf.right_padding))
+      << "Right padding (" << buf.right_padding << ") not amongst expected values";
+  auto actual_data =
+      SliceBuffer(buf.buffer, buf.left_padding,
+                  buf.buffer->size() - buf.left_padding - buf.right_padding);
+  AssertBufferEqual(*actual_data, expected_data);
+}
+
+// Check that `buf` signals EOF, i.e. carries a null buffer pointer.
+static void AssertReadaheadBufferEOF(const ReadaheadBuffer& buf) {
+  ASSERT_EQ(buf.buffer.get(), nullptr)
+      << "Expected EOF signalled by null buffer pointer";
+}
+
+TEST(ReadaheadSpooler, BasicReads) {
+  // Test basic reads
+  // read_size = 2 and readahead_queue_size = 3: before any Read() call, the
+  // background thread should have consumed three 2-byte blocks.
+  auto data_reader = DataReader("0123456789");
+  ReadaheadSpooler spooler(data_reader, 2, 3);
+  ReadaheadBuffer buf;
+
+  AssertEventualPosition(*data_reader, 3 * 2);
+
+  // Each Read() frees a queue slot, so the worker reads one more block ahead.
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "01");
+  AssertEventualPosition(*data_reader, 4 * 2);
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "23");
+  AssertEventualPosition(*data_reader, 5 * 2);
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "45");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "67");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "89");
+  // Past the end, Read() keeps succeeding and reports EOF with a null buffer.
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBufferEOF(buf);
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBufferEOF(buf);
+}
+
+TEST(ReadaheadSpooler, ShortReadAtEnd) {
+  // 5 bytes with read_size = 3: the second block is a short 2-byte read, and
+  // both blocks fit in the readahead queue (size 2).
+  auto data_reader = DataReader("01234");
+  ReadaheadSpooler spooler(data_reader, 3, 2);
+  ReadaheadBuffer buf;
+
+  AssertEventualPosition(*data_reader, 5);
+
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "012");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "34");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBufferEOF(buf);
+}
+
+TEST(ReadaheadSpooler, Close) {
+  // Closing should stop reads
+  auto data_reader = DataReader("0123456789");
+  ReadaheadSpooler spooler(data_reader, 2, 2);
+  ReadaheadBuffer buf;
+
+  // Wait until the queue (2 blocks of 2 bytes) has been filled, then close.
+  AssertEventualPosition(*data_reader, 2 * 2);
+  ASSERT_OK(spooler.Close());
+
+  // XXX not sure this makes sense
+  // Read() drains blocks queued before Close(), and only then reports EOF.
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "01");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {0}, {0}, "23");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBufferEOF(buf);
+  // Nothing more was read from the stream after Close().
+  AssertPosition(*data_reader, 2 * 2);
+
+  // Idempotency
+  ASSERT_OK(spooler.Close());
+}
+
+TEST(ReadaheadSpooler, Paddings) {
+  // Each block records the paddings in effect when the worker started reading
+  // it, so changing the paddings does not affect blocks already queued.
+  auto data_reader = DataReader("0123456789");
+  ReadaheadSpooler spooler(data_reader, 2, 2, 1 /* left_padding */,
+                           4 /* right_padding */);
+  ReadaheadBuffer buf;
+
+  AssertEventualPosition(*data_reader, 2 * 2);
+  ASSERT_EQ(spooler.GetLeftPadding(), 1);
+  ASSERT_EQ(spooler.GetRightPadding(), 4);
+  spooler.SetLeftPadding(3);
+  spooler.SetRightPadding(2);
+  ASSERT_EQ(spooler.GetLeftPadding(), 3);
+  ASSERT_EQ(spooler.GetRightPadding(), 2);
+
+  // The two queued blocks still carry the initial paddings (1, 4)
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {1}, {4}, "01");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {1}, {4}, "23");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {3}, {2}, "45");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {3}, {2}, "67");
+  // The last block may or may not have been read ahead already, hence {3, 4}
+  spooler.SetLeftPadding(4);
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBuffer(buf, {3, 4}, {2}, "89");
+  ASSERT_OK(spooler.Read(&buf));
+  AssertReadaheadBufferEOF(buf);
+}
+
+TEST(ReadaheadSpooler, StressReads) {
+  // NBYTES % READ_SIZE != 0 ensures a short read at end
+#if defined(ARROW_VALGRIND)
+  const int64_t NBYTES = 101;
+#else
+  const int64_t NBYTES = 50001;
+#endif
+  const int64_t READ_SIZE = 2;
+
+  std::shared_ptr<ResizableBuffer> data;
+  ASSERT_OK(MakeRandomByteBuffer(NBYTES, default_memory_pool(), &data));
+  auto data_reader = std::make_shared<BufferReader>(data);
+
+  ReadaheadSpooler spooler(data_reader, READ_SIZE, 7);
+  int64_t pos = 0;
+  std::vector<ReadaheadBuffer> readahead_buffers;
+  // One entry per block, the last one short
+  readahead_buffers.reserve(static_cast<size_t>(NBYTES / READ_SIZE + 1));
+
+  // Stress Read() calls while the background thread is reading ahead
+  while (pos < NBYTES) {
+    ReadaheadBuffer buf;
+    ASSERT_OK(spooler.Read(&buf));
+    ASSERT_NE(buf.buffer.get(), nullptr) << "Got premature EOF at index " << pos;
+    pos += buf.buffer->size() - buf.left_padding - buf.right_padding;
+    readahead_buffers.push_back(std::move(buf));
+  }
+  // At EOF
+  {
+    ReadaheadBuffer buf;
+    ASSERT_OK(spooler.Read(&buf));
+    AssertReadaheadBufferEOF(buf);
+  }
+
+  // Verify contents only after reading, so checks don't slow down the reads
+  pos = 0;
+  for (const auto& buf : readahead_buffers) {
+    auto expected_data =
+        SliceBuffer(data, pos, std::min(READ_SIZE, NBYTES - pos));
+    AssertReadaheadBuffer(buf, {0}, {0}, *expected_data);
+    pos += expected_data->size();
+  }
+  // Got exactly the total bytes length
+  ASSERT_EQ(pos, NBYTES);
+}
+
+}  // namespace internal
+}  // namespace io
+}  // namespace arrow
diff --git a/cpp/src/arrow/io/readahead.cc b/cpp/src/arrow/io/readahead.cc
new file mode 100644
index 0000000..c21e45d
--- /dev/null
+++ b/cpp/src/arrow/io/readahead.cc
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/readahead.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#include <condition_variable>
+#include <cstring>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <utility>
+
+namespace arrow {
+namespace io {
+namespace internal {
+
+// ----------------------------------------------------------------------
+// ReadaheadSpooler implementation
+
+// Shared state between the user-facing ReadaheadSpooler and its background
+// worker thread. All mutable state below is guarded by mutex_.
+class ReadaheadSpooler::Impl {
+ public:
+  Impl(MemoryPool* pool, std::shared_ptr<InputStream> raw, int64_t read_size,
+       int32_t readahead_queue_size, int64_t left_padding, int64_t right_padding)
+      : pool_(pool),
+        raw_(std::move(raw)),
+        read_size_(read_size),
+        readahead_queue_size_(readahead_queue_size),
+        left_padding_(left_padding),
+        right_padding_(right_padding) {
+    DCHECK_NE(raw_, nullptr);
+    DCHECK_GT(read_size_, 0);
+    DCHECK_GT(readahead_queue_size_, 0);
+    // Start the worker last, once every member it touches is initialized.
+    // Capturing `this` is safe because ~Impl() joins the thread via Close().
+    io_worker_ = std::thread([this]() { WorkerLoop(); });
+  }
+
+  ~Impl() { ARROW_UNUSED(Close()); }
+
+  // Ask the worker to stop, wait for it to exit, then close the raw stream.
+  // The worker is joined at most once, but raw_->Close() is invoked on every
+  // call, including the one made by the destructor.
+  Status Close() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    please_close_ = true;
+    io_wakeup_.notify_one();
+    // Wait for IO thread to finish (without holding the lock it needs)
+    if (io_worker_.joinable()) {
+      lock.unlock();
+      io_worker_.join();
+      lock.lock();
+    }
+    return raw_->Close();
+  }
+
+  // Hand out the next read-ahead buffer, blocking while the queue is empty and
+  // the worker is still producing. Queued buffers are returned before a pending
+  // read error or EOF is reported; at EOF only out->buffer is reset (to null).
+  Status Read(ReadaheadBuffer* out) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    while (true) {
+      // Drain queue before querying other flags
+      if (!buffer_queue_.empty()) {
+        *out = std::move(buffer_queue_.front());
+        DCHECK_NE(out->buffer, nullptr);
+        buffer_queue_.pop_front();
+        // A slot was freed: let the worker fill up the queue again
+        io_wakeup_.notify_one();
+        return Status::OK();
+      }
+      if (!read_status_.ok()) {
+        // Got a read error, bail out
+        return read_status_;
+      }
+      if (eof_) {
+        out->buffer.reset();
+        return Status::OK();
+      }
+      // Readahead queue is empty and we're not closed yet, wait for more I/O
+      io_progress_.wait(lock);
+    }
+  }
+
+  // Padding accessors. New values only apply to blocks the worker starts
+  // reading afterwards; already-queued blocks keep their paddings.
+  int64_t left_padding() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return left_padding_;
+  }
+
+  void left_padding(int64_t size) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    left_padding_ = size;
+  }
+
+  int64_t right_padding() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return right_padding_;
+  }
+
+  void right_padding(int64_t size) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    right_padding_ = size;
+  }
+
+ protected:
+  // The background thread's main function: keep the queue filled up to
+  // readahead_queue_size_ blocks until Close(), EOF or a read error.
+  void WorkerLoop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+
+    while (true) {
+      if (please_close_) {
+        goto eof;
+      }
+      // Fill up readahead queue until desired size
+      while (buffer_queue_.size() < static_cast<size_t>(readahead_queue_size_)) {
+        // Snapshot the paddings under the lock; they may change while unlocked
+        ReadaheadBuffer buf = {nullptr, left_padding_, right_padding_};
+        lock.unlock();
+        Status st = ReadOneBufferUnlocked(&buf);
+        lock.lock();
+        if (!st.ok()) {
+          read_status_ = st;
+          goto error;
+        }
+        // Close() could have been called while unlocked above
+        if (please_close_) {
+          goto eof;
+        }
+        // Got empty read?
+        if (buf.buffer->size() == buf.left_padding + buf.right_padding) {
+          goto eof;
+        }
+        buffer_queue_.push_back(std::move(buf));
+        io_progress_.notify_one();
+      }
+      // Wait for Close() or Read() call
+      io_wakeup_.wait(lock);
+    }
+  eof:
+    eof_ = true;
+  error:
+    // No more buffers will ever be produced: wake *all* pending Read() callers
+    // (not just one), otherwise concurrent readers could block indefinitely.
+    io_progress_.notify_all();
+  }
+
+  // Allocate read_size_ bytes plus the paddings recorded in *buf, read data
+  // after the left padding, shrink the buffer on a short read and zero both
+  // padding areas. Runs without mutex_ held, hence it only uses buf's paddings
+  // (left_padding_ and right_padding_ may be modified concurrently).
+  Status ReadOneBufferUnlocked(ReadaheadBuffer* buf) {
+    std::shared_ptr<ResizableBuffer> buffer;
+    int64_t bytes_read = 0;
+    RETURN_NOT_OK(AllocateResizableBuffer(
+        pool_, read_size_ + buf->left_padding + buf->right_padding, &buffer));
+    RETURN_NOT_OK(raw_->Read(read_size_, &bytes_read,
+                             buffer->mutable_data() + buf->left_padding));
+    if (bytes_read < read_size_) {
+      // Got a short read
+      RETURN_NOT_OK(
+          buffer->Resize(bytes_read + buf->left_padding + buf->right_padding));
+    }
+    // Zero padding areas
+    memset(buffer->mutable_data(), 0, buf->left_padding);
+    memset(buffer->mutable_data() + bytes_read + buf->left_padding, 0,
+           buf->right_padding);
+    buf->buffer = std::move(buffer);
+    return Status::OK();
+  }
+
+  MemoryPool* pool_;
+  std::shared_ptr<InputStream> raw_;
+  int64_t read_size_;
+  int32_t readahead_queue_size_;
+  int64_t left_padding_ = 0;
+  int64_t right_padding_ = 0;
+
+  std::mutex mutex_;
+  // Signalled when the worker should refill the queue or stop
+  std::condition_variable io_wakeup_;
+  // Signalled when a buffer was queued, or EOF / an error was reached
+  std::condition_variable io_progress_;
+  std::thread io_worker_;
+  bool please_close_ = false;
+  bool eof_ = false;
+  std::deque<ReadaheadBuffer> buffer_queue_;
+  Status read_status_;
+};
+
+ReadaheadSpooler::ReadaheadSpooler(MemoryPool* pool, std::shared_ptr<InputStream> raw,
+                                   int64_t read_size, int32_t readahead_queue_size,
+                                   int64_t left_padding, int64_t right_padding)
+    : impl_(new ReadaheadSpooler::Impl(pool, std::move(raw), read_size,
+                                       readahead_queue_size, left_padding,
+                                       right_padding)) {}
+
+// Convenience overload of the constructor above, using default_memory_pool().
+ReadaheadSpooler::ReadaheadSpooler(std::shared_ptr<InputStream> raw, int64_t read_size,
+                                   int32_t readahead_queue_size, int64_t left_padding,
+                                   int64_t right_padding)
+    : ReadaheadSpooler(default_memory_pool(), std::move(raw), read_size,
+                       readahead_queue_size, left_padding, right_padding) {}
+
+int64_t ReadaheadSpooler::GetLeftPadding() { return impl_->left_padding(); }
+
+void ReadaheadSpooler::SetLeftPadding(int64_t size) { impl_->left_padding(size); }
+
+int64_t ReadaheadSpooler::GetRightPadding() { return impl_->right_padding(); }
+
+void ReadaheadSpooler::SetRightPadding(int64_t size) { impl_->right_padding(size); }
+
+Status ReadaheadSpooler::Close() { return impl_->Close(); }
+
+Status ReadaheadSpooler::Read(ReadaheadBuffer* out) { return impl_->Read(out); }
+
+// Destroying impl_ runs Impl's destructor, which calls Close() and ignores its Status.
+ReadaheadSpooler::~ReadaheadSpooler() {}
+
+}  // namespace internal
+}  // namespace io
+}  // namespace arrow
diff --git a/cpp/src/arrow/io/readahead.h b/cpp/src/arrow/io/readahead.h
new file mode 100644
index 0000000..d7ac509
--- /dev/null
+++ b/cpp/src/arrow/io/readahead.h
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_READAHEAD_H
+#define ARROW_IO_READAHEAD_H
+
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+class ResizableBuffer;
+class Status;
+
+namespace io {
+namespace internal {
+
+/// A block returned by ReadaheadSpooler::Read().
+///
+/// `buffer` holds `left_padding` zeroed bytes, then the data, then
+/// `right_padding` zeroed bytes. A null `buffer` signals EOF; in that case
+/// Read() leaves the padding fields untouched and they should be ignored.
+struct ARROW_EXPORT ReadaheadBuffer {
+  std::shared_ptr<ResizableBuffer> buffer;
+  // Number of zeroed bytes at the start of `buffer`, before the data
+  int64_t left_padding;
+  // Number of zeroed bytes at the end of `buffer`, after the data
+  int64_t right_padding;
+};
+
+class ARROW_EXPORT ReadaheadSpooler {
+ public:
+  /// \brief EXPERIMENTAL: Create a readahead spooler wrapping the given input stream.
+  ///
+  /// The spooler launches a background thread that reads up to a given number
+  /// of fixed-size blocks in advance from the underlying stream.
+  /// The buffers returned by Read() will be padded at the beginning and the end
+  /// with the configured amount of (zeroed) bytes.
+  ///
+  /// \param[in] pool the memory pool used to allocate the returned buffers
+  /// \param[in] raw the stream to read from; it is closed by Close()
+  /// \param[in] read_size the number of data bytes requested per block (padding
+  ///            excluded); the last block may be shorter
+  /// \param[in] readahead_queue_size the maximum number of blocks read in advance
+  /// \param[in] left_padding the initial number of zeroed bytes before the data
+  /// \param[in] right_padding the initial number of zeroed bytes after the data
+  ReadaheadSpooler(MemoryPool* pool, std::shared_ptr<InputStream> raw,
+                   int64_t read_size = kDefaultReadSize, int32_t readahead_queue_size = 1,
+                   int64_t left_padding = 0, int64_t right_padding = 0);
+
+  /// \brief Same as the constructor above, allocating from the default memory pool.
+  explicit ReadaheadSpooler(std::shared_ptr<InputStream> raw,
+                            int64_t read_size = kDefaultReadSize,
+                            int32_t readahead_queue_size = 1, int64_t left_padding = 0,
+                            int64_t right_padding = 0);
+
+  /// \brief Destroy the spooler, implicitly calling Close() (its Status is discarded).
+  ~ReadaheadSpooler();
+
+  /// Configure zero-padding at beginning and end of buffers (default 0 bytes).
+  /// The buffers returned by Read() will be padded at the beginning and the end
+  /// with the configured amount of (zeroed) bytes.
+  /// Note that, as reading happens in background and in advance, changing the
+  /// configured values might not affect Read() results immediately.
+  int64_t GetLeftPadding();
+  void SetLeftPadding(int64_t size);
+
+  int64_t GetRightPadding();
+  void SetRightPadding(int64_t size);
+
+  /// \brief Close the spooler.  This implicitly closes the underlying input stream.
+  ///
+  /// The background thread is stopped and joined first. Buffers that were
+  /// already read ahead remain available through Read().
+  Status Close();
+
+  /// \brief Read a buffer from the queue.
+  ///
+  /// Blocks until a buffer has been read ahead, EOF is reached, or a read error
+  /// occurs. If the buffer pointer in the ReadaheadBuffer is null, then EOF was
+  /// reached and/or the spooler was explicitly closed.
+  /// Otherwise, the buffer will contain at most read_size bytes in addition
+  /// to the configured padding (short reads are possible at the end of a file).
+  /// An error from the underlying stream is returned only once all buffers
+  /// queued before it have been consumed.
+  // TODO: allow reusing the buffer in ReadaheadBuffer, perhaps by using
+  // a caching memory pool?
+  Status Read(ReadaheadBuffer* out);
+
+ private:
+  static constexpr int64_t kDefaultReadSize = 1 << 20;  // 1 MB
+
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace internal
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_READAHEAD_H
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 13c6a61..dfc1ab5 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -330,13 +330,26 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual
 }
 
 void AssertBufferEqual(const Buffer& buffer, const std::vector<uint8_t>& 
expected) {
-  ASSERT_EQ(buffer.size(), expected.size());
+  ASSERT_EQ(buffer.size(), expected.size()) << "Mismatching buffer size";
   const uint8_t* buffer_data = buffer.data();
   for (size_t i = 0; i < expected.size(); ++i) {
     ASSERT_EQ(buffer_data[i], expected[i]);
   }
 }
 
+// Assert that `buffer` holds exactly the bytes of `expected`.
+void AssertBufferEqual(const Buffer& buffer, const std::string& expected) {
+  ASSERT_EQ(buffer.size(), expected.length()) << "Mismatching buffer size";
+  const uint8_t* buffer_data = buffer.data();
+  for (size_t i = 0; i < expected.size(); ++i) {
+    // Compare as unsigned bytes: plain `char` may be signed, in which case a
+    // byte >= 0x80 would never compare equal to its uint8_t counterpart.
+    ASSERT_EQ(buffer_data[i], static_cast<uint8_t>(expected[i]))
+        << "Mismatch at byte " << i;
+  }
+}
+
+// Assert that two Arrow buffers have the same size and contents. The size is
+// checked first so a length mismatch produces a clearer failure message.
+void AssertBufferEqual(const Buffer& buffer, const Buffer& expected) {
+  const int64_t expected_size = expected.size();
+  ASSERT_EQ(buffer.size(), expected_size) << "Mismatching buffer size";
+  ASSERT_TRUE(buffer.Equals(expected));
+}
+
 void PrintColumn(const Column& col, std::stringstream* ss) {
   const ChunkedArray& carr = *col.data();
   for (int i = 0; i < carr.num_chunks(); ++i) {
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index a2725a6..1ad6039 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -290,7 +290,7 @@ TEST_F(TestPlasmaStore, GetTest) {
   {
     auto metadata = object_buffers[0].metadata;
     object_buffers.clear();
-    ::arrow::AssertBufferEqual(*metadata, {42});
+    ::arrow::AssertBufferEqual(*metadata, std::string{42});
     ARROW_CHECK_OK(client_.FlushReleaseHistory());
     EXPECT_TRUE(client_.IsInUse(object_id));
   }

Reply via email to