This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e00ee9  ARROW-1114: [C++] Add simple RecordBatchBuilder class
8e00ee9 is described below

commit 8e00ee993a158ba444cf5a4b75e7ad24756a7fbb
Author: Wes McKinney <wes.mckin...@twosigma.com>
AuthorDate: Mon Oct 23 17:58:32 2017 -0400

    ARROW-1114: [C++] Add simple RecordBatchBuilder class
    
    This patch includes ARROW-1172 which I put up separately. Putting this up 
for comments on the API before I go too far down the rabbit hole. The idea is 
to make it simpler for users to construct record batches given a known schema. 
For example, this could be used in turbodbc or another database driver.
    
    closes #810 incidentally
    
    Author: Wes McKinney <wes.mckin...@twosigma.com>
    
    Closes #802 from wesm/ARROW-1114 and squashes the following commits:
    
    5f104c4e [Wes McKinney] Rename Create to Make, remove const method versions
    a8b6c5cc [Wes McKinney] Update doxygen comments, change FlushAndReset to 
instead have Flush with a flag for resetting
    2573ae7e [Wes McKinney] Test invalid field length
    256419fc [Wes McKinney] Basic test passing
    be7e2325 [Wes McKinney] Start table_builder-test.cc
    9660502a [Wes McKinney] Draft RecordBatchBuilder, no tests or benchmarks yet
