This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 8e00ee9 ARROW-1114: [C++] Add simple RecordBatchBuilder class 8e00ee9 is described below commit 8e00ee993a158ba444cf5a4b75e7ad24756a7fbb Author: Wes McKinney <wes.mckin...@twosigma.com> AuthorDate: Mon Oct 23 17:58:32 2017 -0400 ARROW-1114: [C++] Add simple RecordBatchBuilder class This patch includes ARROW-1172 which I put up separately. Putting this up for comments on the API before I go too far down the rabbit hole. The idea is to make it simpler for users to construct record batches given a known schema. For example, this could be used in turbodbc or another database driver closes #810 incidentally Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #802 from wesm/ARROW-1114 and squashes the following commits: 5f104c4e [Wes McKinney] Rename Create to Make, remove const method versions a8b6c5cc [Wes McKinney] Update doxygen comments, change FlushAndReset to instead have Flush with a flag for resetting 2573ae7e [Wes McKinney] Test invalid field length 256419fc [Wes McKinney] Basic test passing be7e2325 [Wes McKinney] Start table_builder-test.cc 9660502a [Wes McKinney] Draft RecordBatchBuilder, no tests or benchmarks yet --- cpp/src/arrow/CMakeLists.txt | 3 + cpp/src/arrow/api.h | 1 + cpp/src/arrow/ipc/json-integration-test.cc | 4 +- cpp/src/arrow/table_builder-test.cc | 146 +++++++++++++++++++++++++++++ cpp/src/arrow/table_builder.cc | 101 ++++++++++++++++++++ cpp/src/arrow/table_builder.h | 111 ++++++++++++++++++++++ cpp/src/arrow/test-util.h | 14 +++ 7 files changed, 378 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 9a67651..22b4751 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -24,6 +24,7 @@ set(ARROW_SRCS pretty_print.cc status.cc table.cc + table_builder.cc tensor.cc type.cc visitor.cc @@ -143,6 +144,7 @@ install(FILES pretty_print.h status.h table.h + table_builder.h tensor.h type.h 
type_fwd.h @@ -172,6 +174,7 @@ ADD_ARROW_TEST(pretty_print-test) ADD_ARROW_TEST(status-test) ADD_ARROW_TEST(type-test) ADD_ARROW_TEST(table-test) +ADD_ARROW_TEST(table_builder-test) ADD_ARROW_TEST(tensor-test) ADD_ARROW_BENCHMARK(builder-benchmark) diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h index 4d731bd..5d2e859 100644 --- a/cpp/src/arrow/api.h +++ b/cpp/src/arrow/api.h @@ -28,6 +28,7 @@ #include "arrow/pretty_print.h" #include "arrow/status.h" #include "arrow/table.h" +#include "arrow/table_builder.h" #include "arrow/tensor.h" #include "arrow/type.h" #include "arrow/visitor.h" diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 4a3b2b3..c7530a4 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -182,10 +182,10 @@ static Status ValidateArrowVsJson(const std::string& arrow_path, ss << "Record batch " << i << " did not match"; ss << "\nJSON:\n"; - RETURN_NOT_OK(PrettyPrint(*json_batch.get(), 0, &ss)); + RETURN_NOT_OK(PrettyPrint(*json_batch, 0, &ss)); ss << "\nArrow:\n"; - RETURN_NOT_OK(PrettyPrint(*arrow_batch.get(), 0, &ss)); + RETURN_NOT_OK(PrettyPrint(*arrow_batch, 0, &ss)); return Status::Invalid(ss.str()); } } diff --git a/cpp/src/arrow/table_builder-test.cc b/cpp/src/arrow/table_builder-test.cc new file mode 100644 index 0000000..07d9b6b --- /dev/null +++ b/cpp/src/arrow/table_builder-test.cc @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <memory> +#include <string> +#include <vector> + +#include "gtest/gtest.h" + +#include "arrow/array.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/table_builder.h" +#include "arrow/test-common.h" +#include "arrow/test-util.h" +#include "arrow/type.h" + +namespace arrow { + +class TestRecordBatchBuilder : public TestBase { + public: +}; + +std::shared_ptr<Schema> ExampleSchema1() { + auto f0 = field("f0", int32()); + auto f1 = field("f1", utf8()); + auto f2 = field("f1", list(int8())); + return ::arrow::schema({f0, f1, f2}); +} + +template <typename BuilderType, typename T> +void AppendValues(BuilderType* builder, const std::vector<T>& values, + const std::vector<bool>& is_valid) { + for (size_t i = 0; i < values.size(); ++i) { + if (is_valid.size() == 0 || is_valid[i]) { + ASSERT_OK(builder->Append(values[i])); + } else { + ASSERT_OK(builder->AppendNull()); + } + } +} + +template <typename ValueType, typename T> +void AppendList(ListBuilder* builder, const std::vector<std::vector<T>>& values, + const std::vector<bool>& is_valid) { + auto values_builder = static_cast<ValueType*>(builder->value_builder()); + + for (size_t i = 0; i < values.size(); ++i) { + if (is_valid.size() == 0 || is_valid[i]) { + ASSERT_OK(builder->Append()); + AppendValues<ValueType, T>(values_builder, values[i], {}); + } else { + ASSERT_OK(builder->AppendNull()); + } + } +} + +TEST_F(TestRecordBatchBuilder, Basics) { + auto schema = ExampleSchema1(); + + std::unique_ptr<RecordBatchBuilder> builder; + 
ASSERT_OK(RecordBatchBuilder::Make(schema, pool_, &builder)); + + std::vector<bool> is_valid = {false, true, true, true}; + std::vector<int32_t> f0_values = {0, 1, 2, 3}; + std::vector<std::string> f1_values = {"a", "bb", "ccc", "dddd"}; + std::vector<std::vector<int8_t>> f2_values = {{}, {0, 1}, {}, {2}}; + + std::shared_ptr<Array> a0, a1, a2; + + // Make the expected record batch + auto AppendData = [&](Int32Builder* b0, StringBuilder* b1, ListBuilder* b2) { + AppendValues<Int32Builder, int32_t>(b0, f0_values, is_valid); + AppendValues<StringBuilder, std::string>(b1, f1_values, is_valid); + AppendList<Int8Builder, int8_t>(b2, f2_values, is_valid); + }; + + Int32Builder ex_b0; + StringBuilder ex_b1; + ListBuilder ex_b2(pool_, std::unique_ptr<Int8Builder>(new Int8Builder(pool_))); + + AppendData(&ex_b0, &ex_b1, &ex_b2); + ASSERT_OK(ex_b0.Finish(&a0)); + ASSERT_OK(ex_b1.Finish(&a1)); + ASSERT_OK(ex_b2.Finish(&a2)); + + RecordBatch expected(schema, 4, {a0, a1, a2}); + + // Builder attributes + ASSERT_EQ(3, builder->num_fields()); + ASSERT_EQ(schema.get(), builder->schema().get()); + + const int kIter = 3; + for (int i = 0; i < kIter; ++i) { + AppendData(builder->GetFieldAs<Int32Builder>(0), + static_cast<StringBuilder*>(builder->GetField(1)), + builder->GetFieldAs<ListBuilder>(2)); + + std::shared_ptr<RecordBatch> batch; + + if (i == kIter - 1) { + // Do not flush in last iteration + ASSERT_OK(builder->Flush(false, &batch)); + } else { + ASSERT_OK(builder->Flush(&batch)); + } + + ASSERT_BATCHES_EQUAL(expected, *batch); + } + + // Test setting initial capacity + builder->SetInitialCapacity(4096); + ASSERT_EQ(4096, builder->initial_capacity()); +} + +TEST_F(TestRecordBatchBuilder, InvalidFieldLength) { + auto schema = ExampleSchema1(); + + std::unique_ptr<RecordBatchBuilder> builder; + ASSERT_OK(RecordBatchBuilder::Make(schema, pool_, &builder)); + + std::vector<bool> is_valid = {false, true, true, true}; + std::vector<int32_t> f0_values = {0, 1, 2, 3}; + + 
AppendValues<Int32Builder, int32_t>(builder->GetFieldAs<Int32Builder>(0), f0_values, + is_valid); + + std::shared_ptr<RecordBatch> dummy; + ASSERT_RAISES(Invalid, builder->Flush(&dummy)); +} + +} // namespace arrow diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc new file mode 100644 index 0000000..a1bd959 --- /dev/null +++ b/cpp/src/arrow/table_builder.cc @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/table_builder.h" + +#include <algorithm> +#include <cstdlib> +#include <memory> +#include <sstream> + +#include "arrow/array.h" +#include "arrow/builder.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/type.h" +#include "arrow/util/logging.h" + +namespace arrow { + +// ---------------------------------------------------------------------- +// RecordBatchBuilder + +RecordBatchBuilder::RecordBatchBuilder(const std::shared_ptr<Schema>& schema, + MemoryPool* pool, int64_t initial_capacity) + : schema_(schema), initial_capacity_(initial_capacity), pool_(pool) {} + +Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool, + std::unique_ptr<RecordBatchBuilder>* builder) { + return Make(schema, pool, kMinBuilderCapacity, builder); +} + +Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool, + int64_t initial_capacity, + std::unique_ptr<RecordBatchBuilder>* builder) { + builder->reset(new RecordBatchBuilder(schema, pool, initial_capacity)); + RETURN_NOT_OK((*builder)->CreateBuilders()); + return (*builder)->InitBuilders(); +} + +Status RecordBatchBuilder::Flush(bool reset_builders, + std::shared_ptr<RecordBatch>* batch) { + std::vector<std::shared_ptr<Array>> fields; + fields.resize(this->num_fields()); + + int64_t length = 0; + for (int i = 0; i < this->num_fields(); ++i) { + RETURN_NOT_OK(raw_field_builders_[i]->Finish(&fields[i])); + if (i > 0 && fields[i]->length() != length) { + return Status::Invalid("All fields must be same length when calling Flush"); + } + length = fields[i]->length(); + } + *batch = std::make_shared<RecordBatch>(schema_, length, std::move(fields)); + if (reset_builders) { + return InitBuilders(); + } else { + return Status::OK(); + } +} + +Status RecordBatchBuilder::Flush(std::shared_ptr<RecordBatch>* batch) { + return Flush(true, batch); +} + +void RecordBatchBuilder::SetInitialCapacity(int64_t capacity) { + DCHECK_GT(capacity, 0) << 
"Initial capacity must be positive"; + initial_capacity_ = capacity; +} + +Status RecordBatchBuilder::CreateBuilders() { + field_builders_.resize(this->num_fields()); + raw_field_builders_.resize(this->num_fields()); + for (int i = 0; i < this->num_fields(); ++i) { + RETURN_NOT_OK(MakeBuilder(pool_, schema_->field(i)->type(), &field_builders_[i])); + raw_field_builders_[i] = field_builders_[i].get(); + } + return Status::OK(); +} + +Status RecordBatchBuilder::InitBuilders() { + for (int i = 0; i < this->num_fields(); ++i) { + RETURN_NOT_OK(raw_field_builders_[i]->Init(initial_capacity_)); + } + return Status::OK(); +} + +} // namespace arrow diff --git a/cpp/src/arrow/table_builder.h b/cpp/src/arrow/table_builder.h new file mode 100644 index 0000000..582389b --- /dev/null +++ b/cpp/src/arrow/table_builder.h @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_TABLE_BUILDER_H +#define ARROW_TABLE_BUILDER_H + +#include <cstdint> +#include <memory> +#include <string> +#include <vector> + +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class ArrayBuilder; +class MemoryPool; +class RecordBatch; +class Schema; + +/// \class RecordBatchBuilder +/// \brief Helper class for creating record batches iteratively given a known +/// schema +class RecordBatchBuilder { + public: + /// \brief Create an initialize a RecordBatchBuilder + /// \param[in] schema The schema for the record batch + /// \param[in] pool A MemoryPool to use for allocations + /// \param[in] builder the created builder instance + static Status Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool, + std::unique_ptr<RecordBatchBuilder>* builder); + + /// \brief Create an initialize a RecordBatchBuilder + /// \param[in] schema The schema for the record batch + /// \param[in] pool A MemoryPool to use for allocations + /// \param[in] initial_capacity The initial capacity for the builders + /// \param[in] builder the created builder instance + static Status Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool, + int64_t initial_capacity, + std::unique_ptr<RecordBatchBuilder>* builder); + + /// \brief Get base pointer to field builder + /// \param i the field index + /// \return pointer to ArrayBuilder + ArrayBuilder* GetField(int i) { return raw_field_builders_[i]; } + + /// \brief Return field builder casted to indicated specific builder type + /// \param i the field index + /// \return pointer to template type + template <typename T> + T* GetFieldAs(int i) { + return static_cast<T*>(raw_field_builders_[i]); + } + + /// \brief Finish current batch and optionally reset + /// \param[in] reset_builders the resulting RecordBatch + /// \param[out] batch the resulting RecordBatch + /// \return Status + Status Flush(bool reset_builders, std::shared_ptr<RecordBatch>* batch); + + /// 
\brief Finish current batch and reset + /// \param[out] batch the resulting RecordBatch + /// \return Status + Status Flush(std::shared_ptr<RecordBatch>* batch); + + /// \brief Set the initial capacity for new builders + void SetInitialCapacity(int64_t capacity); + + /// \brief The initial capacity for builders + int64_t initial_capacity() const { return initial_capacity_; } + + /// \brief The number of fields in the schema + int num_fields() const { return schema_->num_fields(); } + + /// \brief The number of fields in the schema + std::shared_ptr<Schema> schema() const { return schema_; } + + private: + RecordBatchBuilder(const std::shared_ptr<Schema>& schema, MemoryPool* pool, + int64_t initial_capacity); + + Status CreateBuilders(); + Status InitBuilders(); + + std::shared_ptr<Schema> schema_; + int64_t initial_capacity_; + MemoryPool* pool_; + + std::vector<std::unique_ptr<ArrayBuilder>> field_builders_; + std::vector<ArrayBuilder*> raw_field_builders_; +}; + +} // namespace arrow + +#endif // ARROW_TABLE_BUILDER_H diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 80e4feb..83ebdea 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -22,6 +22,7 @@ #include <limits> #include <memory> #include <random> +#include <sstream> #include <string> #include <vector> @@ -291,6 +292,19 @@ void AssertArraysEqual(const Array& expected, const Array& actual) { } } +#define ASSERT_BATCHES_EQUAL(LEFT, RIGHT) \ + do { \ + if (!LEFT.ApproxEquals(RIGHT)) { \ + std::stringstream ss; \ + ss << "Left:\n"; \ + ASSERT_OK(PrettyPrint(LEFT, 0, &ss)); \ + \ + ss << "\nRight:\n"; \ + ASSERT_OK(PrettyPrint(RIGHT, 0, &ss)); \ + FAIL() << ss.str(); \ + } \ + } while (false) + } // namespace arrow #endif // ARROW_TEST_UTIL_H_ -- To stop receiving notification emails like this one, please contact commits@arrow.apache.org.