This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9f9563f ARROW-3170: [C++] Experimental readahead spooler
9f9563f is described below
commit 9f9563f1680522d4fc4939da9bd153687d6eb763
Author: Antoine Pitrou <[email protected]>
AuthorDate: Mon Sep 10 18:42:45 2018 +0200
ARROW-3170: [C++] Experimental readahead spooler
Author: Antoine Pitrou <[email protected]>
Closes #2492 from pitrou/ARROW-501-readahead and squashes the following commits:
cbbd3db8 <Antoine Pitrou> Improve paddings description
4f8f70ff <Antoine Pitrou> ARROW-501: Experimental readahead spooler
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/io/CMakeLists.txt | 2 +
cpp/src/arrow/io/io-readahead-test.cc | 234 ++++++++++++++++++++++++++++++++++
cpp/src/arrow/io/readahead.cc | 221 ++++++++++++++++++++++++++++++++
cpp/src/arrow/io/readahead.h | 96 ++++++++++++++
cpp/src/arrow/test-util.h | 15 ++-
cpp/src/plasma/test/client_tests.cc | 2 +-
7 files changed, 569 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index b46f35c..4937378 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -34,6 +34,7 @@ set(ARROW_SRCS
io/file.cc
io/interfaces.cc
io/memory.cc
+ io/readahead.cc
util/bit-util.cc
util/compression.cc
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index 65221d0..ff6b854 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -26,6 +26,7 @@ if (ARROW_HDFS AND NOT ARROW_BOOST_HEADER_ONLY)
endif()
ADD_ARROW_TEST(io-memory-test)
+ADD_ARROW_TEST(io-readahead-test)
ADD_ARROW_BENCHMARK(io-file-benchmark)
ADD_ARROW_BENCHMARK(io-memory-benchmark)
@@ -38,4 +39,5 @@ install(FILES
hdfs.h
interfaces.h
memory.h
+ readahead.h
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/io")
diff --git a/cpp/src/arrow/io/io-readahead-test.cc b/cpp/src/arrow/io/io-readahead-test.cc
new file mode 100644
index 0000000..fa0a138
--- /dev/null
+++ b/cpp/src/arrow/io/io-readahead-test.cc
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/io/readahead.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+#include "arrow/util/checked_cast.h"
+
+namespace arrow {
+namespace io {
+namespace internal {
+
+static void sleep_for(double seconds) {
+ std::this_thread::sleep_for(
+ std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+}
+
+static void busy_wait(double seconds, std::function<bool()> predicate) {
+ const double period = 0.001;
+ for (int i = 0; !predicate() && i * period < seconds; ++i) {
+ sleep_for(period);
+ }
+}
+
+std::shared_ptr<BufferReader> DataReader(const std::string& data) {
+ std::shared_ptr<Buffer> buffer;
+ ABORT_NOT_OK(AllocateBuffer(data.length(), &buffer));
+ memcpy(buffer->mutable_data(), data.data(), data.length());
+ return std::make_shared<BufferReader>(std::move(buffer));
+}
+
+static int64_t WaitForPosition(const RandomAccessFile& file, int64_t expected,
+ double seconds = 0.2) {
+ int64_t pos = -1;
+ busy_wait(seconds, [&]() -> bool {
+ ABORT_NOT_OK(file.Tell(&pos));
+ return pos >= expected;
+ });
+ return pos;
+}
+
+static void AssertEventualPosition(const RandomAccessFile& file, int64_t expected) {
+ int64_t pos = WaitForPosition(file, expected);
+ ASSERT_EQ(pos, expected) << "File didn't reach expected position";
+}
+
+static void AssertPosition(const RandomAccessFile& file, int64_t expected) {
+ int64_t pos = -1;
+ ABORT_NOT_OK(file.Tell(&pos));
+ ASSERT_EQ(pos, expected) << "File didn't reach expected position";
+}
+
+template <typename Expected>
+static void AssertReadaheadBuffer(const ReadaheadBuffer& buf,
+ std::set<int64_t> left_paddings,
+ std::set<int64_t> right_paddings,
+ const Expected& expected_data) {
+ ASSERT_TRUE(left_paddings.count(buf.left_padding))
+ << "Left padding (" << buf.left_padding << ") not amongst expected values";
+ ASSERT_TRUE(right_paddings.count(buf.right_padding))
+ << "Right padding (" << buf.right_padding << ") not amongst expected values";
+ auto actual_data =
+ SliceBuffer(buf.buffer, buf.left_padding,
+ buf.buffer->size() - buf.left_padding - buf.right_padding);
+ AssertBufferEqual(*actual_data, expected_data);
+}
+
+static void AssertReadaheadBufferEOF(const ReadaheadBuffer& buf) {
+ ASSERT_EQ(buf.buffer.get(), nullptr) << "Expected EOF signalled by null buffer pointer";
+}
+
+TEST(ReadaheadSpooler, BasicReads) {
+ // Test basic reads
+ auto data_reader = DataReader("0123456789");
+ ReadaheadSpooler spooler(data_reader, 2, 3);
+ ReadaheadBuffer buf;
+
+ AssertEventualPosition(*data_reader, 3 * 2);
+
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "01");
+ AssertEventualPosition(*data_reader, 4 * 2);
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "23");
+ AssertEventualPosition(*data_reader, 5 * 2);
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "45");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "67");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "89");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBufferEOF(buf);
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBufferEOF(buf);
+}
+
+TEST(ReadaheadSpooler, ShortReadAtEnd) {
+ auto data_reader = DataReader("01234");
+ ReadaheadSpooler spooler(data_reader, 3, 2);
+ ReadaheadBuffer buf;
+
+ AssertEventualPosition(*data_reader, 5);
+
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "012");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "34");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBufferEOF(buf);
+}
+
+TEST(ReadaheadSpooler, Close) {
+ // Closing should stop reads
+ auto data_reader = DataReader("0123456789");
+ ReadaheadSpooler spooler(data_reader, 2, 2);
+ ReadaheadBuffer buf;
+
+ AssertEventualPosition(*data_reader, 2 * 2);
+ ASSERT_OK(spooler.Close());
+
+ // XXX not sure this makes sense
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "01");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {0}, {0}, "23");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBufferEOF(buf);
+ AssertPosition(*data_reader, 2 * 2);
+
+ // Idempotency
+ ASSERT_OK(spooler.Close());
+}
+
+TEST(ReadaheadSpooler, Paddings) {
+ auto data_reader = DataReader("0123456789");
+ ReadaheadSpooler spooler(data_reader, 2, 2, 1 /* left_padding */,
+ 4 /* right_padding */);
+ ReadaheadBuffer buf;
+
+ AssertEventualPosition(*data_reader, 2 * 2);
+ ASSERT_EQ(spooler.GetLeftPadding(), 1);
+ ASSERT_EQ(spooler.GetRightPadding(), 4);
+ spooler.SetLeftPadding(3);
+ spooler.SetRightPadding(2);
+ ASSERT_EQ(spooler.GetLeftPadding(), 3);
+ ASSERT_EQ(spooler.GetRightPadding(), 2);
+
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {1}, {4}, "01");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {1}, {4}, "23");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {3}, {2}, "45");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {3}, {2}, "67");
+ spooler.SetLeftPadding(4);
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBuffer(buf, {3, 4}, {2}, "89");
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBufferEOF(buf);
+}
+
+TEST(ReadaheadSpooler, StressReads) {
+ // NBYTES % READ_SIZE != 0 ensures a short read at end
+#if defined(ARROW_VALGRIND)
+ const int64_t NBYTES = 101;
+#else
+ const int64_t NBYTES = 50001;
+#endif
+ const int64_t READ_SIZE = 2;
+
+ std::shared_ptr<ResizableBuffer> data;
+ ASSERT_OK(MakeRandomByteBuffer(NBYTES, default_memory_pool(), &data));
+ auto data_reader = std::make_shared<BufferReader>(data);
+
+ ReadaheadSpooler spooler(data_reader, READ_SIZE, 7);
+ int64_t pos = 0;
+ std::vector<ReadaheadBuffer> readahead_buffers;
+
+ // Stress Read() calls while the background thread is reading ahead
+ while (pos < NBYTES) {
+ ReadaheadBuffer buf;
+ ASSERT_OK(spooler.Read(&buf));
+ ASSERT_NE(buf.buffer.get(), nullptr) << "Got premature EOF at index " << pos;
+ pos += buf.buffer->size() - buf.left_padding - buf.right_padding;
+ readahead_buffers.push_back(std::move(buf));
+ }
+ // At EOF
+ {
+ ReadaheadBuffer buf;
+ ASSERT_OK(spooler.Read(&buf));
+ AssertReadaheadBufferEOF(buf);
+ }
+
+ pos = 0;
+ for (const auto& buf : readahead_buffers) {
+ auto expected_data = SliceBuffer(data, pos, std::min(READ_SIZE, NBYTES - pos));
+ AssertReadaheadBuffer(buf, {0}, {0}, *expected_data);
+ pos += expected_data->size();
+ }
+ // Got exactly the total bytes length
+ ASSERT_EQ(pos, NBYTES);
+}
+
+} // namespace internal
+} // namespace io
+} // namespace arrow
diff --git a/cpp/src/arrow/io/readahead.cc b/cpp/src/arrow/io/readahead.cc
new file mode 100644
index 0000000..c21e45d
--- /dev/null
+++ b/cpp/src/arrow/io/readahead.cc
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/readahead.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#include <condition_variable>
+#include <cstring>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <utility>
+
+namespace arrow {
+namespace io {
+namespace internal {
+
+// ----------------------------------------------------------------------
+// ReadaheadSpooler implementation
+
+class ReadaheadSpooler::Impl {
+ public:
+ Impl(MemoryPool* pool, std::shared_ptr<InputStream> raw, int64_t read_size,
+ int32_t readahead_queue_size, int64_t left_padding, int64_t right_padding)
+ : pool_(pool),
+ raw_(raw),
+ read_size_(read_size),
+ readahead_queue_size_(readahead_queue_size),
+ left_padding_(left_padding),
+ right_padding_(right_padding) {
+ DCHECK_NE(raw, nullptr);
+ DCHECK_GT(read_size, 0);
+ DCHECK_GT(readahead_queue_size, 0);
+ io_worker_ = std::thread([&]() { WorkerLoop(); });
+ }
+
+ ~Impl() { ARROW_UNUSED(Close()); }
+
+ Status Close() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ please_close_ = true;
+ io_wakeup_.notify_one();
+ // Wait for IO thread to finish
+ if (io_worker_.joinable()) {
+ lock.unlock();
+ io_worker_.join();
+ lock.lock();
+ }
+ return raw_->Close();
+ }
+
+ Status Read(ReadaheadBuffer* out) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (true) {
+ // Drain queue before querying other flags
+ if (buffer_queue_.size() > 0) {
+ *out = std::move(buffer_queue_.front());
+ DCHECK_NE(out->buffer, nullptr);
+ buffer_queue_.pop_front();
+ // Need to fill up queue again
+ io_wakeup_.notify_one();
+ return Status::OK();
+ }
+ if (!read_status_.ok()) {
+ // Got a read error, bail out
+ return read_status_;
+ }
+ if (eof_) {
+ out->buffer.reset();
+ return Status::OK();
+ }
+ // Readahead queue is empty and we're not closed yet, wait for more I/O
+ io_progress_.wait(lock);
+ }
+ }
+
+ int64_t left_padding() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ return left_padding_;
+ }
+
+ void left_padding(int64_t size) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ left_padding_ = size;
+ }
+
+ int64_t right_padding() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ return right_padding_;
+ }
+
+ void right_padding(int64_t size) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ right_padding_ = size;
+ }
+
+ protected:
+ // The background thread's main function
+ void WorkerLoop() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ Status st;
+
+ while (true) {
+ if (please_close_) {
+ goto eof;
+ }
+ // Fill up readahead queue until desired size
+ while (buffer_queue_.size() < static_cast<size_t>(readahead_queue_size_)) {
+ ReadaheadBuffer buf = {nullptr, left_padding_, right_padding_};
+ lock.unlock();
+ Status st = ReadOneBufferUnlocked(&buf);
+ lock.lock();
+ if (!st.ok()) {
+ read_status_ = st;
+ goto error;
+ }
+ // Close() could have been called while unlocked above
+ if (please_close_) {
+ goto eof;
+ }
+ // Got empty read?
+ if (buf.buffer->size() == buf.left_padding + buf.right_padding) {
+ goto eof;
+ }
+ buffer_queue_.push_back(std::move(buf));
+ io_progress_.notify_one();
+ }
+ // Wait for Close() or Read() call
+ io_wakeup_.wait(lock);
+ }
+ eof:
+ eof_ = true;
+ error:
+ // Make sure any pending Read() doesn't block indefinitely
+ io_progress_.notify_one();
+ }
+
+ Status ReadOneBufferUnlocked(ReadaheadBuffer* buf) {
+ // Note that left_padding_ and right_padding_ may be modified while unlocked
+ std::shared_ptr<ResizableBuffer> buffer;
+ int64_t bytes_read;
+ RETURN_NOT_OK(AllocateResizableBuffer(
+ pool_, read_size_ + buf->left_padding + buf->right_padding, &buffer));
+ RETURN_NOT_OK(
+ raw_->Read(read_size_, &bytes_read, buffer->mutable_data() + buf->left_padding));
+ if (bytes_read < read_size_) {
+ // Got a short read
+ RETURN_NOT_OK(buffer->Resize(bytes_read + buf->left_padding + buf->right_padding));
+ }
+ // Zero padding areas
+ memset(buffer->mutable_data(), 0, buf->left_padding);
+ memset(buffer->mutable_data() + bytes_read + buf->left_padding, 0,
+ buf->right_padding);
+ buf->buffer = std::move(buffer);
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ std::shared_ptr<InputStream> raw_;
+ int64_t read_size_;
+ int32_t readahead_queue_size_;
+ int64_t left_padding_ = 0;
+ int64_t right_padding_ = 0;
+
+ std::mutex mutex_;
+ std::condition_variable io_wakeup_;
+ std::condition_variable io_progress_;
+ std::thread io_worker_;
+ bool please_close_ = false;
+ bool eof_ = false;
+ std::deque<ReadaheadBuffer> buffer_queue_;
+ Status read_status_;
+};
+
+ReadaheadSpooler::ReadaheadSpooler(MemoryPool* pool, std::shared_ptr<InputStream> raw,
+ int64_t read_size, int32_t readahead_queue_size,
+ int64_t left_padding, int64_t right_padding)
+ : impl_(new ReadaheadSpooler::Impl(pool, raw, read_size, readahead_queue_size,
+ left_padding, right_padding)) {}
+
+ReadaheadSpooler::ReadaheadSpooler(std::shared_ptr<InputStream> raw, int64_t read_size,
+ int32_t readahead_queue_size, int64_t left_padding,
+ int64_t right_padding)
+ : ReadaheadSpooler(default_memory_pool(), raw, read_size, readahead_queue_size,
+ left_padding, right_padding) {}
+
+int64_t ReadaheadSpooler::GetLeftPadding() { return impl_->left_padding(); }
+
+void ReadaheadSpooler::SetLeftPadding(int64_t size) { impl_->left_padding(size); }
+
+int64_t ReadaheadSpooler::GetRightPadding() { return impl_->right_padding(); }
+
+void ReadaheadSpooler::SetRightPadding(int64_t size) { impl_->right_padding(size); }
+
+Status ReadaheadSpooler::Close() { return impl_->Close(); }
+
+Status ReadaheadSpooler::Read(ReadaheadBuffer* out) { return impl_->Read(out); }
+
+ReadaheadSpooler::~ReadaheadSpooler() {}
+
+} // namespace internal
+} // namespace io
+} // namespace arrow
diff --git a/cpp/src/arrow/io/readahead.h b/cpp/src/arrow/io/readahead.h
new file mode 100644
index 0000000..d7ac509
--- /dev/null
+++ b/cpp/src/arrow/io/readahead.h
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_READAHEAD_H
+#define ARROW_IO_READAHEAD_H
+
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+class ResizableBuffer;
+class Status;
+
+namespace io {
+namespace internal {
+
+struct ARROW_EXPORT ReadaheadBuffer {
+ std::shared_ptr<ResizableBuffer> buffer;
+ int64_t left_padding;
+ int64_t right_padding;
+};
+
+class ARROW_EXPORT ReadaheadSpooler {
+ public:
+ /// \brief EXPERIMENTAL: Create a readahead spooler wrapping the given input stream.
+ ///
+ /// The spooler launches a background thread that reads up to a given number
+ /// of fixed-size blocks in advance from the underlying stream.
+ /// The buffers returned by Read() will be padded at the beginning and the end
+ /// with the configured amount of (zeroed) bytes.
+ ReadaheadSpooler(MemoryPool* pool, std::shared_ptr<InputStream> raw,
+ int64_t read_size = kDefaultReadSize, int32_t readahead_queue_size = 1,
+ int64_t left_padding = 0, int64_t right_padding = 0);
+
+ explicit ReadaheadSpooler(std::shared_ptr<InputStream> raw,
+ int64_t read_size = kDefaultReadSize,
+ int32_t readahead_queue_size = 1, int64_t left_padding = 0,
+ int64_t right_padding = 0);
+
+ ~ReadaheadSpooler();
+
+ /// Configure zero-padding at beginning and end of buffers (default 0 bytes).
+ /// The buffers returned by Read() will be padded at the beginning and the end
+ /// with the configured amount of (zeroed) bytes.
+ /// Note that, as reading happens in background and in advance, changing the
+ /// configured values might not affect Read() results immediately.
+ int64_t GetLeftPadding();
+ void SetLeftPadding(int64_t size);
+
+ int64_t GetRightPadding();
+ void SetRightPadding(int64_t size);
+
+ /// \brief Close the spooler. This implicitly closes the underlying input stream.
+ Status Close();
+
+ /// \brief Read a buffer from the queue.
+ ///
+ /// If the buffer pointer in the ReadaheadBuffer is null, then EOF was
+ /// reached and/or the spooler was explicitly closed.
+ /// Otherwise, the buffer will contain at most read_size bytes in addition
+ /// to the configured padding (short reads are possible at the end of a file).
+ // How do we allow reusing the buffer in ReadaheadBuffer? perhaps by using
+ // a caching memory pool?
+ Status Read(ReadaheadBuffer* out);
+
+ private:
+ static constexpr int64_t kDefaultReadSize = 1 << 20; // 1 MB
+
+ class ARROW_NO_EXPORT Impl;
+ std::unique_ptr<Impl> impl_;
+};
+
+} // namespace internal
+} // namespace io
+} // namespace arrow
+
+#endif // ARROW_IO_READAHEAD_H
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 13c6a61..dfc1ab5 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -330,13 +330,26 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual
}
void AssertBufferEqual(const Buffer& buffer, const std::vector<uint8_t>& expected) {
- ASSERT_EQ(buffer.size(), expected.size());
+ ASSERT_EQ(buffer.size(), expected.size()) << "Mismatching buffer size";
const uint8_t* buffer_data = buffer.data();
for (size_t i = 0; i < expected.size(); ++i) {
ASSERT_EQ(buffer_data[i], expected[i]);
}
}
+void AssertBufferEqual(const Buffer& buffer, const std::string& expected) {
+ ASSERT_EQ(buffer.size(), expected.length()) << "Mismatching buffer size";
+ const uint8_t* buffer_data = buffer.data();
+ for (size_t i = 0; i < expected.size(); ++i) {
+ ASSERT_EQ(buffer_data[i], expected[i]);
+ }
+}
+
+void AssertBufferEqual(const Buffer& buffer, const Buffer& expected) {
+ ASSERT_EQ(buffer.size(), expected.size()) << "Mismatching buffer size";
+ ASSERT_TRUE(buffer.Equals(expected));
+}
+
void PrintColumn(const Column& col, std::stringstream* ss) {
const ChunkedArray& carr = *col.data();
for (int i = 0; i < carr.num_chunks(); ++i) {
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index a2725a6..1ad6039 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -290,7 +290,7 @@ TEST_F(TestPlasmaStore, GetTest) {
{
auto metadata = object_buffers[0].metadata;
object_buffers.clear();
- ::arrow::AssertBufferEqual(*metadata, {42});
+ ::arrow::AssertBufferEqual(*metadata, std::string{42});
ARROW_CHECK_OK(client_.FlushReleaseHistory());
EXPECT_TRUE(client_.IsInUse(object_id));
}