---
 cpp/src/arrow/CMakeLists.txt               |   3 +
 cpp/src/arrow/api.h                        |   1 +
 cpp/src/arrow/ipc/json-integration-test.cc |   4 +-
 cpp/src/arrow/table_builder-test.cc        | 146 +++++++++++++++++++++++++++++
 cpp/src/arrow/table_builder.cc             | 101 ++++++++++++++++++++
 cpp/src/arrow/table_builder.h              | 111 ++++++++++++++++++++++
 cpp/src/arrow/test-util.h                  |  14 +++
 7 files changed, 378 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 9a67651..22b4751 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -24,6 +24,7 @@ set(ARROW_SRCS
   pretty_print.cc
   status.cc
   table.cc
+  table_builder.cc
   tensor.cc
   type.cc
   visitor.cc
@@ -143,6 +144,7 @@ install(FILES
   pretty_print.h
   status.h
   table.h
+  table_builder.h
   tensor.h
   type.h
   type_fwd.h
@@ -172,6 +174,7 @@ ADD_ARROW_TEST(pretty_print-test)
 ADD_ARROW_TEST(status-test)
 ADD_ARROW_TEST(type-test)
 ADD_ARROW_TEST(table-test)
+ADD_ARROW_TEST(table_builder-test)
 ADD_ARROW_TEST(tensor-test)
 
 ADD_ARROW_BENCHMARK(builder-benchmark)
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index 4d731bd..5d2e859 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -28,6 +28,7 @@
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/table_builder.h"
 #include "arrow/tensor.h"
 #include "arrow/type.h"
 #include "arrow/visitor.h"
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc 
b/cpp/src/arrow/ipc/json-integration-test.cc
index 4a3b2b3..c7530a4 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -182,10 +182,10 @@ static Status ValidateArrowVsJson(const std::string& 
arrow_path,
       ss << "Record batch " << i << " did not match";
 
       ss << "\nJSON:\n";
-      RETURN_NOT_OK(PrettyPrint(*json_batch.get(), 0, &ss));
+      RETURN_NOT_OK(PrettyPrint(*json_batch, 0, &ss));
 
       ss << "\nArrow:\n";
-      RETURN_NOT_OK(PrettyPrint(*arrow_batch.get(), 0, &ss));
+      RETURN_NOT_OK(PrettyPrint(*arrow_batch, 0, &ss));
       return Status::Invalid(ss.str());
     }
   }
diff --git a/cpp/src/arrow/table_builder-test.cc 
b/cpp/src/arrow/table_builder-test.cc
new file mode 100644
index 0000000..07d9b6b
--- /dev/null
+++ b/cpp/src/arrow/table_builder-test.cc
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/array.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/table_builder.h"
+#include "arrow/test-common.h"
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+
+// Fixture for the RecordBatchBuilder tests. It adds no members of its own; the
+// pool_ used by the tests below comes from the TestBase base class.
+class TestRecordBatchBuilder : public TestBase {
+ public:
+};
+
+// Build the three-column schema used by the tests in this file:
+//   f0: int32, f1: utf8, f2: list<int8>
+std::shared_ptr<Schema> ExampleSchema1() {
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", utf8());
+  // Named "f2" so that every column in the schema has a distinct name
+  auto f2 = field("f2", list(int8()));
+  return ::arrow::schema({f0, f1, f2});
+}
+
+// Append each element of `values` to `builder`, emitting a null wherever the
+// matching entry of `is_valid` is false. An empty `is_valid` means that every
+// value is appended as valid.
+template <typename BuilderType, typename T>
+void AppendValues(BuilderType* builder, const std::vector<T>& values,
+                  const std::vector<bool>& is_valid) {
+  const bool all_valid = is_valid.empty();
+  for (size_t idx = 0; idx != values.size(); ++idx) {
+    const bool append_null = !all_valid && !is_valid[idx];
+    if (append_null) {
+      ASSERT_OK(builder->AppendNull());
+    } else {
+      ASSERT_OK(builder->Append(values[idx]));
+    }
+  }
+}
+
+// Append one list slot per element of `values` to `builder`. A slot is null
+// when the matching entry of `is_valid` is false; otherwise its items are
+// appended through the list's value builder, which is cast to ValueType.
+// An empty `is_valid` means that every slot is valid.
+template <typename ValueType, typename T>
+void AppendList(ListBuilder* builder, const std::vector<std::vector<T>>& values,
+                const std::vector<bool>& is_valid) {
+  auto child_builder = static_cast<ValueType*>(builder->value_builder());
+  const bool all_valid = is_valid.empty();
+
+  for (size_t idx = 0; idx != values.size(); ++idx) {
+    if (!all_valid && !is_valid[idx]) {
+      ASSERT_OK(builder->AppendNull());
+      continue;
+    }
+    // Open a new list slot, then fill it through the child builder
+    ASSERT_OK(builder->Append());
+    AppendValues<ValueType, T>(child_builder, values[idx], {});
+  }
+}
+
+// Fill a RecordBatchBuilder several times with the same data and check that
+// every flushed batch equals a batch assembled by hand from standalone builders.
+TEST_F(TestRecordBatchBuilder, Basics) {
+  auto schema = ExampleSchema1();
+
+  std::unique_ptr<RecordBatchBuilder> builder;
+  ASSERT_OK(RecordBatchBuilder::Make(schema, pool_, &builder));
+
+  std::vector<bool> is_valid = {false, true, true, true};
+  std::vector<int32_t> f0_values = {0, 1, 2, 3};
+  std::vector<std::string> f1_values = {"a", "bb", "ccc", "dddd"};
+  std::vector<std::vector<int8_t>> f2_values = {{}, {0, 1}, {}, {2}};
+
+  std::shared_ptr<Array> a0, a1, a2;
+
+  // Make the expected record batch
+  // (AppendData is reused below to feed the RecordBatchBuilder's own builders)
+  auto AppendData = [&](Int32Builder* b0, StringBuilder* b1, ListBuilder* b2) {
+    AppendValues<Int32Builder, int32_t>(b0, f0_values, is_valid);
+    AppendValues<StringBuilder, std::string>(b1, f1_values, is_valid);
+    AppendList<Int8Builder, int8_t>(b2, f2_values, is_valid);
+  };
+
+  Int32Builder ex_b0;
+  StringBuilder ex_b1;
+  ListBuilder ex_b2(pool_, std::unique_ptr<Int8Builder>(new 
Int8Builder(pool_)));
+
+  AppendData(&ex_b0, &ex_b1, &ex_b2);
+  ASSERT_OK(ex_b0.Finish(&a0));
+  ASSERT_OK(ex_b1.Finish(&a1));
+  ASSERT_OK(ex_b2.Finish(&a2));
+
+  RecordBatch expected(schema, 4, {a0, a1, a2});
+
+  // Builder attributes
+  ASSERT_EQ(3, builder->num_fields());
+  ASSERT_EQ(schema.get(), builder->schema().get());
+
+  // Exercise both the GetFieldAs<T> and the GetField + static_cast accessors
+  const int kIter = 3;
+  for (int i = 0; i < kIter; ++i) {
+    AppendData(builder->GetFieldAs<Int32Builder>(0),
+               static_cast<StringBuilder*>(builder->GetField(1)),
+               builder->GetFieldAs<ListBuilder>(2));
+
+    std::shared_ptr<RecordBatch> batch;
+
+    if (i == kIter - 1) {
+      // Still flush in the last iteration, but do not reset the builders
+      ASSERT_OK(builder->Flush(false, &batch));
+    } else {
+      ASSERT_OK(builder->Flush(&batch));
+    }
+
+    ASSERT_BATCHES_EQUAL(expected, *batch);
+  }
+
+  // Test setting initial capacity
+  builder->SetInitialCapacity(4096);
+  ASSERT_EQ(4096, builder->initial_capacity());
+}
+
+// Flush must report Invalid when the field builders hold different numbers of
+// values.
+TEST_F(TestRecordBatchBuilder, InvalidFieldLength) {
+  auto schema = ExampleSchema1();
+
+  std::unique_ptr<RecordBatchBuilder> builder;
+  ASSERT_OK(RecordBatchBuilder::Make(schema, pool_, &builder));
+
+  std::vector<bool> is_valid = {false, true, true, true};
+  std::vector<int32_t> f0_values = {0, 1, 2, 3};
+
+  // Only field 0 receives values; fields 1 and 2 are left untouched
+  AppendValues<Int32Builder, int32_t>(builder->GetFieldAs<Int32Builder>(0), 
f0_values,
+                                      is_valid);
+
+  std::shared_ptr<RecordBatch> dummy;
+  ASSERT_RAISES(Invalid, builder->Flush(&dummy));
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc
new file mode 100644
index 0000000..a1bd959
--- /dev/null
+++ b/cpp/src/arrow/table_builder.cc
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/table_builder.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <memory>
+#include <sstream>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// RecordBatchBuilder
+
+RecordBatchBuilder::RecordBatchBuilder(const std::shared_ptr<Schema>& schema,
+                                       MemoryPool* pool, int64_t 
initial_capacity)
+    : schema_(schema), initial_capacity_(initial_capacity), pool_(pool) {}
+
+// Convenience overload: forwards to the full Make() with kMinBuilderCapacity as
+// the initial capacity
+Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                                std::unique_ptr<RecordBatchBuilder>* builder) {
+  const int64_t default_capacity = kMinBuilderCapacity;
+  return Make(schema, pool, default_capacity, builder);
+}
+
+// Allocate the RecordBatchBuilder, then create and initialize one ArrayBuilder
+// per schema field. The first failing step's Status is returned; *builder has
+// already been assigned by then.
+Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                                int64_t initial_capacity,
+                                std::unique_ptr<RecordBatchBuilder>* builder) {
+  // The constructor is private, so the instance is allocated here directly
+  builder->reset(new RecordBatchBuilder(schema, pool, initial_capacity));
+  RecordBatchBuilder* created = builder->get();
+
+  RETURN_NOT_OK(created->CreateBuilders());
+  return created->InitBuilders();
+}
+
+// Finish every field builder in schema order and wrap the resulting arrays in
+// a RecordBatch. Returns Invalid as soon as a field's length differs from the
+// preceding one. When reset_builders is true, the builders are re-initialized
+// with the current initial capacity once the batch has been assigned.
+Status RecordBatchBuilder::Flush(bool reset_builders,
+                                 std::shared_ptr<RecordBatch>* batch) {
+  const int field_count = this->num_fields();
+  std::vector<std::shared_ptr<Array>> columns;
+  columns.resize(field_count);
+
+  int64_t num_rows = 0;
+  for (int col = 0; col < field_count; ++col) {
+    RETURN_NOT_OK(raw_field_builders_[col]->Finish(&columns[col]));
+    const int64_t col_length = columns[col]->length();
+    // The first column only establishes the row count; later ones must match
+    if (col != 0 && col_length != num_rows) {
+      return Status::Invalid("All fields must be same length when calling Flush");
+    }
+    num_rows = col_length;
+  }
+
+  *batch = std::make_shared<RecordBatch>(schema_, num_rows, std::move(columns));
+  if (!reset_builders) {
+    return Status::OK();
+  }
+  return InitBuilders();
+}
+
+// Same as the two-argument overload with reset_builders = true
+Status RecordBatchBuilder::Flush(std::shared_ptr<RecordBatch>* batch) {
+  const bool reset_builders = true;
+  return Flush(reset_builders, batch);
+}
+
+// Store the capacity that InitBuilders() passes to each field builder's Init().
+// The value is not applied to the builders here. A non-positive capacity trips
+// the DCHECK below.
+void RecordBatchBuilder::SetInitialCapacity(int64_t capacity) {
+  DCHECK_GT(capacity, 0) << "Initial capacity must be positive";
+  initial_capacity_ = capacity;
+}
+
+// Create one ArrayBuilder per schema field via MakeBuilder. field_builders_
+// owns the builders; raw_field_builders_ holds the matching raw pointers.
+Status RecordBatchBuilder::CreateBuilders() {
+  const int field_count = this->num_fields();
+  field_builders_.resize(field_count);
+  raw_field_builders_.resize(field_count);
+
+  for (int idx = 0; idx < field_count; ++idx) {
+    std::unique_ptr<ArrayBuilder>& owned = field_builders_[idx];
+    RETURN_NOT_OK(MakeBuilder(pool_, schema_->field(idx)->type(), &owned));
+    raw_field_builders_[idx] = owned.get();
+  }
+  return Status::OK();
+}
+
+// Call Init(initial_capacity_) on every field builder, stopping at the first
+// failure. raw_field_builders_ holds exactly num_fields() entries once
+// CreateBuilders() has run.
+Status RecordBatchBuilder::InitBuilders() {
+  for (ArrayBuilder* field_builder : raw_field_builders_) {
+    RETURN_NOT_OK(field_builder->Init(initial_capacity_));
+  }
+  return Status::OK();
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/table_builder.h b/cpp/src/arrow/table_builder.h
new file mode 100644
index 0000000..582389b
--- /dev/null
+++ b/cpp/src/arrow/table_builder.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_TABLE_BUILDER_H
+#define ARROW_TABLE_BUILDER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class ArrayBuilder;
+class MemoryPool;
+class RecordBatch;
+class Schema;
+
+/// \class RecordBatchBuilder
+/// \brief Helper class for creating record batches iteratively given a known
+/// schema
+///
+/// One ArrayBuilder is held per schema field. Values are appended through the
+/// builders returned by GetField / GetFieldAs, and Flush assembles what has
+/// been appended into a RecordBatch.
+class RecordBatchBuilder {
+ public:
+  /// \brief Create and initialize a RecordBatchBuilder
+  /// \param[in] schema The schema for the record batch
+  /// \param[in] pool A MemoryPool to use for allocations
+  /// \param[out] builder the created builder instance
+  /// \return Status
+  static Status Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                     std::unique_ptr<RecordBatchBuilder>* builder);
+
+  /// \brief Create and initialize a RecordBatchBuilder
+  /// \param[in] schema The schema for the record batch
+  /// \param[in] pool A MemoryPool to use for allocations
+  /// \param[in] initial_capacity The initial capacity for the builders
+  /// \param[out] builder the created builder instance
+  /// \return Status
+  static Status Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                     int64_t initial_capacity,
+                     std::unique_ptr<RecordBatchBuilder>* builder);
+
+  /// \brief Get base pointer to field builder
+  /// \param i the field index (not bounds-checked)
+  /// \return pointer to ArrayBuilder
+  ArrayBuilder* GetField(int i) { return raw_field_builders_[i]; }
+
+  /// \brief Return field builder casted to indicated specific builder type
+  ///
+  /// The cast is a static_cast; no runtime type check is performed.
+  /// \param i the field index (not bounds-checked)
+  /// \return pointer to template type
+  template <typename T>
+  T* GetFieldAs(int i) {
+    return static_cast<T*>(raw_field_builders_[i]);
+  }
+
+  /// \brief Finish current batch and optionally reset
+  /// \param[in] reset_builders whether to re-initialize the field builders
+  /// after the batch has been produced
+  /// \param[out] batch the resulting RecordBatch
+  /// \return Status; Invalid if the fields do not all have the same length
+  Status Flush(bool reset_builders, std::shared_ptr<RecordBatch>* batch);
+
+  /// \brief Finish current batch and reset
+  /// \param[out] batch the resulting RecordBatch
+  /// \return Status; Invalid if the fields do not all have the same length
+  Status Flush(std::shared_ptr<RecordBatch>* batch);
+
+  /// \brief Set the initial capacity for new builders
+  /// \param[in] capacity the new initial capacity; must be positive
+  void SetInitialCapacity(int64_t capacity);
+
+  /// \brief The initial capacity for builders
+  int64_t initial_capacity() const { return initial_capacity_; }
+
+  /// \brief The number of fields in the schema
+  int num_fields() const { return schema_->num_fields(); }
+
+  /// \brief The schema this builder was created with
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+ private:
+  // Private: instances are created through the static Make() functions
+  RecordBatchBuilder(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
+                     int64_t initial_capacity);
+
+  // Create one ArrayBuilder per schema field
+  Status CreateBuilders();
+  // (Re-)initialize every field builder with initial_capacity_
+  Status InitBuilders();
+
+  std::shared_ptr<Schema> schema_;
+  int64_t initial_capacity_;
+  MemoryPool* pool_;
+
+  // Owning pointers to the field builders, in schema order
+  std::vector<std::unique_ptr<ArrayBuilder>> field_builders_;
+  // Non-owning pointers to the same builders, in the same order
+  std::vector<ArrayBuilder*> raw_field_builders_;
+};
+
+}  // namespace arrow
+
+#endif  // ARROW_TABLE_BUILDER_H
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 80e4feb..83ebdea 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -22,6 +22,7 @@
 #include <limits>
 #include <memory>
 #include <random>
+#include <sstream>
 #include <string>
 #include <vector>
 
@@ -291,6 +292,19 @@ void AssertArraysEqual(const Array& expected, const Array& 
actual) {
   }
 }
 
+// Assert that two record batches compare equal under ApproxEquals. On a
+// mismatch the current test fails with both batches pretty-printed.
+// LEFT is parenthesized so that expressions such as `*batch` can be passed as
+// the first argument. Both arguments may be evaluated more than once, so pass
+// side-effect-free expressions.
+#define ASSERT_BATCHES_EQUAL(LEFT, RIGHT)    \
+  do {                                       \
+    if (!(LEFT).ApproxEquals(RIGHT)) {       \
+      std::stringstream ss;                  \
+      ss << "Left:\n";                       \
+      ASSERT_OK(PrettyPrint(LEFT, 0, &ss));  \
+                                             \
+      ss << "\nRight:\n";                    \
+      ASSERT_OK(PrettyPrint(RIGHT, 0, &ss)); \
+      FAIL() << ss.str();                    \
+    }                                        \
+  } while (false)
+
 }  // namespace arrow
 
 #endif  // ARROW_TEST_UTIL_H_

-- 
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <commits@arrow.apache.org>'].

Reply via email to