[ 
https://issues.apache.org/jira/browse/ARROW-2330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433765#comment-16433765
 ] 

ASF GitHub Bot commented on ARROW-2330:
---------------------------------------

alendit closed pull request #1769: ARROW-2330: [C++] Optimize delta buffer 
creation with partially finishable array builders
URL: https://github.com/apache/arrow/pull/1769
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the full diff is reproduced below for
the sake of provenance:

diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index fb1bebfca..7f7666bb3 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -536,6 +536,44 @@ TYPED_TEST(TestPrimitiveBuilder, SliceEquality) {
   ASSERT_TRUE(array->RangeEquals(5, 15, 0, slice));
 }
 
+TYPED_TEST(TestPrimitiveBuilder, TestPartialFinish) {
+  const int64_t size = 1000;
+  this->RandomData(size * 2);
+
+  // build an array of all values
+  std::shared_ptr<Array> all_values_array;
+  ASSERT_OK(MakeArray(this->valid_bytes_, this->draws_, size * 2, 
this->builder_.get(),
+                      &all_values_array));
+
+  for (uint64_t idx = 0; idx < size; ++idx) {
+    if (this->valid_bytes_[idx] > 0) {
+      ASSERT_OK(this->builder_->Append(this->draws_[idx]));
+    } else {
+      ASSERT_OK(this->builder_->AppendNull());
+    }
+  }
+
+  std::shared_ptr<Array> result1;
+  ASSERT_OK(this->builder_->Finish(false, &result1));
+
+  ASSERT_EQ(size, result1->length());
+  ASSERT_TRUE(all_values_array->RangeEquals(0, size, 0, result1));
+
+  for (uint64_t idx = size; idx < size * 2; ++idx) {
+    if (this->valid_bytes_[idx] > 0) {
+      ASSERT_OK(this->builder_->Append(this->draws_[idx]));
+    } else {
+      ASSERT_OK(this->builder_->AppendNull());
+    }
+  }
+
+  std::shared_ptr<Array> result2;
+  ASSERT_OK(this->builder_->Finish(true, &result2));
+
+  ASSERT_EQ(size, result2->length());
+  ASSERT_TRUE(all_values_array->RangeEquals(size, size * 2, 0, result2));
+}
+
 TYPED_TEST(TestPrimitiveBuilder, TestAppendScalar) {
   DECL_T();
 
@@ -1027,6 +1065,27 @@ TEST_F(TestStringBuilder, TestZeroLength) {
   Done();
 }
 
+TEST_F(TestStringBuilder, TestPartialFinish) {
+  StringBuilder builder, builder_expected;
+  ASSERT_OK(builder.Append("foo"));
+  ASSERT_OK(builder_expected.Append("foo"));
+
+  std::shared_ptr<Array> result1, expected1;
+  ASSERT_OK(builder.Finish(false, &result1));
+  ASSERT_OK(builder_expected.Finish(&expected1));
+  ASSERT_EQ(1, result1->length());
+  ASSERT_TRUE(result1->Equals(expected1));
+
+  ASSERT_OK(builder.Append("foo"));
+  ASSERT_OK(builder_expected.Append("foo"));
+  std::shared_ptr<Array> result2, expected2;
+  ASSERT_OK(builder.Finish(false, &result2));
+  ASSERT_OK(builder_expected.Finish(&expected2));
+  ASSERT_EQ(1, result2->length());
+  ASSERT_EQ(1, result2->offset());
+  ASSERT_TRUE(result2->Equals(expected2));
+}
+
 // Binary container type
 // TODO(emkornfield) there should be some way to refactor these to avoid code 
duplicating
 // with String
@@ -1239,6 +1298,27 @@ TEST_F(TestBinaryBuilder, TestZeroLength) {
   Done();
 }
 
+TEST_F(TestBinaryBuilder, TestPartialFinish) {
+  BinaryBuilder builder, builder_expected;
+  ASSERT_OK(builder.Append("foo"));
+  ASSERT_OK(builder_expected.Append("foo"));
+
+  std::shared_ptr<Array> result1, expected1;
+  ASSERT_OK(builder.Finish(false, &result1));
+  ASSERT_OK(builder_expected.Finish(&expected1));
+  ASSERT_EQ(1, result1->length());
+  ASSERT_TRUE(result1->Equals(expected1));
+
+  ASSERT_OK(builder.Append("foo"));
+  ASSERT_OK(builder_expected.Append("foo"));
+  std::shared_ptr<Array> result2, expected2;
+  ASSERT_OK(builder.Finish(false, &result2));
+  ASSERT_OK(builder_expected.Finish(&expected2));
+  ASSERT_EQ(1, result2->length());
+  ASSERT_EQ(1, result2->offset());
+  ASSERT_TRUE(result2->Equals(expected2));
+}
+
 // ----------------------------------------------------------------------
 // Slice tests
 
@@ -1472,6 +1552,26 @@ TEST_F(TestFWBinaryArray, Slice) {
   ASSERT_TRUE(array->RangeEquals(1, 3, 0, slice));
 }
 
+TEST_F(TestFWBinaryArray, TestPartialFinish) {
+  auto type = fixed_size_binary(4);
+  FixedSizeBinaryBuilder builder(type);
+
+  ASSERT_OK(builder.Append("foo"));
+  std::shared_ptr<Array> result1;
+  ASSERT_OK(builder.Finish(false, &result1));
+  ASSERT_EQ(1, result1->length());
+  ASSERT_STREQ("foo", reinterpret_cast<const char*>(
+                          static_cast<const 
FixedSizeBinaryArray&>(*result1).Value(0)));
+
+  ASSERT_OK(builder.Append("bar"));
+  std::shared_ptr<Array> result2;
+  ASSERT_OK(builder.Finish(&result2));
+  ASSERT_EQ(1, result2->length());
+  ASSERT_EQ(1, result2->offset());
+  ASSERT_STREQ("bar", reinterpret_cast<const char*>(
+                          static_cast<const 
FixedSizeBinaryArray&>(*result2).Value(0)));
+}
+
 // ----------------------------------------------------------------------
 // AdaptiveInt tests
 
@@ -1603,6 +1703,31 @@ TEST_F(TestAdaptiveIntBuilder, TestAppendVector) {
   ASSERT_TRUE(expected_->Equals(result_));
 }
 
+TEST_F(TestAdaptiveIntBuilder, TestPartialFinish) {
+  ASSERT_OK(builder_->Append(0));
+  ASSERT_OK(
+      
builder_->Append(static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 
1));
+
+  std::shared_ptr<Array> result1, expected1;
+  ASSERT_OK(builder_->Finish(false, &result1));
+
+  std::vector<int64_t> expected_values1(
+      {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
+
+  ArrayFromVector<Int64Type, int64_t>(expected_values1, &expected1);
+  ASSERT_TRUE(expected1->Equals(result1));
+
+  ASSERT_OK(builder_->Append(65536));
+  ASSERT_OK(builder_->Append(1024));
+
+  std::shared_ptr<Array> result2, expected2;
+  ASSERT_OK(builder_->Finish(false, &result2));
+
+  std::vector<int64_t> expected_values2({65536, 1024});
+  ArrayFromVector<Int64Type, int64_t>(expected_values2, &expected_);
+  ASSERT_TRUE(expected_->Equals(result2));
+}
+
 class TestAdaptiveUIntBuilder : public TestBuilder {
  public:
   void SetUp() {
@@ -1698,6 +1823,31 @@ TEST_F(TestAdaptiveUIntBuilder, TestAppendVector) {
   ASSERT_TRUE(expected_->Equals(result_));
 }
 
+TEST_F(TestAdaptiveUIntBuilder, TestPartialFinish) {
+  ASSERT_OK(builder_->Append(0));
+  ASSERT_OK(
+      
builder_->Append(static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 
1));
+
+  std::shared_ptr<Array> result1, expected1;
+  ASSERT_OK(builder_->Finish(false, &result1));
+
+  std::vector<uint64_t> expected_values1(
+      {0, static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1});
+
+  ArrayFromVector<UInt64Type, uint64_t>(expected_values1, &expected1);
+  ASSERT_TRUE(expected1->Equals(result1));
+
+  ASSERT_OK(builder_->Append(65536));
+  ASSERT_OK(builder_->Append(1024));
+
+  std::shared_ptr<Array> result2, expected2;
+  ASSERT_OK(builder_->Finish(false, &result2));
+
+  std::vector<uint64_t> expected_values2({65536, 1024});
+  ArrayFromVector<UInt64Type, uint64_t>(expected_values2, &expected_);
+  ASSERT_TRUE(expected_->Equals(result2));
+}
+
 // ----------------------------------------------------------------------
 // Dictionary tests
 
@@ -1817,7 +1967,7 @@ TYPED_TEST(TestDictionaryBuilder, DeltaDictionary) {
   ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
   ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
   std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
+  ASSERT_OK(builder.Finish(false, &result));
 
   // Build expected data for the initial dictionary
   NumericBuilder<TypeParam> dict_builder1;
@@ -1876,7 +2026,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleDeltaDictionary) {
   ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
   ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
   std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
+  ASSERT_OK(builder.Finish(false, &result));
 
   // Build expected data for the initial dictionary
   NumericBuilder<TypeParam> dict_builder1;
@@ -1905,7 +2055,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleDeltaDictionary) {
   ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3)));
 
   std::shared_ptr<Array> result_delta1;
-  ASSERT_OK(builder.Finish(&result_delta1));
+  ASSERT_OK(builder.Finish(false, &result_delta1));
 
   // Build expected data for the delta dictionary
   NumericBuilder<TypeParam> dict_builder2;
@@ -1934,7 +2084,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleDeltaDictionary) {
   ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(5)));
 
   std::shared_ptr<Array> result_delta2;
-  ASSERT_OK(builder.Finish(&result_delta2));
+  ASSERT_OK(builder.Finish(false, &result_delta2));
 
   // Build expected data for the delta dictionary again
   NumericBuilder<TypeParam> dict_builder3;
@@ -2030,7 +2180,7 @@ TEST(TestStringDictionaryBuilder, DeltaDictionary) {
   ASSERT_OK(builder.Append("test"));
 
   std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
+  ASSERT_OK(builder.Finish(false, &result));
 
   // Build expected data
   StringBuilder str_builder1;
@@ -2056,7 +2206,7 @@ TEST(TestStringDictionaryBuilder, DeltaDictionary) {
   ASSERT_OK(builder.Append("test2"));
 
   std::shared_ptr<Array> result_delta;
-  ASSERT_OK(builder.Finish(&result_delta));
+  ASSERT_OK(builder.Finish(false, &result_delta));
 
   // Build expected data
   StringBuilder str_builder2;
@@ -2093,7 +2243,7 @@ TEST(TestStringDictionaryBuilder, BigDeltaDictionary) {
   }
 
   std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
+  ASSERT_OK(builder.Finish(false, &result));
 
   std::shared_ptr<Array> str_array1;
   ASSERT_OK(str_builder1.Finish(&str_array1));
@@ -2121,7 +2271,7 @@ TEST(TestStringDictionaryBuilder, BigDeltaDictionary) {
   ASSERT_OK(str_builder2.Append("test_new_value1"));
 
   std::shared_ptr<Array> result2;
-  ASSERT_OK(builder.Finish(&result2));
+  ASSERT_OK(builder.Finish(false, &result2));
 
   std::shared_ptr<Array> str_array2;
   ASSERT_OK(str_builder2.Finish(&str_array2));
@@ -2149,7 +2299,7 @@ TEST(TestStringDictionaryBuilder, BigDeltaDictionary) {
   ASSERT_OK(str_builder3.Append("test_new_value2"));
 
   std::shared_ptr<Array> result3;
-  ASSERT_OK(builder.Finish(&result3));
+  ASSERT_OK(builder.Finish(false, &result3));
 
   std::shared_ptr<Array> str_array3;
   ASSERT_OK(str_builder3.Finish(&str_array3));
@@ -2207,7 +2357,7 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, 
DeltaDictionary) {
   ASSERT_OK(builder.Append(test.data()));
 
   std::shared_ptr<Array> result1;
-  ASSERT_OK(builder.Finish(&result1));
+  ASSERT_OK(builder.Finish(false, &result1));
 
   // Build expected data
   FixedSizeBinaryBuilder fsb_builder1(arrow::fixed_size_binary(4));
@@ -2233,7 +2383,7 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, 
DeltaDictionary) {
   ASSERT_OK(builder.Append(test3.data()));
 
   std::shared_ptr<Array> result2;
-  ASSERT_OK(builder.Finish(&result2));
+  ASSERT_OK(builder.Finish(false, &result2));
 
   // Build expected data
   FixedSizeBinaryBuilder fsb_builder2(arrow::fixed_size_binary(4));
diff --git a/cpp/src/arrow/buffer-test.cc b/cpp/src/arrow/buffer-test.cc
index a24384a38..229737723 100644
--- a/cpp/src/arrow/buffer-test.cc
+++ b/cpp/src/arrow/buffer-test.cc
@@ -20,6 +20,7 @@
 #include <limits>
 #include <memory>
 #include <string>
+#include <vector>
 
 #include <gtest/gtest.h>
 
@@ -181,7 +182,7 @@ TEST(TestBuffer, Copy) {
 }
 
 TEST(TestBuffer, SliceBuffer) {
-  std::string data_str = "some data to slice";
+  std::string data_str = "some data toxo slice";
 
   auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
 
@@ -236,4 +237,102 @@ TEST(TestBufferBuilder, ResizeReserve) {
   ASSERT_EQ(128, builder.capacity());
 }
 
+TEST(TestBufferBuilder, ReuseBuilder) {
+  const std::string data1 = "foo";
+  const std::string data2 = "bar";
+
+  BufferBuilder builder;
+
+  ASSERT_OK(builder.Append(data1.c_str(), data1.length()));
+
+  std::shared_ptr<Buffer> out1;
+  ASSERT_OK(builder.FinishSlice(&out1, 0, data1.length(), false));
+
+  ASSERT_EQ(3, out1->size());
+  ASSERT_STREQ(data1.c_str(), reinterpret_cast<const char*>(out1->data()));
+
+  ASSERT_OK(builder.Append(data2.c_str(), data2.length()));
+
+  std::shared_ptr<Buffer> out2;
+  ASSERT_OK(builder.FinishSlice(&out2, data1.length(), data2.length(), true));
+
+  ASSERT_EQ(3, out2->size());
+  ASSERT_STREQ(data2.c_str(), reinterpret_cast<const char*>(out2->data()));
+}
+
+TEST(TestBufferBuilder, FinishSlice) {
+  const std::string data = "foo_bar";
+  auto data_ptr = data.c_str();
+
+  BufferBuilder builder;
+
+  ASSERT_OK(builder.Append(data_ptr, data.length()));
+  ASSERT_EQ(data.length(), builder.length());
+
+  // Partially finish
+  std::shared_ptr<Buffer> out;
+  ASSERT_OK(builder.FinishSlice(&out, 4, 3, true));
+
+  ASSERT_EQ(3, out->size());
+  ASSERT_STREQ("bar", reinterpret_cast<const char*>(out->data()));
+}
+
+template <typename T>
+class TestTypedBufferBuilder : public testing::Test {};
+
+typedef ::testing::Types<int8_t, uint8_t, int16_t, uint16_t, int32_t, 
uint32_t, int64_t,
+                         uint64_t, float, double>
+    PrimitiveTypes;
+
+TYPED_TEST_CASE(TestTypedBufferBuilder, PrimitiveTypes);
+
+TYPED_TEST(TestTypedBufferBuilder, ReuseBuilder) {
+  std::vector<TypeParam> values1 = {1, 2, 3, 4};
+  std::vector<TypeParam> values2 = {5, 6, 7, 8};
+
+  TypedBufferBuilder<TypeParam> builder;
+
+  ASSERT_OK(builder.Append(values1.data(), values1.size()));
+
+  std::shared_ptr<Buffer> out1;
+  ASSERT_OK(builder.FinishSlice(&out1, 0, values1.size() * sizeof(TypeParam),
+                                /* reset */ false));
+
+  ASSERT_EQ(values1.size() * sizeof(TypeParam), out1->size());
+  ASSERT_EQ(0, memcmp(values1.data(), out1->data(), values1.size() * 
sizeof(TypeParam)));
+
+  ASSERT_OK(builder.Append(values2.data(), values2.size()));
+
+  std::shared_ptr<Buffer> out2;
+  ASSERT_OK(builder.FinishSlice(&out2, values1.size() * sizeof(TypeParam),
+                                values2.size() * sizeof(TypeParam), /* reset 
*/ true));
+
+  ASSERT_EQ(values2.size() * sizeof(TypeParam), out2->size());
+  ASSERT_EQ(0, memcmp(values2.data(), out2->data(), values2.size() * 
sizeof(TypeParam)));
+}
+
+TYPED_TEST(TestTypedBufferBuilder, FinishSliceByItem) {
+  std::vector<TypeParam> values1 = {1, 2, 3, 4};
+  std::vector<TypeParam> values2 = {5, 6, 7, 8};
+
+  TypedBufferBuilder<TypeParam> builder;
+
+  ASSERT_OK(builder.Append(values1.data(), values1.size()));
+
+  std::shared_ptr<Buffer> out1;
+  ASSERT_OK(builder.FinishSliceByItem(&out1, 0, values1.size(), /* reset */ 
false));
+
+  ASSERT_EQ(values1.size() * sizeof(TypeParam), out1->size());
+  ASSERT_EQ(0, memcmp(values1.data(), out1->data(), values1.size() * 
sizeof(TypeParam)));
+
+  ASSERT_OK(builder.Append(values2.data(), values2.size()));
+
+  std::shared_ptr<Buffer> out2;
+  ASSERT_OK(
+      builder.FinishSliceByItem(&out2, values1.size(), values2.size(), /* 
reset */ true));
+
+  ASSERT_EQ(values2.size() * sizeof(TypeParam), out2->size());
+  ASSERT_EQ(0, memcmp(values2.data(), out2->data(), values2.size() * 
sizeof(TypeParam)));
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 06160d7d4..fe7a0f398 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -53,12 +53,15 @@ class ARROW_EXPORT Buffer {
   /// \param[in] size buffer size
   ///
   /// \note The passed memory must be kept alive through some other means
-  Buffer(const uint8_t* data, int64_t size)
+  Buffer(const uint8_t* data, int64_t size, int64_t capacity)
       : is_mutable_(false),
         data_(data),
         mutable_data_(NULLPTR),
         size_(size),
-        capacity_(size) {}
+        capacity_(capacity) {}
+
+  /// Initialize with capacity equal to size
+  Buffer(const uint8_t* data, int64_t size) : Buffer(data, size, size) {}
 
   /// \brief Construct from std::string without copying memory
   ///
@@ -80,7 +83,7 @@ class ARROW_EXPORT Buffer {
   /// in general we expected buffers to be aligned and padded to 64 bytes.  In 
the future
   /// we might add utility methods to help determine if a buffer satisfies 
this contract.
   Buffer(const std::shared_ptr<Buffer>& parent, const int64_t offset, const 
int64_t size)
-      : Buffer(parent->data() + offset, size) {
+      : Buffer(parent->data() + offset, size, parent->capacity()) {
     parent_ = parent;
   }
 
@@ -300,11 +303,25 @@ class ARROW_EXPORT BufferBuilder {
 
   Status Finish(std::shared_ptr<Buffer>* out) {
     // Do not shrink to fit to avoid unneeded realloc
+    return FinishSlice(out, 0, size_, true);
+  }
+
+  Status FinishSlice(std::shared_ptr<Buffer>* out, const int64_t offset,
+                     const int64_t length, const bool reset = true) {
+    // Do not shrink to fit to avoid unneeded realloc
     if (size_ > 0) {
       RETURN_NOT_OK(buffer_->Resize(size_, false));
     }
-    *out = buffer_;
-    Reset();
+    if (size_ == 0 || (offset == 0 && length == size_)) {
+      // no need for slices for trivial cases
+      *out = buffer_;
+    } else {
+      *out = std::make_shared<Buffer>(buffer_, offset, length);
+    }
+
+    if (reset) {
+      Reset();
+    }
     return Status::OK();
   }
 
@@ -328,7 +345,8 @@ class ARROW_EXPORT BufferBuilder {
 template <typename T>
 class ARROW_EXPORT TypedBufferBuilder : public BufferBuilder {
  public:
-  explicit TypedBufferBuilder(MemoryPool* pool) : BufferBuilder(pool) {}
+  explicit TypedBufferBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT)
+      : BufferBuilder(pool) {}
 
   Status Append(T arithmetic_value) {
     static_assert(std::is_arithmetic<T>::value,
@@ -357,6 +375,26 @@ class ARROW_EXPORT TypedBufferBuilder : public 
BufferBuilder {
                                 num_elements * sizeof(T));
   }
 
+  /// Same as FinishSlice but uses typed item counts as offset and length,
+  /// i.e. with TypedBufferBuilder<uint32_t> an offset of 4 means that the
+  /// offset is 16 bytes, or 4 uint32_t values
+  Status FinishSliceByItem(std::shared_ptr<Buffer>* out, const int64_t offset,
+                           const int64_t length, const bool reset = true) {
+    // Do not shrink to fit to avoid unneeded realloc
+    if (size_ > 0) {
+      RETURN_NOT_OK(buffer_->Resize(size_, false));
+    }
+    if (size_ == 0) {
+      *out = buffer_;
+    } else {
+      *out = std::make_shared<Buffer>(buffer_, offset * sizeof(T), length * 
sizeof(T));
+    }
+    if (reset) {
+      Reset();
+    }
+    return Status::OK();
+  }
+
   const T* data() const { return reinterpret_cast<const T*>(data_); }
   int64_t length() const { return size_ / sizeof(T); }
   int64_t capacity() const { return capacity_ / sizeof(T); }
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index c97253e64..9874e0fd7 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -220,9 +220,11 @@ void ArrayBuilder::UnsafeSetNotNull(int64_t length) {
 // ----------------------------------------------------------------------
 // Null builder
 
-Status NullBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+Status NullBuilder::FinishInternal(bool reset_builder, 
std::shared_ptr<ArrayData>* out) {
   *out = ArrayData::Make(null(), length_, {nullptr}, length_);
-  length_ = null_count_ = 0;
+  if (reset_builder) {
+    length_ = null_count_ = 0;
+  }
   return Status::OK();
 }
 
@@ -309,16 +311,25 @@ Status PrimitiveBuilder<T>::Append(const 
std::vector<value_type>& values) {
 }
 
 template <typename T>
-Status PrimitiveBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) {
+Status PrimitiveBuilder<T>::FinishInternal(bool reset_builder,
+                                           std::shared_ptr<ArrayData>* out) {
+  auto length = length_ - elements_offset_;
   const int64_t bytes_required = TypeTraits<T>::bytes_required(length_);
-  if (bytes_required > 0 && bytes_required < data_->size()) {
+  if (bytes_required > 0 && bytes_required < data_->size() && reset_builder) {
     // Trim buffers
     RETURN_NOT_OK(data_->Resize(bytes_required));
+    // reset raw_data_ pointer and capacity
+    raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());
+    capacity_ = data()->capacity();
+  }
+  *out = ArrayData::Make(type_, length, {null_bitmap_, data_}, null_count_,
+                         elements_offset_);
+
+  if (reset_builder) {
+    data_ = null_bitmap_ = nullptr;
+    capacity_ = length_ = null_count_ = 0;
   }
-  *out = ArrayData::Make(type_, length_, {null_bitmap_, data_}, null_count_);
 
-  data_ = null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
   return Status::OK();
 }
 
@@ -340,7 +351,10 @@ template class PrimitiveBuilder<FloatType>;
 template class PrimitiveBuilder<DoubleType>;
 
 AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool)
-    : ArrayBuilder(int64(), pool), data_(nullptr), raw_data_(nullptr), 
int_size_(1) {}
+    : PartiallyFinishableArrayBuilder(int64(), pool),
+      data_(nullptr),
+      raw_data_(nullptr),
+      int_size_(1) {}
 
 Status AdaptiveIntBuilderBase::Init(int64_t capacity) {
   RETURN_NOT_OK(ArrayBuilder::Init(capacity));
@@ -378,11 +392,16 @@ Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
 
 AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : 
AdaptiveIntBuilderBase(pool) {}
 
-Status AdaptiveIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+Status AdaptiveIntBuilder::FinishInternal(bool reset_builder,
+                                          std::shared_ptr<ArrayData>* out) {
+  auto length = length_ - elements_offset_;
   const int64_t bytes_required = length_ * int_size_;
-  if (bytes_required > 0 && bytes_required < data_->size()) {
-    // Trim buffers
+  if (bytes_required > 0 && bytes_required < data_->size() && reset_builder) {
+    // Trim (only ever shrink) the buffer, and only if the builder is reset afterwards
     RETURN_NOT_OK(data_->Resize(bytes_required));
+    // adjust raw_data_ pointer
+    raw_data_ = data_->mutable_data();
+    capacity_ = data_->capacity() / int_size_;
   }
 
   std::shared_ptr<DataType> output_type;
@@ -404,10 +423,13 @@ Status 
AdaptiveIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
       return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
   }
 
-  *out = ArrayData::Make(output_type, length_, {null_bitmap_, data_}, 
null_count_);
+  *out = ArrayData::Make(output_type, length, {null_bitmap_, data_}, 
null_count_,
+                         elements_offset_);
 
-  data_ = null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
+  if (reset_builder) {
+    data_ = null_bitmap_ = nullptr;
+    capacity_ = length_ = null_count_ = 0;
+  }
   return Status::OK();
 }
 
@@ -535,12 +557,17 @@ Status AdaptiveIntBuilder::ExpandIntSize(uint8_t 
new_int_size) {
 AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool)
     : AdaptiveIntBuilderBase(pool) {}
 
-Status AdaptiveUIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+Status AdaptiveUIntBuilder::FinishInternal(bool reset_builder,
+                                           std::shared_ptr<ArrayData>* out) {
+  auto length = length_ - elements_offset_;
   const int64_t bytes_required = length_ * int_size_;
-  if (bytes_required > 0 && bytes_required < data_->size()) {
-    // Trim buffers
+  if (bytes_required > 0 && bytes_required < data_->size() && reset_builder) {
     RETURN_NOT_OK(data_->Resize(bytes_required));
+    // adjust raw_data_ pointer and capacity
+    raw_data_ = data_->mutable_data();
+    capacity_ = data_->capacity() / int_size_;
   }
+
   std::shared_ptr<DataType> output_type;
   switch (int_size_) {
     case 1:
@@ -560,10 +587,13 @@ Status 
AdaptiveUIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
       return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
   }
 
-  *out = ArrayData::Make(output_type, length_, {null_bitmap_, data_}, 
null_count_);
+  *out = ArrayData::Make(output_type, length, {null_bitmap_, data_}, 
null_count_,
+                         elements_offset_);
 
-  data_ = null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
+  if (reset_builder) {
+    data_ = null_bitmap_ = nullptr;
+    capacity_ = length_ = null_count_ = 0;
+  }
   return Status::OK();
 }
 
@@ -689,7 +719,9 @@ Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t 
new_int_size) {
 }
 
 BooleanBuilder::BooleanBuilder(MemoryPool* pool)
-    : ArrayBuilder(boolean(), pool), data_(nullptr), raw_data_(nullptr) {}
+    : PartiallyFinishableArrayBuilder(boolean(), pool),
+      data_(nullptr),
+      raw_data_(nullptr) {}
 
 BooleanBuilder::BooleanBuilder(const std::shared_ptr<DataType>& type, 
MemoryPool* pool)
     : BooleanBuilder(pool) {
@@ -730,17 +762,26 @@ Status BooleanBuilder::Resize(int64_t capacity) {
   return Status::OK();
 }
 
-Status BooleanBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+Status BooleanBuilder::FinishInternal(bool reset_builder,
+                                      std::shared_ptr<ArrayData>* out) {
+  auto length = length_ - elements_offset_;
   const int64_t bytes_required = BitUtil::BytesForBits(length_);
 
-  if (bytes_required > 0 && bytes_required < data_->size()) {
-    // Trim buffers
+  if (bytes_required > 0 && bytes_required < data_->size() && reset_builder) {
+    // Trim buffers only if buffer is reset afterwards
     RETURN_NOT_OK(data_->Resize(bytes_required));
+    // reset raw_data_ pointer and capacity_
+    raw_data_ = data_->mutable_data();
+    capacity_ = data_->capacity();
+  }
+  *out = ArrayData::Make(boolean(), length, {null_bitmap_, data_}, null_count_,
+                         elements_offset_);
+
+  if (reset_builder) {
+    data_ = null_bitmap_ = nullptr;
+    capacity_ = length_ = null_count_ = 0;
   }
-  *out = ArrayData::Make(boolean(), length_, {null_bitmap_, data_}, 
null_count_);
 
-  data_ = null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
   return Status::OK();
 }
 
@@ -809,13 +850,12 @@ Status BooleanBuilder::Append(const std::vector<bool>& 
values) {
 
 // ----------------------------------------------------------------------
 // DictionaryBuilder
-
 using internal::WrappedBinary;
 
 template <typename T>
 DictionaryBuilder<T>::DictionaryBuilder(const std::shared_ptr<DataType>& type,
                                         MemoryPool* pool)
-    : ArrayBuilder(type, pool),
+    : PartiallyFinishableArrayBuilder(type, pool),
       hash_slots_(nullptr),
       dict_builder_(type, pool),
       overflow_dict_builder_(type, pool),
@@ -828,7 +868,7 @@ DictionaryBuilder<T>::DictionaryBuilder(const 
std::shared_ptr<DataType>& type,
 
 DictionaryBuilder<NullType>::DictionaryBuilder(const 
std::shared_ptr<DataType>& type,
                                                MemoryPool* pool)
-    : ArrayBuilder(type, pool), values_builder_(pool) {
+    : PartiallyFinishableArrayBuilder(type, pool), values_builder_(pool) {
   if (!::arrow::CpuInfo::initialized()) {
     ::arrow::CpuInfo::Init();
   }
@@ -839,7 +879,7 @@ DictionaryBuilder<NullType>::~DictionaryBuilder() {}
 template <>
 DictionaryBuilder<FixedSizeBinaryType>::DictionaryBuilder(
     const std::shared_ptr<DataType>& type, MemoryPool* pool)
-    : ArrayBuilder(type, pool),
+    : PartiallyFinishableArrayBuilder(type, pool),
       hash_slots_(nullptr),
       dict_builder_(type, pool),
       overflow_dict_builder_(type, pool),
@@ -858,11 +898,10 @@ Status DictionaryBuilder<T>::Init(int64_t elements) {
   RETURN_NOT_OK(internal::NewHashTable(kInitialHashTableSize, pool_, 
&hash_table_));
   hash_slots_ = reinterpret_cast<int32_t*>(hash_table_->mutable_data());
   hash_table_size_ = kInitialHashTableSize;
-  entry_id_offset_ = 0;
   mod_bitmask_ = kInitialHashTableSize - 1;
   hash_table_load_threshold_ =
       static_cast<int64_t>(static_cast<double>(elements) * kMaxHashTableLoad);
-
+  RETURN_NOT_OK(dict_builder_.Init(elements));
   return values_builder_.Init(elements);
 }
 
@@ -915,7 +954,7 @@ Status DictionaryBuilder<T>::Append(const Scalar& value) {
 
   if (index == kHashSlotEmpty) {
     // Not in the hash table, so we insert it now
-    index = static_cast<hash_slot_t>(dict_builder_.length() + 
entry_id_offset_);
+    index = static_cast<hash_slot_t>(dict_builder_.length());
     hash_slots_[j] = index;
     RETURN_NOT_OK(AppendDictionary(value));
 
@@ -992,58 +1031,49 @@ typename DictionaryBuilder<T>::Scalar 
DictionaryBuilder<T>::GetDictionaryValue(
   return data[index];
 }
 
-template <typename T>
-Status DictionaryBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  entry_id_offset_ += dict_builder_.length();
-  RETURN_NOT_OK(overflow_dict_builder_.Append(
-      reinterpret_cast<const 
DictionaryBuilder<T>::Scalar*>(dict_builder_.data()->data()),
-      dict_builder_.length(), nullptr));
+template <>
+const uint8_t* DictionaryBuilder<FixedSizeBinaryType>::GetDictionaryValue(
+    typename TypeTraits<FixedSizeBinaryType>::BuilderType& dictionary_builder,
+    int64_t index) {
+  return dictionary_builder.GetValue(index);
+}
 
-  std::shared_ptr<Array> dictionary;
-  RETURN_NOT_OK(dict_builder_.Finish(&dictionary));
+template <typename T>
+Status DictionaryBuilder<T>::FinishInternal(bool reset_builder,
+                                            std::shared_ptr<ArrayData>* out) {
+  std::shared_ptr<Array> dictionary, values;
+  RETURN_NOT_OK(dict_builder_.Finish(reset_builder, &dictionary));
+  RETURN_NOT_OK(values_builder_.Finish(reset_builder, &values));
+  *out = values->data();
 
-  RETURN_NOT_OK(values_builder_.FinishInternal(out));
   (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary);
 
-  RETURN_NOT_OK(dict_builder_.Init(capacity_));
-  RETURN_NOT_OK(values_builder_.Init(capacity_));
+  if (reset_builder) {
+    capacity_ = 0;
+  }
+
   return Status::OK();
 }
 
-Status DictionaryBuilder<NullType>::FinishInternal(std::shared_ptr<ArrayData>* 
out) {
+Status DictionaryBuilder<NullType>::FinishInternal(bool reset_builder,
+                                                   std::shared_ptr<ArrayData>* 
out) {
   std::shared_ptr<Array> dictionary = std::make_shared<NullArray>(0);
 
-  RETURN_NOT_OK(values_builder_.FinishInternal(out));
+  RETURN_NOT_OK(values_builder_.FinishInternal(reset_builder, out));
   (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary);
   return Status::OK();
 }
 
-template <>
-const uint8_t* DictionaryBuilder<FixedSizeBinaryType>::GetDictionaryValue(
-    typename TypeTraits<FixedSizeBinaryType>::BuilderType& dictionary_builder,
-    int64_t index) {
-  return dictionary_builder.GetValue(index);
-}
-
 template <>
 Status DictionaryBuilder<FixedSizeBinaryType>::FinishInternal(
-    std::shared_ptr<ArrayData>* out) {
-  entry_id_offset_ += dict_builder_.length();
-
-  for (uint64_t index = 0, limit = dict_builder_.length(); index < limit; 
++index) {
-    const Scalar value = GetDictionaryValue(dict_builder_, index);
-    RETURN_NOT_OK(overflow_dict_builder_.Append(value));
-  }
-
-  std::shared_ptr<Array> dictionary;
-  RETURN_NOT_OK(dict_builder_.Finish(&dictionary));
+    bool reset_builder, std::shared_ptr<ArrayData>* out) {
+  std::shared_ptr<Array> dictionary, values;
+  RETURN_NOT_OK(dict_builder_.Finish(reset_builder, &dictionary));
+  RETURN_NOT_OK(values_builder_.Finish(reset_builder, &values));
+  *out = values->data();
 
-  RETURN_NOT_OK(values_builder_.FinishInternal(out));
   (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary);
 
-  RETURN_NOT_OK(dict_builder_.Init(capacity_));
-  RETURN_NOT_OK(values_builder_.Init(capacity_));
-
   return Status::OK();
 }
 
@@ -1060,33 +1090,17 @@ int64_t 
DictionaryBuilder<FixedSizeBinaryType>::HashValue(const Scalar& value) {
 template <typename T>
 bool DictionaryBuilder<T>::SlotDifferent(hash_slot_t index, const Scalar& 
value) {
   const bool value_found =
-      index >= entry_id_offset_ &&
-      GetDictionaryValue(dict_builder_, static_cast<int64_t>(index - 
entry_id_offset_)) ==
-          value;
-  const bool value_found_overflow =
-      entry_id_offset_ > 0 &&
-      GetDictionaryValue(overflow_dict_builder_, static_cast<int64_t>(index)) 
== value;
-  return !(value_found || value_found_overflow);
+      GetDictionaryValue(dict_builder_, static_cast<int64_t>(index)) == value;
+  return !value_found;
 }
 
 template <>
 bool DictionaryBuilder<FixedSizeBinaryType>::SlotDifferent(hash_slot_t index,
                                                            const Scalar& 
value) {
   int32_t width = static_cast<const FixedSizeBinaryType&>(*type_).byte_width();
-  bool value_found = false;
-  if (index >= entry_id_offset_) {
-    const Scalar other =
-        GetDictionaryValue(dict_builder_, static_cast<int64_t>(index - 
entry_id_offset_));
-    value_found = memcmp(other, value, width) == 0;
-  }
-
-  bool value_found_overflow = false;
-  if (entry_id_offset_ > 0) {
-    const Scalar other_overflow =
-        GetDictionaryValue(overflow_dict_builder_, 
static_cast<int64_t>(index));
-    value_found_overflow = memcmp(other_overflow, value, width) == 0;
-  }
-  return !(value_found || value_found_overflow);
+  const Scalar other = GetDictionaryValue(dict_builder_, 
static_cast<int64_t>(index));
+  bool value_found = memcmp(other, value, width) == 0;
+  return !value_found;
 }
 
 template <typename T>
@@ -1094,82 +1108,63 @@ Status DictionaryBuilder<T>::AppendDictionary(const 
Scalar& value) {
   return dict_builder_.Append(value);
 }
 
-#define BINARY_DICTIONARY_SPECIALIZATIONS(Type)                                
        \
-  template <>                                                                  
        \
-  WrappedBinary DictionaryBuilder<Type>::GetDictionaryValue(                   
        \
-      typename TypeTraits<Type>::BuilderType& dictionary_builder, int64_t 
index) {     \
-    int32_t v_len;                                                             
        \
-    const uint8_t* v = dictionary_builder.GetValue(                            
        \
-        static_cast<int64_t>(index - entry_id_offset_), &v_len);               
        \
-    return WrappedBinary(v, v_len);                                            
        \
-  }                                                                            
        \
-                                                                               
        \
-  template <>                                                                  
        \
-  Status DictionaryBuilder<Type>::AppendDictionary(const WrappedBinary& value) 
{       \
-    return dict_builder_.Append(value.ptr_, value.length_);                    
        \
-  }                                                                            
        \
-                                                                               
        \
-  template <>                                                                  
        \
-  Status DictionaryBuilder<Type>::AppendArray(const Array& array) {            
        \
-    const BinaryArray& binary_array = static_cast<const BinaryArray&>(array);  
        \
-    WrappedBinary value(nullptr, 0);                                           
        \
-    for (int64_t i = 0; i < array.length(); i++) {                             
        \
-      if (array.IsNull(i)) {                                                   
        \
-        RETURN_NOT_OK(AppendNull());                                           
        \
-      } else {                                                                 
        \
-        value.ptr_ = binary_array.GetValue(i, &value.length_);                 
        \
-        RETURN_NOT_OK(Append(value));                                          
        \
-      }                                                                        
        \
-    }                                                                          
        \
-    return Status::OK();                                                       
        \
-  }                                                                            
        \
-                                                                               
        \
-  template <>                                                                  
        \
-  int64_t DictionaryBuilder<Type>::HashValue(const WrappedBinary& value) {     
        \
-    return HashUtil::Hash(value.ptr_, value.length_, 0);                       
        \
-  }                                                                            
        \
-                                                                               
        \
-  template <>                                                                  
        \
-  bool DictionaryBuilder<Type>::SlotDifferent(hash_slot_t index,               
        \
-                                              const WrappedBinary& value) {    
        \
-    int32_t other_length;                                                      
        \
-    bool value_found = false;                                                  
        \
-    if (index >= entry_id_offset_) {                                           
        \
-      const uint8_t* other_value = dict_builder_.GetValue(                     
        \
-          static_cast<int64_t>(index - entry_id_offset_), &other_length);      
        \
-      value_found = other_length == value.length_ &&                           
        \
-                    memcmp(other_value, value.ptr_, value.length_) == 0;       
        \
-    }                                                                          
        \
-                                                                               
        \
-    bool value_found_overflow = false;                                         
        \
-    if (entry_id_offset_ > 0) {                                                
        \
-      const uint8_t* other_value_overflow =                                    
        \
-          overflow_dict_builder_.GetValue(static_cast<int64_t>(index), 
&other_length); \
-      value_found_overflow =                                                   
        \
-          other_length == value.length_ &&                                     
        \
-          memcmp(other_value_overflow, value.ptr_, value.length_) == 0;        
        \
-    }                                                                          
        \
-    return !(value_found || value_found_overflow);                             
        \
-  }                                                                            
        \
-                                                                               
        \
-  template <>                                                                  
        \
-  Status DictionaryBuilder<Type>::FinishInternal(std::shared_ptr<ArrayData>* 
out) {    \
-    entry_id_offset_ += dict_builder_.length();                                
        \
-    for (uint64_t index = 0, limit = dict_builder_.length(); index < limit; 
++index) { \
-      int32_t out_length;                                                      
        \
-      const uint8_t* value = dict_builder_.GetValue(index, &out_length);       
        \
-      RETURN_NOT_OK(overflow_dict_builder_.Append(value, out_length));         
        \
-    }                                                                          
        \
-                                                                               
        \
-    std::shared_ptr<Array> dictionary;                                         
        \
-    RETURN_NOT_OK(dict_builder_.Finish(&dictionary));                          
        \
-                                                                               
        \
-    RETURN_NOT_OK(values_builder_.FinishInternal(out));                        
        \
-    (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); 
        \
-                                                                               
        \
-    RETURN_NOT_OK(dict_builder_.Init(capacity_));                              
        \
-    RETURN_NOT_OK(values_builder_.Init(capacity_));                            
        \
-    return Status::OK();                                                       
        \
+#define BINARY_DICTIONARY_SPECIALIZATIONS(Type)                                
          \
+  template <>                                                                  
          \
+  WrappedBinary DictionaryBuilder<Type>::GetDictionaryValue(                   
          \
+      typename TypeTraits<Type>::BuilderType& dictionary_builder, int64_t 
index) {       \
+    int32_t v_len;                                                             
          \
+    const uint8_t* v = 
dictionary_builder.GetValue(static_cast<int64_t>(index), &v_len); \
+    return WrappedBinary(v, v_len);                                            
          \
+  }                                                                            
          \
+                                                                               
          \
+  template <>                                                                  
          \
+  Status DictionaryBuilder<Type>::AppendDictionary(const WrappedBinary& value) 
{         \
+    return dict_builder_.Append(value.ptr_, value.length_);                    
          \
+  }                                                                            
          \
+                                                                               
          \
+  template <>                                                                  
          \
+  Status DictionaryBuilder<Type>::AppendArray(const Array& array) {            
          \
+    const BinaryArray& binary_array = static_cast<const BinaryArray&>(array);  
          \
+    WrappedBinary value(nullptr, 0);                                           
          \
+    for (int64_t i = 0; i < array.length(); i++) {                             
          \
+      if (array.IsNull(i)) {                                                   
          \
+        RETURN_NOT_OK(AppendNull());                                           
          \
+      } else {                                                                 
          \
+        value.ptr_ = binary_array.GetValue(i, &value.length_);                 
          \
+        RETURN_NOT_OK(Append(value));                                          
          \
+      }                                                                        
          \
+    }                                                                          
          \
+    return Status::OK();                                                       
          \
+  }                                                                            
          \
+                                                                               
          \
+  template <>                                                                  
          \
+  int64_t DictionaryBuilder<Type>::HashValue(const WrappedBinary& value) {     
          \
+    return HashUtil::Hash(value.ptr_, value.length_, 0);                       
          \
+  }                                                                            
          \
+                                                                               
          \
+  template <>                                                                  
          \
+  bool DictionaryBuilder<Type>::SlotDifferent(hash_slot_t index,               
          \
+                                              const WrappedBinary& value) {    
          \
+    int32_t other_length;                                                      
          \
+    bool value_found = false;                                                  
          \
+    const uint8_t* other_value =                                               
          \
+        dict_builder_.GetValue(static_cast<int64_t>(index), &other_length);    
          \
+    value_found = other_length == value.length_ &&                             
          \
+                  memcmp(other_value, value.ptr_, value.length_) == 0;         
          \
+    return !value_found;                                                       
          \
+  }                                                                            
          \
+                                                                               
          \
+  template <>                                                                  
          \
+  Status DictionaryBuilder<Type>::FinishInternal(bool reset_builder,           
          \
+                                                 std::shared_ptr<ArrayData>* 
out) {      \
+    std::shared_ptr<Array> dictionary, values;                                 
          \
+    RETURN_NOT_OK(dict_builder_.Finish(reset_builder, &dictionary));           
          \
+                                                                               
          \
+    RETURN_NOT_OK(values_builder_.Finish(reset_builder, &values));             
          \
+    *out = values->data();                                                     
          \
+    (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); 
          \
+                                                                               
          \
+    return Status::OK();                                                       
          \
   }
 
 BINARY_DICTIONARY_SPECIALIZATIONS(StringType);
@@ -1206,14 +1201,6 @@ Status Decimal128Builder::Append(const Decimal128& 
value) {
   return FixedSizeBinaryBuilder::Append(value.ToBytes());
 }
 
-Status Decimal128Builder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(byte_builder_.Finish(&data));
-
-  *out = ArrayData::Make(type_, length_, {null_bitmap_, data}, null_count_);
-  return Status::OK();
-}
-
 // ----------------------------------------------------------------------
 // ListBuilder
 
@@ -1298,13 +1285,16 @@ ArrayBuilder* ListBuilder::value_builder() const {
 // String and binary
 
 BinaryBuilder::BinaryBuilder(const std::shared_ptr<DataType>& type, 
MemoryPool* pool)
-    : ArrayBuilder(type, pool), offsets_builder_(pool), 
value_data_builder_(pool) {}
+    : PartiallyFinishableArrayBuilder(type, pool),
+      offsets_builder_(pool),
+      value_data_builder_(pool) {}
 
 BinaryBuilder::BinaryBuilder(MemoryPool* pool) : BinaryBuilder(binary(), pool) 
{}
 
 Status BinaryBuilder::Init(int64_t elements) {
   DCHECK_LE(elements, kListMaximumElements);
   RETURN_NOT_OK(ArrayBuilder::Init(elements));
+  is_final_offset_written_ = false;
   // one more then requested for offsets
   return offsets_builder_.Resize((elements + 1) * sizeof(int32_t));
 }
@@ -1327,6 +1317,12 @@ Status BinaryBuilder::ReserveData(int64_t elements) {
 }
 
 Status BinaryBuilder::AppendNextOffset() {
+  if (is_final_offset_written_) {
+    // the final (closing) offset was written, just set the
+    // flag to false
+    is_final_offset_written_ = false;
+    return Status::OK();
+  }
   const int64_t num_bytes = value_data_builder_.length();
   if (ARROW_PREDICT_FALSE(num_bytes > kBinaryMemoryLimit)) {
     std::stringstream ss;
@@ -1337,6 +1333,16 @@ Status BinaryBuilder::AppendNextOffset() {
   return offsets_builder_.Append(static_cast<int32_t>(num_bytes));
 }
 
+// write final offset
+Status BinaryBuilder::AppendFinalOffset() {
+  if (ARROW_PREDICT_FALSE(is_final_offset_written_)) {
+    return Status::Invalid("Final offset allready added");
+  }
+  RETURN_NOT_OK(AppendNextOffset());
+  is_final_offset_written_ = true;
+  return Status::OK();
+}
+
 Status BinaryBuilder::Append(const uint8_t* value, int32_t length) {
   RETURN_NOT_OK(Reserve(1));
   RETURN_NOT_OK(AppendNextOffset());
@@ -1352,17 +1358,23 @@ Status BinaryBuilder::AppendNull() {
   return Status::OK();
 }
 
-Status BinaryBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  // Write final offset (values length)
-  RETURN_NOT_OK(AppendNextOffset());
+Status BinaryBuilder::FinishInternal(bool reset_builder,
+                                     std::shared_ptr<ArrayData>* out) {
+  auto slice_length = length_ - elements_offset_;
+  RETURN_NOT_OK(AppendFinalOffset());
   std::shared_ptr<Buffer> offsets, value_data;
 
-  RETURN_NOT_OK(offsets_builder_.Finish(&offsets));
-  RETURN_NOT_OK(value_data_builder_.Finish(&value_data));
+  RETURN_NOT_OK(
+      offsets_builder_.FinishSliceByItem(&offsets, 0, slice_length + 1, 
reset_builder));
+  auto last_offset = offsets_builder_.data()[elements_offset_ + slice_length];
+  RETURN_NOT_OK(
+      value_data_builder_.FinishSliceByItem(&value_data, 0, last_offset, 
reset_builder));
 
-  *out = ArrayData::Make(type_, length_, {null_bitmap_, offsets, value_data}, 
null_count_,
-                         0);
-  Reset();
+  *out = ArrayData::Make(type_, slice_length, {null_bitmap_, offsets, 
value_data},
+                         null_count_, elements_offset_);
+  if (reset_builder) {
+    Reset();
+  }
   return Status::OK();
 }
 
@@ -1418,7 +1430,7 @@ Status StringBuilder::Append(const 
std::vector<std::string>& values,
 
 FixedSizeBinaryBuilder::FixedSizeBinaryBuilder(const 
std::shared_ptr<DataType>& type,
                                                MemoryPool* pool)
-    : ArrayBuilder(type, pool),
+    : PartiallyFinishableArrayBuilder(type, pool),
       byte_width_(static_cast<const FixedSizeBinaryType&>(*type).byte_width()),
       byte_builder_(pool) {}
 
@@ -1449,14 +1461,19 @@ Status FixedSizeBinaryBuilder::Resize(int64_t capacity) 
{
   return ArrayBuilder::Resize(capacity);
 }
 
-Status FixedSizeBinaryBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) 
{
+Status FixedSizeBinaryBuilder::FinishInternal(bool reset_builder,
+                                              std::shared_ptr<ArrayData>* out) 
{
+  auto slice_length = length_ - elements_offset_;
   std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(byte_builder_.Finish(&data));
+  RETURN_NOT_OK(byte_builder_.FinishSlice(&data, 0, slice_length * 
byte_width_, false));
 
-  *out = ArrayData::Make(type_, length_, {null_bitmap_, data}, null_count_);
+  *out = ArrayData::Make(type_, slice_length, {null_bitmap_, data}, 
null_count_,
+                         elements_offset_);
 
-  null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
+  if (reset_builder) {
+    null_bitmap_ = nullptr;
+    capacity_ = length_ = null_count_ = 0;
+  }
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index b0f77bd98..6a4919153 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -26,6 +26,7 @@
 #include <string>
 #include <vector>
 
+#include "arrow/array.h"
 #include "arrow/buffer.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
@@ -112,14 +113,14 @@ class ARROW_EXPORT ArrayBuilder {
   std::shared_ptr<PoolBuffer> null_bitmap() const { return null_bitmap_; }
 
   /// \brief Return result of builder as an internal generic ArrayData
-  /// object. Resets builder except for dictionary builder
+  /// object.
   ///
   /// \param[out] out the finalized ArrayData object
   /// \return Status
   virtual Status FinishInternal(std::shared_ptr<ArrayData>* out) = 0;
 
   /// \brief Return result of builder as an Array object.
-  ///        Resets the builder except for DictionaryBuilder
+  ///        Resets the builder.
   ///
   /// \param[out] out the finalized Array object
   /// \return Status
@@ -172,10 +173,47 @@ class ARROW_EXPORT ArrayBuilder {
   ARROW_DISALLOW_COPY_AND_ASSIGN(ArrayBuilder);
 };
 
-class ARROW_EXPORT NullBuilder : public ArrayBuilder {
+// An array build which supports partial finishes
+//
+// After a partial finish the state of the array builder
+// is not reset and additional elements are appended to the
+// previous state instead. Subsequent finishes will yield
+// results which represent the elements added since the last
+// finish, i.e. SliceBuffers or delta dictionaries
+class ARROW_EXPORT PartiallyFinishableArrayBuilder : public ArrayBuilder {
+ public:
+  using ArrayBuilder::ArrayBuilder;
+
+  // Finishes the builder into out without resetting the builder if 
reset_builder
+  // is false. Use with caution, it doesn't set elements_offset.
+  virtual Status FinishInternal(bool reset_builder, 
std::shared_ptr<ArrayData>* out) = 0;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override {
+    return FinishInternal(true, out);
+  }
+  Status Finish(std::shared_ptr<Array>* out) { return Finish(true, out); }
+
+  /// \brief Return result of builder as a SliceArray object.
+  ///
+  /// \param[in] reset_builder reset the state of the builder
+  /// \param[out] out the finalized Array object
+  /// \return Status
+  Status Finish(bool reset_builder, std::shared_ptr<Array>* out) {
+    std::shared_ptr<ArrayData> internal_data;
+    RETURN_NOT_OK(FinishInternal(reset_builder, &internal_data));
+    *out = MakeArray(internal_data);
+    elements_offset_ = reset_builder ? 0 : length_;
+    return Status::OK();
+  }
+  int64_t elements_offset() { return elements_offset_; }
+
+ protected:
+  int64_t elements_offset_ = 0;
+};
+
+class ARROW_EXPORT NullBuilder : public PartiallyFinishableArrayBuilder {
  public:
   explicit NullBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT)
-      : ArrayBuilder(null(), pool) {}
+      : PartiallyFinishableArrayBuilder(null(), pool) {}
 
   Status AppendNull() {
     ++null_count_;
@@ -183,16 +221,17 @@ class ARROW_EXPORT NullBuilder : public ArrayBuilder {
     return Status::OK();
   }
 
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
 };
 
 template <typename Type>
-class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder {
+class ARROW_EXPORT PrimitiveBuilder : public PartiallyFinishableArrayBuilder {
  public:
   using value_type = typename Type::c_type;
 
   explicit PrimitiveBuilder(const std::shared_ptr<DataType>& type, MemoryPool* 
pool)
-      : ArrayBuilder(type, pool), data_(NULLPTR), raw_data_(NULLPTR) {}
+      : PartiallyFinishableArrayBuilder(type, pool), data_(NULLPTR), 
raw_data_(NULLPTR) {}
 
   using ArrayBuilder::Advance;
 
@@ -241,7 +280,9 @@ class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder {
   /// \return Status
   Status Append(const std::vector<value_type>& values);
 
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
+
   Status Init(int64_t capacity) override;
 
   /// Increase the capacity of the builder to accommodate at least the 
indicated
@@ -288,6 +329,15 @@ class ARROW_EXPORT NumericBuilder : public 
PrimitiveBuilder<T> {
     raw_data_[length_++] = val;
   }
 
+  /// Append an array of scalar elements without capacity check
+  ///
+  /// Same warning as with UnsafeAppend with a single element
+  void UnsafeAppend(const value_type* val, int64_t num_elements) {
+    std::memcpy(raw_data_, val, num_elements);
+    // length is updated here
+    ArrayBuilder::UnsafeSetNotNull(num_elements);
+  }
+
  protected:
   using PrimitiveBuilder<T>::length_;
   using PrimitiveBuilder<T>::null_bitmap_data_;
@@ -317,7 +367,7 @@ using DoubleBuilder = NumericBuilder<DoubleType>;
 
 namespace internal {
 
-class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
+class ARROW_EXPORT AdaptiveIntBuilderBase : public 
PartiallyFinishableArrayBuilder {
  public:
   explicit AdaptiveIntBuilderBase(MemoryPool* pool);
 
@@ -436,7 +486,8 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public 
internal::AdaptiveIntBuilderBase
   Status Append(const uint64_t* values, int64_t length,
                 const uint8_t* valid_bytes = NULLPTR);
 
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
 
  protected:
   Status ExpandIntSize(uint8_t new_int_size);
@@ -498,7 +549,8 @@ class ARROW_EXPORT AdaptiveIntBuilder : public 
internal::AdaptiveIntBuilderBase
   Status Append(const int64_t* values, int64_t length,
                 const uint8_t* valid_bytes = NULLPTR);
 
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
 
  protected:
   Status ExpandIntSize(uint8_t new_int_size);
@@ -516,7 +568,7 @@ class ARROW_EXPORT AdaptiveIntBuilder : public 
internal::AdaptiveIntBuilderBase
   Status ExpandIntSizeN();
 };
 
-class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
+class ARROW_EXPORT BooleanBuilder : public PartiallyFinishableArrayBuilder {
  public:
   explicit BooleanBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
@@ -595,7 +647,8 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
   /// \return Status
   Status Append(const std::vector<bool>& values);
 
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
   Status Init(int64_t capacity) override;
 
   /// Increase the capacity of the builder to accommodate at least the 
indicated
@@ -666,7 +719,7 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder {
 
 /// \class BinaryBuilder
 /// \brief Builder class for variable-length binary data
-class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
+class ARROW_EXPORT BinaryBuilder : public PartiallyFinishableArrayBuilder {
  public:
   explicit BinaryBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
@@ -689,7 +742,8 @@ class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
   /// \brief Ensures there is enough allocated capacity to append the indicated
   /// number of bytes to the value data buffer without additional allocations
   Status ReserveData(int64_t elements);
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
 
   /// \return size of values buffer so far
   int64_t value_data_length() const { return value_data_builder_.length(); }
@@ -706,7 +760,10 @@ class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
   TypedBufferBuilder<uint8_t> value_data_builder_;
 
   Status AppendNextOffset();
+  Status AppendFinalOffset();
   void Reset();
+  // used to decide if we should write the next offset
+  bool is_final_offset_written_ = false;
 };
 
 /// \class StringBuilder
@@ -725,7 +782,7 @@ class ARROW_EXPORT StringBuilder : public BinaryBuilder {
 // ----------------------------------------------------------------------
 // FixedSizeBinaryBuilder
 
-class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder {
+class ARROW_EXPORT FixedSizeBinaryBuilder : public 
PartiallyFinishableArrayBuilder {
  public:
   FixedSizeBinaryBuilder(const std::shared_ptr<DataType>& type,
                          MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
@@ -753,7 +810,8 @@ class ARROW_EXPORT FixedSizeBinaryBuilder : public 
ArrayBuilder {
 
   Status Init(int64_t elements) override;
   Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builders, std::shared_ptr<ArrayData>* out) 
override;
 
   /// \return size of values buffer so far
   int64_t value_data_length() const { return byte_builder_.length(); }
@@ -776,8 +834,6 @@ class ARROW_EXPORT Decimal128Builder : public 
FixedSizeBinaryBuilder {
   using FixedSizeBinaryBuilder::Append;
 
   Status Append(const Decimal128& val);
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
 };
 
 using DecimalBuilder = Decimal128Builder;
@@ -869,7 +925,7 @@ struct DictionaryScalar<FixedSizeBinaryType> {
 ///
 /// data
 template <typename T>
-class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder {
+class ARROW_EXPORT DictionaryBuilder : public PartiallyFinishableArrayBuilder {
  public:
   using Scalar = typename internal::DictionaryScalar<T>::type;
 
@@ -893,10 +949,11 @@ class ARROW_EXPORT DictionaryBuilder : public 
ArrayBuilder {
 
   Status Init(int64_t elements) override;
   Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
 
   /// is the dictionary builder in the delta building mode
-  bool is_building_delta() { return entry_id_offset_ > 0; }
+  bool is_building_delta() { return dict_builder_.elements_offset() > 0; }
 
  protected:
   Status DoubleTableSize();
@@ -912,11 +969,6 @@ class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder 
{
   /// Size of the table. Must be a power of 2.
   int64_t hash_table_size_;
 
-  // offset for the entry ids. Used to build delta dictionaries,
-  // increased on every InternalFinish by the number of current entries
-  // in the dictionary
-  int64_t entry_id_offset_;
-
   // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j %
   // hash_table_size_, but uses far fewer CPU cycles
   int64_t mod_bitmask_;
@@ -932,7 +984,7 @@ class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder {
 };
 
 template <>
-class ARROW_EXPORT DictionaryBuilder<NullType> : public ArrayBuilder {
+class ARROW_EXPORT DictionaryBuilder<NullType> : public 
PartiallyFinishableArrayBuilder {
  public:
   ~DictionaryBuilder() override;
 
@@ -947,7 +999,8 @@ class ARROW_EXPORT DictionaryBuilder<NullType> : public 
ArrayBuilder {
 
   Status Init(int64_t elements) override;
   Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  using PartiallyFinishableArrayBuilder::FinishInternal;
+  Status FinishInternal(bool reset_builder, std::shared_ptr<ArrayData>* out) 
override;
 
  protected:
   AdaptiveIntBuilder values_builder_;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [C++] Optimize delta buffer creation with partially finishable array builders
> -----------------------------------------------------------------------------
>
>                 Key: ARROW-2330
>                 URL: https://issues.apache.org/jira/browse/ARROW-2330
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: C++
>    Affects Versions: 0.8.0
>            Reporter: Dimitri Vorona
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.10.0
>
>
> The main aim of this change is to optimize the building of delta 
> dictionaries. In the current version delta dictionaries are built using an 
> additional "overflow" buffer which leads to complicated and potentially 
> error-prone code and subpar performance by doubling the number of lookups.
> I solve this problem by introducing the notion of partially finishable array 
> builders, i.e. builders that are able to retain their state when Finish is 
> called. The interface is based on RecordBatchBuilder::Flush, i.e. Finish is 
> overloaded with an additional signature Finish(bool reset_builder, 
> std::shared_ptr<Array>* out). The resulting Arrays point to the same data 
> buffer with different offsets.
> I'm aware that the change is kind of biggish, but I'd like to discuss it 
> here. The solution makes the code more straightforward, doesn't bloat the 
> code base too much and leaves the API more or less untouched. Additionally, 
> the new way to make delta dictionaries by using a different call signature to 
> Finish feels cleaner to me.
> I'm looking forward to your criticism and improvement ideas.
> The pull request is available at: https://github.com/apache/arrow/pull/1769



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to