This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new acb0b13 ARROW-1521: [C++] Add BufferOutputStream::Reset method
acb0b13 is described below
commit acb0b13fb1eda18a3c51a180120342f478699ce1
Author: Wes McKinney <[email protected]>
AuthorDate: Mon Oct 1 04:06:11 2018 -0400
ARROW-1521: [C++] Add BufferOutputStream::Reset method
This allows a arrow::io::BufferOutputStream to be reused
Author: Wes McKinney <[email protected]>
Closes #2665 from wesm/ARROW-1521 and squashes the following commits:
f8149afd4 <Wes McKinney> Add unit test for BufferOutputStream::Reset
e69e67de2 <Wes McKinney> Start drafting BufferOutputStream::Reset
---
cpp/src/arrow/io/io-memory-test.cc | 20 ++++++++++++++++++++
cpp/src/arrow/io/memory.cc | 17 ++++++++++++++---
cpp/src/arrow/io/memory.h | 14 ++++++++++++++
3 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/io/io-memory-test.cc
b/cpp/src/arrow/io/io-memory-test.cc
index 9751f17..db536ad 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -85,6 +85,26 @@ TEST_F(TestBufferOutputStream, WriteAfterFinish) {
ASSERT_RAISES(IOError, stream_->Write(data));
}
+TEST_F(TestBufferOutputStream, Reset) {
+ std::string data = "data123456";
+
+ auto stream = checked_cast<BufferOutputStream*>(stream_.get());
+
+ ASSERT_OK(stream->Write(data));
+
+ std::shared_ptr<Buffer> buffer;
+ ASSERT_OK(stream->Finish(&buffer));
+ ASSERT_EQ(buffer->size(), static_cast<int64_t>(data.size()));
+
+ ASSERT_OK(stream->Reset(2048));
+ ASSERT_OK(stream->Write(data));
+ ASSERT_OK(stream->Write(data));
+ std::shared_ptr<Buffer> buffer2;
+ ASSERT_OK(stream->Finish(&buffer2));
+
+ ASSERT_EQ(buffer2->size(), static_cast<int64_t>(data.size() * 2));
+}
+
TEST(TestFixedSizeBufferWriter, Basics) {
std::shared_ptr<Buffer> buffer;
ASSERT_OK(AllocateBuffer(1024, &buffer));
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index ef7a43d..7c5bade 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -36,6 +36,9 @@ namespace io {
static constexpr int64_t kBufferMinimumSize = 256;
+BufferOutputStream::BufferOutputStream()
+ : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
+
BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>&
buffer)
: buffer_(buffer),
is_open_(true),
@@ -45,9 +48,17 @@ BufferOutputStream::BufferOutputStream(const
std::shared_ptr<ResizableBuffer>& b
Status BufferOutputStream::Create(int64_t initial_capacity, MemoryPool* pool,
std::shared_ptr<BufferOutputStream>* out) {
- std::shared_ptr<ResizableBuffer> buffer;
- RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer));
- *out = std::make_shared<BufferOutputStream>(buffer);
+ // ctor is private, so cannot use make_shared
+ *out = std::shared_ptr<BufferOutputStream>(new BufferOutputStream);
+ return (*out)->Reset(initial_capacity, pool);
+}
+
+Status BufferOutputStream::Reset(int64_t initial_capacity, MemoryPool* pool) {
+ RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer_));
+ is_open_ = true;
+ capacity_ = initial_capacity;
+ position_ = 0;
+ mutable_data_ = buffer_->mutable_data();
return Status::OK();
}
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index ea4e8ca..0bff985 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -24,6 +24,7 @@
#include <memory>
#include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
#include "arrow/util/visibility.h"
namespace arrow {
@@ -56,10 +57,23 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream
{
Status Tell(int64_t* position) const override;
Status Write(const void* data, int64_t nbytes) override;
+ using OutputStream::Write;
+
/// Close the stream and return the buffer
Status Finish(std::shared_ptr<Buffer>* result);
+ /// \brief Initialize state of OutputStream with newly allocated memory and
+ /// set position to 0
+ /// \param[in] initial_capacity the starting allocated capacity
+ /// \param[in,out] pool the memory pool to use for allocations
+ /// \return Status
+ Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool =
default_memory_pool());
+
+ int64_t capacity() const { return capacity_; }
+
private:
+ BufferOutputStream();
+
// Ensures there is sufficient space available to write nbytes
Status Reserve(int64_t nbytes);