This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new acb0b13  ARROW-1521: [C++] Add BufferOutputStream::Reset method
acb0b13 is described below

commit acb0b13fb1eda18a3c51a180120342f478699ce1
Author: Wes McKinney <[email protected]>
AuthorDate: Mon Oct 1 04:06:11 2018 -0400

    ARROW-1521: [C++] Add BufferOutputStream::Reset method
    
    This allows an arrow::io::BufferOutputStream to be reused
    
    Author: Wes McKinney <[email protected]>
    
    Closes #2665 from wesm/ARROW-1521 and squashes the following commits:
    
    f8149afd4 <Wes McKinney> Add unit test for BufferOutputStream::Reset
    e69e67de2 <Wes McKinney> Start drafting BufferOutputStream::Reset
---
 cpp/src/arrow/io/io-memory-test.cc | 20 ++++++++++++++++++++
 cpp/src/arrow/io/memory.cc         | 17 ++++++++++++++---
 cpp/src/arrow/io/memory.h          | 14 ++++++++++++++
 3 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index 9751f17..db536ad 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -85,6 +85,26 @@ TEST_F(TestBufferOutputStream, WriteAfterFinish) {
   ASSERT_RAISES(IOError, stream_->Write(data));
 }
 
+TEST_F(TestBufferOutputStream, Reset) {
+  std::string data = "data123456";
+
+  auto stream = checked_cast<BufferOutputStream*>(stream_.get());
+
+  ASSERT_OK(stream->Write(data));
+
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_OK(stream->Finish(&buffer));
+  ASSERT_EQ(buffer->size(), static_cast<int64_t>(data.size()));
+
+  ASSERT_OK(stream->Reset(2048));
+  ASSERT_OK(stream->Write(data));
+  ASSERT_OK(stream->Write(data));
+  std::shared_ptr<Buffer> buffer2;
+  ASSERT_OK(stream->Finish(&buffer2));
+
+  ASSERT_EQ(buffer2->size(), static_cast<int64_t>(data.size() * 2));
+}
+
 TEST(TestFixedSizeBufferWriter, Basics) {
   std::shared_ptr<Buffer> buffer;
   ASSERT_OK(AllocateBuffer(1024, &buffer));
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index ef7a43d..7c5bade 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -36,6 +36,9 @@ namespace io {
 
 static constexpr int64_t kBufferMinimumSize = 256;
 
+BufferOutputStream::BufferOutputStream()
+    : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
+
 BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer)
     : buffer_(buffer),
       is_open_(true),
@@ -45,9 +48,17 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
 
 Status BufferOutputStream::Create(int64_t initial_capacity, MemoryPool* pool,
                                   std::shared_ptr<BufferOutputStream>* out) {
-  std::shared_ptr<ResizableBuffer> buffer;
-  RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer));
-  *out = std::make_shared<BufferOutputStream>(buffer);
+  // ctor is private, so cannot use make_shared
+  *out = std::shared_ptr<BufferOutputStream>(new BufferOutputStream);
+  return (*out)->Reset(initial_capacity, pool);
+}
+
+Status BufferOutputStream::Reset(int64_t initial_capacity, MemoryPool* pool) {
+  RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer_));
+  is_open_ = true;
+  capacity_ = initial_capacity;
+  position_ = 0;
+  mutable_data_ = buffer_->mutable_data();
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index ea4e8ca..0bff985 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -24,6 +24,7 @@
 #include <memory>
 
 #include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -56,10 +57,23 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
   Status Tell(int64_t* position) const override;
   Status Write(const void* data, int64_t nbytes) override;
 
+  using OutputStream::Write;
+
   /// Close the stream and return the buffer
   Status Finish(std::shared_ptr<Buffer>* result);
 
+  /// \brief Initialize state of OutputStream with newly allocated memory and
+  /// set position to 0
+  /// \param[in] initial_capacity the starting allocated capacity
+  /// \param[in,out] pool the memory pool to use for allocations
+  /// \return Status
+  Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool());
+
+  int64_t capacity() const { return capacity_; }
+
  private:
+  BufferOutputStream();
+
   // Ensures there is sufficient space available to write nbytes
   Status Reserve(int64_t nbytes);
 

Reply via email to