[ 
https://issues.apache.org/jira/browse/ARROW-1114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215929#comment-16215929
 ] 

ASF GitHub Bot commented on ARROW-1114:
---------------------------------------

wesm closed pull request #802: ARROW-1114: [C++] Add simple RecordBatchBuilder 
class
URL: https://github.com/apache/arrow/pull/802
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 9a676510e..22b475146 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -24,6 +24,7 @@ set(ARROW_SRCS
   pretty_print.cc
   status.cc
   table.cc
+  table_builder.cc
   tensor.cc
   type.cc
   visitor.cc
@@ -143,6 +144,7 @@ install(FILES
   pretty_print.h
   status.h
   table.h
+  table_builder.h
   tensor.h
   type.h
   type_fwd.h
@@ -172,6 +174,7 @@ ADD_ARROW_TEST(pretty_print-test)
 ADD_ARROW_TEST(status-test)
 ADD_ARROW_TEST(type-test)
 ADD_ARROW_TEST(table-test)
+ADD_ARROW_TEST(table_builder-test)
 ADD_ARROW_TEST(tensor-test)
 
 ADD_ARROW_BENCHMARK(builder-benchmark)
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index 4d731bd32..5d2e859f3 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -28,6 +28,7 @@
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/table_builder.h"
 #include "arrow/tensor.h"
 #include "arrow/type.h"
 #include "arrow/visitor.h"
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc 
b/cpp/src/arrow/ipc/json-integration-test.cc
index 4a3b2b399..c7530a467 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -182,10 +182,10 @@ static Status ValidateArrowVsJson(const std::string& 
arrow_path,
       ss << "Record batch " << i << " did not match";
 
       ss << "\nJSON:\n";
-      RETURN_NOT_OK(PrettyPrint(*json_batch.get(), 0, &ss));
+      RETURN_NOT_OK(PrettyPrint(*json_batch, 0, &ss));
 
       ss << "\nArrow:\n";
-      RETURN_NOT_OK(PrettyPrint(*arrow_batch.get(), 0, &ss));
+      RETURN_NOT_OK(PrettyPrint(*arrow_batch, 0, &ss));
       return Status::Invalid(ss.str());
     }
   }
diff --git a/cpp/src/arrow/table_builder-test.cc 
b/cpp/src/arrow/table_builder-test.cc
new file mode 100644
index 000000000..07d9b6b2d
--- /dev/null
+++ b/cpp/src/arrow/table_builder-test.cc
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/array.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/table_builder.h"
+#include "arrow/test-common.h"
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+
+class TestRecordBatchBuilder : public TestBase {
+ public:
+};
+
+std::shared_ptr<Schema> ExampleSchema1() {
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", utf8());
+  auto f2 = field("f1", list(int8()));
+  return ::arrow::schema({f0, f1, f2});
+}
+
+template <typename BuilderType, typename T>
+void AppendValues(BuilderType* builder, const std::vector<T>& values,
+                  const std::vector<bool>& is_valid) {
+  for (size_t i = 0; i < values.size(); ++i) {
+    if (is_valid.size() == 0 || is_valid[i]) {
+      ASSERT_OK(builder->Append(values[i]));
+    } else {
+      ASSERT_OK(builder->AppendNull());
+    }
+  }
+}
+
+template <typename ValueType, typename T>
+void AppendList(ListBuilder* builder, const std::vector<std::vector<T>>& 
values,
+                const std::vector<bool>& is_valid) {
+  auto values_builder = static_cast<ValueType*>(builder->value_builder());
+
+  for (size_t i = 0; i < values.size(); ++i) {
+    if (is_valid.size() == 0 || is_valid[i]) {
+      ASSERT_OK(builder->Append());
+      AppendValues<ValueType, T>(values_builder, values[i], {});
+    } else {
+      ASSERT_OK(builder->AppendNull());
+    }
+  }
+}
+
+TEST_F(TestRecordBatchBuilder, Basics) {
+  auto schema = ExampleSchema1();
+
+  std::unique_ptr<RecordBatchBuilder> builder;
+  ASSERT_OK(RecordBatchBuilder::Make(schema, pool_, &builder));
+
+  std::vector<bool> is_valid = {false, true, true, true};
+  std::vector<int32_t> f0_values = {0, 1, 2, 3};
+  std::vector<std::string> f1_values = {"a", "bb", "ccc", "dddd"};
+  std::vector<std::vector<int8_t>> f2_values = {{}, {0, 1}, {}, {2}};
+
+  std::shared_ptr<Array> a0, a1, a2;
+
+  // Make the expected record batch
+  auto AppendData = [&](Int32Builder* b0, StringBuilder* b1, ListBuilder* b2) {
+    AppendValues<Int32Builder, int32_t>(b0, f0_values, is_valid);
+    AppendValues<StringBuilder, std::string>(b1, f1_values, is_valid);
+    AppendList<Int8Builder, int8_t>(b2, f2_values, is_valid);
+  };
+
+  Int32Builder ex_b0;
+  StringBuilder ex_b1;
+  ListBuilder ex_b2(pool_, std::unique_ptr<Int8Builder>(new 
Int8Builder(pool_)));
+
+  AppendData(&ex_b0, &ex_b1, &ex_b2);
+  ASSERT_OK(ex_b0.Finish(&a0));
+  ASSERT_OK(ex_b1.Finish(&a1));
+  ASSERT_OK(ex_b2.Finish(&a2));
+
+  RecordBatch expected(schema, 4, {a0, a1, a2});
+
+  // Builder attributes
+  ASSERT_EQ(3, builder->num_fields());
+  ASSERT_EQ(schema.get(), builder->schema().get());
+
+  const int kIter = 3;
+  for (int i = 0; i < kIter; ++i) {
+    AppendData(builder->GetFieldAs<Int32Builder>(0),
+               static_cast<StringBuilder*>(builder->GetField(1)),
+               builder->GetFieldAs<ListBuilder>(2));
+
+    std::shared_ptr<RecordBatch> batch;
+
+    if (i == kIter - 1) {
+      // Do not flush in last iteration
+      ASSERT_OK(builder->Flush(false, &batch));
+    } else {
+      ASSERT_OK(builder->Flush(&batch));
+    }
+
+    ASSERT_BATCHES_EQUAL(expected, *batch);
+  }
+
+  // Test setting initial capacity
+  builder->SetInitialCapacity(4096);
+  ASSERT_EQ(4096, builder->initial_capacity());
+}
+
+TEST_F(TestRecordBatchBuilder, InvalidFieldLength) {
+  auto schema = ExampleSchema1();
+
+  std::unique_ptr<RecordBatchBuilder> builder;
+  ASSERT_OK(RecordBatchBuilder::Make(schema, pool_, &builder));
+
+  std::vector<bool> is_valid = {false, true, true, true};
+  std::vector<int32_t> f0_values = {0, 1, 2, 3};
+
+  AppendValues<Int32Builder, int32_t>(builder->GetFieldAs<Int32Builder>(0), 
f0_values,
+                                      is_valid);
+
+  std::shared_ptr<RecordBatch> dummy;
+  ASSERT_RAISES(Invalid, builder->Flush(&dummy));
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc
new file mode 100644
index 000000000..a1bd95940
--- /dev/null
+++ b/cpp/src/arrow/table_builder.cc
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/table_builder.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <memory>
+#include <sstream>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// RecordBatchBuilder
+
+RecordBatchBuilder::RecordBatchBuilder(const std::shared_ptr<Schema>& schema,
+                                       MemoryPool* pool, int64_t 
initial_capacity)
+    : schema_(schema), initial_capacity_(initial_capacity), pool_(pool) {}
+
+Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, 
MemoryPool* pool,
+                                std::unique_ptr<RecordBatchBuilder>* builder) {
+  return Make(schema, pool, kMinBuilderCapacity, builder);
+}
+
+Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, 
MemoryPool* pool,
+                                int64_t initial_capacity,
+                                std::unique_ptr<RecordBatchBuilder>* builder) {
+  builder->reset(new RecordBatchBuilder(schema, pool, initial_capacity));
+  RETURN_NOT_OK((*builder)->CreateBuilders());
+  return (*builder)->InitBuilders();
+}
+
+Status RecordBatchBuilder::Flush(bool reset_builders,
+                                 std::shared_ptr<RecordBatch>* batch) {
+  std::vector<std::shared_ptr<Array>> fields;
+  fields.resize(this->num_fields());
+
+  int64_t length = 0;
+  for (int i = 0; i < this->num_fields(); ++i) {
+    RETURN_NOT_OK(raw_field_builders_[i]->Finish(&fields[i]));
+    if (i > 0 && fields[i]->length() != length) {
+      return Status::Invalid("All fields must be same length when calling 
Flush");
+    }
+    length = fields[i]->length();
+  }
+  *batch = std::make_shared<RecordBatch>(schema_, length, std::move(fields));
+  if (reset_builders) {
+    return InitBuilders();
+  } else {
+    return Status::OK();
+  }
+}
+
+Status RecordBatchBuilder::Flush(std::shared_ptr<RecordBatch>* batch) {
+  return Flush(true, batch);
+}
+
+void RecordBatchBuilder::SetInitialCapacity(int64_t capacity) {
+  DCHECK_GT(capacity, 0) << "Initial capacity must be positive";
+  initial_capacity_ = capacity;
+}
+
+Status RecordBatchBuilder::CreateBuilders() {
+  field_builders_.resize(this->num_fields());
+  raw_field_builders_.resize(this->num_fields());
+  for (int i = 0; i < this->num_fields(); ++i) {
+    RETURN_NOT_OK(MakeBuilder(pool_, schema_->field(i)->type(), 
&field_builders_[i]));
+    raw_field_builders_[i] = field_builders_[i].get();
+  }
+  return Status::OK();
+}
+
+Status RecordBatchBuilder::InitBuilders() {
+  for (int i = 0; i < this->num_fields(); ++i) {
+    RETURN_NOT_OK(raw_field_builders_[i]->Init(initial_capacity_));
+  }
+  return Status::OK();
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/table_builder.h b/cpp/src/arrow/table_builder.h
new file mode 100644
index 000000000..582389b70
--- /dev/null
+++ b/cpp/src/arrow/table_builder.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_TABLE_BUILDER_H
+#define ARROW_TABLE_BUILDER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class ArrayBuilder;
+class MemoryPool;
+class RecordBatch;
+class Schema;
+
+/// \class RecordBatchBuilder
+/// \brief Helper class for creating record batches iteratively given a known
+/// schema
+class RecordBatchBuilder {
+ public:
+  /// \brief Create an initialize a RecordBatchBuilder
+  /// \param[in] schema The schema for the record batch
+  /// \param[in] pool A MemoryPool to use for allocations
+  /// \param[in] builder the created builder instance
+  static Status Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                     std::unique_ptr<RecordBatchBuilder>* builder);
+
+  /// \brief Create an initialize a RecordBatchBuilder
+  /// \param[in] schema The schema for the record batch
+  /// \param[in] pool A MemoryPool to use for allocations
+  /// \param[in] initial_capacity The initial capacity for the builders
+  /// \param[in] builder the created builder instance
+  static Status Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                     int64_t initial_capacity,
+                     std::unique_ptr<RecordBatchBuilder>* builder);
+
+  /// \brief Get base pointer to field builder
+  /// \param i the field index
+  /// \return pointer to ArrayBuilder
+  ArrayBuilder* GetField(int i) { return raw_field_builders_[i]; }
+
+  /// \brief Return field builder casted to indicated specific builder type
+  /// \param i the field index
+  /// \return pointer to template type
+  template <typename T>
+  T* GetFieldAs(int i) {
+    return static_cast<T*>(raw_field_builders_[i]);
+  }
+
+  /// \brief Finish current batch and optionally reset
+  /// \param[in] reset_builders the resulting RecordBatch
+  /// \param[out] batch the resulting RecordBatch
+  /// \return Status
+  Status Flush(bool reset_builders, std::shared_ptr<RecordBatch>* batch);
+
+  /// \brief Finish current batch and reset
+  /// \param[out] batch the resulting RecordBatch
+  /// \return Status
+  Status Flush(std::shared_ptr<RecordBatch>* batch);
+
+  /// \brief Set the initial capacity for new builders
+  void SetInitialCapacity(int64_t capacity);
+
+  /// \brief The initial capacity for builders
+  int64_t initial_capacity() const { return initial_capacity_; }
+
+  /// \brief The number of fields in the schema
+  int num_fields() const { return schema_->num_fields(); }
+
+  /// \brief The number of fields in the schema
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+ private:
+  RecordBatchBuilder(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                     int64_t initial_capacity);
+
+  Status CreateBuilders();
+  Status InitBuilders();
+
+  std::shared_ptr<Schema> schema_;
+  int64_t initial_capacity_;
+  MemoryPool* pool_;
+
+  std::vector<std::unique_ptr<ArrayBuilder>> field_builders_;
+  std::vector<ArrayBuilder*> raw_field_builders_;
+};
+
+}  // namespace arrow
+
+#endif  // ARROW_TABLE_BUILDER_H
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 80e4feb6c..83ebdea4a 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -22,6 +22,7 @@
 #include <limits>
 #include <memory>
 #include <random>
+#include <sstream>
 #include <string>
 #include <vector>
 
@@ -291,6 +292,19 @@ void AssertArraysEqual(const Array& expected, const Array& 
actual) {
   }
 }
 
+#define ASSERT_BATCHES_EQUAL(LEFT, RIGHT)    \
+  do {                                       \
+    if (!LEFT.ApproxEquals(RIGHT)) {         \
+      std::stringstream ss;                  \
+      ss << "Left:\n";                       \
+      ASSERT_OK(PrettyPrint(LEFT, 0, &ss));  \
+                                             \
+      ss << "\nRight:\n";                    \
+      ASSERT_OK(PrettyPrint(RIGHT, 0, &ss)); \
+      FAIL() << ss.str();                    \
+    }                                        \
+  } while (false)
+
 }  // namespace arrow
 
 #endif  // ARROW_TEST_UTIL_H_


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [C++] Create Record Batch Builder class as a reusable and efficient way to 
> transpose row-by-row data to columns
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-1114
>                 URL: https://issues.apache.org/jira/browse/ARROW-1114
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: C++
>            Reporter: Wes McKinney
>            Assignee: Wes McKinney
>              Labels: pull-request-available
>             Fix For: 0.8.0
>
>
> When dealing with IO interfaces that yield one record at a time, a common 
> task will be appending each element to a sequence. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to