wjones127 commented on code in PR #12775: URL: https://github.com/apache/arrow/pull/12775#discussion_r895489955
########## cpp/examples/arrow/rapidjson_row_converter.cc: ########## @@ -0,0 +1,590 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/result.h> +#include <arrow/table_builder.h> +#include <arrow/type_traits.h> +#include <arrow/util/iterator.h> +#include <arrow/util/logging.h> +#include <arrow/visit_array_inline.h> + +#include <rapidjson/document.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/writer.h> + +#include <cassert> +#include <iostream> +#include <vector> + +// Transforming dynamic row data into Arrow data +// When building connectors to other data systems, it's common to receive data in +// row-based structures. While the row_wise_conversion_example.cc shows how to +// handle this conversion for fixed schemas, this example demonstrates how to +// write converters for arbitrary schemas. +// +// As an example, this conversion is between Arrow and rapidjson::Documents. 
+// +// We use the following helpers and patterns here: +// * arrow::ToRowConverter and arrow::FromRowConverter, which provide additional +// conversion methods given a basic converter +// * arrow::VisitArrayInline and arrow::VisitTypeInline for implementing a visitor +// pattern with Arrow to handle different array types +// * arrow::enable_if_primitive_ctype to create a template method that handles +// conversion for Arrow types that have corresponding C types (bool, integer, +// float). + +rapidjson::Value kNullJsonSingleton = rapidjson::Value(); + +/// \brief Builder that holds state for a single conversion. +/// +/// Implements Visit() methods for each type of Arrow Array that set the values +/// of the corresponding fields in each row. +class RowBatchBuilder { + public: + explicit RowBatchBuilder(int64_t num_rows) : field_(nullptr) { + // Reserve all of the space required up-front to avoid unnecessary resizing + rows_.reserve(num_rows); + + for (int64_t i = 0; i < num_rows; ++i) { + rows_.push_back(rapidjson::Document()); + rows_[i].SetObject(); + } + } + + /// \brief Set which field to convert. + void SetField(std::shared_ptr<arrow::Field> field) { field_ = std::move(field); } + + /// \brief Retrieve converted rows from builder. 
+ std::vector<rapidjson::Document> Rows() && { return std::move(rows_); } + + // Default implementation + arrow::Status Visit(const arrow::Array& array) { + return arrow::Status::NotImplemented( + "Can not convert to json document for array of type ", array.type()->ToString()); + } + + // Handles booleans, integers, floats + template <typename ArrayType, typename DataClass = typename ArrayType::TypeClass> + arrow::enable_if_primitive_ctype<DataClass, arrow::Status> Visit( + const ArrayType& array) { + for (int64_t i = 0; i < array.length(); ++i) { + if (!array.IsNull(i)) { + rapidjson::Value str_key(field_->name().c_str(), rows_[i].GetAllocator()); + rows_[i].AddMember(str_key, array.Value(i), rows_[i].GetAllocator()); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::StringArray& array) { + for (int64_t i = 0; i < array.length(); ++i) { + if (!array.IsNull(i)) { + rapidjson::Value str_key(field_->name().c_str(), rows_[i].GetAllocator()); + // StringArray.Value returns a string view + auto value_view = array.Value(i); + rapidjson::Value value; + value.SetString(value_view.data(), + static_cast<rapidjson::SizeType>(value_view.size()), + rows_[i].GetAllocator()); + rows_[i].AddMember(str_key, value, rows_[i].GetAllocator()); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::StructArray& array) { + const arrow::StructType* type = array.struct_type(); + + RowBatchBuilder child_builder(rows_.size()); + for (int i = 0; i < type->num_fields(); ++i) { + std::shared_ptr<arrow::Field> child_field = type->field(i); + child_builder.SetField(std::move(child_field)); + ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*array.field(i).get(), &child_builder)); + } + std::vector<rapidjson::Document> rows = std::move(child_builder).Rows(); + + for (int64_t i = 0; i < array.length(); ++i) { + if (!array.IsNull(i)) { + rapidjson::Value str_key(field_->name().c_str(), rows_[i].GetAllocator()); + // Must copy value to new allocator + 
rapidjson::Value row_val; + row_val.CopyFrom(rows[i], rows_[i].GetAllocator()); + rows_[i].AddMember(str_key, row_val, rows_[i].GetAllocator()); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::ListArray& array) { + // First create rows from values + std::shared_ptr<arrow::Array> values = array.values(); + RowBatchBuilder child_builder(values->length()); + std::shared_ptr<arrow::Field> value_field = array.list_type()->value_field(); + std::string value_field_name = value_field->name(); + child_builder.SetField(value_field); + ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*values.get(), &child_builder)); + + std::vector<rapidjson::Document> rows = std::move(child_builder).Rows(); + + int64_t values_i = 0; + for (int64_t i = 0; i < array.length(); ++i) { + rapidjson::Document::AllocatorType& allocator = rows_[i].GetAllocator(); + auto array_len = array.value_length(i); + + rapidjson::Value value; + value.SetArray(); + value.Reserve(array_len, allocator); + + for (int64_t j = 0; j < array_len; ++j) { + rapidjson::Value row_val; + // Must copy value to new allocator + row_val.CopyFrom(rows[values_i][value_field_name.c_str()], allocator); + value.PushBack(row_val, allocator); + ++values_i; + } + + rapidjson::Value str_key(field_->name().c_str(), allocator); + rows_[i].AddMember(str_key, value, allocator); + } + + return arrow::Status::OK(); + } + + private: + std::shared_ptr<arrow::Field> field_; + std::vector<rapidjson::Document> rows_; +}; // RowBatchBuilder + +class ArrowToDocumentConverter : public arrow::ToRowConverter<rapidjson::Document> { + public: + arrow::Result<std::vector<rapidjson::Document>> ConvertToVector( + std::shared_ptr<arrow::RecordBatch> batch) override { + RowBatchBuilder builder{batch->num_rows()}; + + for (int i = 0; i < batch->num_columns(); ++i) { + builder.SetField(std::move(batch->schema()->field(i))); + ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*batch->column(i).get(), &builder)); + } + + return 
std::move(builder).Rows(); + } +}; // ArrowToDocumentConverter + +/// \brief Iterator over rows values of a document for a given field +/// +/// path and array_levels are used to address each field in a JSON document. As +/// an example, consider this JSON document: +/// { +/// "x": 3, // path: ["x"], array_levels: 0 +/// "files": [ // path: ["files"], array_levels: 0 +/// { // path: ["files"], array_levels: 1 +/// "path": "my_str", // path: ["files", "path"], array_levels: 1 +/// "sizes": [ // path: ["files", "sizes"], array_levels: 1 +/// 20, // path: ["files", "sizes"], array_levels: 2 +/// 22 +/// ] +/// } +/// ] +/// }, +class DocValuesIterator { + public: + /// \param rows vector of rows + /// \param path field names to enter + /// \param array_levels number of arrays to enter + DocValuesIterator(const std::vector<rapidjson::Document>& rows, + std::vector<std::string> path, int64_t array_levels) + : rows(rows), path(std::move(path)), array_levels(array_levels) {} + + const rapidjson::Value* NextArrayOrRow(const rapidjson::Value* value, size_t* path_i, + int64_t* arr_i) { + while (array_stack.size() > 0) { + ArrayPosition& pos = array_stack.back(); + // Try to get next position in Array + if (pos.index + 1 < pos.array_node->Size()) { + ++pos.index; + value = &(*pos.array_node)[pos.index]; + *path_i = pos.path_index; + *arr_i = array_stack.size(); + return value; + } else { + array_stack.pop_back(); + } + } + ++row_i; + if (row_i < rows.size()) { + value = static_cast<const rapidjson::Value*>(&rows[row_i]); + } else { + value = nullptr; + } + *path_i = 0; + *arr_i = 0; + return value; + } + + arrow::Result<const rapidjson::Value*> Next() { + const rapidjson::Value* value = nullptr; + size_t path_i; + int64_t arr_i; + // Can either start at document or at last array level + if (array_stack.size() > 0) { + auto pos = array_stack.back(); + value = pos.array_node; + path_i = pos.path_index; + arr_i = array_stack.size() - 1; + } + + value = NextArrayOrRow(value, 
&path_i, &arr_i); + + // Traverse to desired level (with possible backtracking as needed) + while (path_i < path.size() || arr_i < array_levels) { + if (value == nullptr) { + return value; + } else if (value->IsArray() && value->Size() > 0) { + ArrayPosition pos; + pos.array_node = value; + pos.path_index = path_i; + pos.index = 0; + array_stack.push_back(pos); + + value = &(*value)[0]; + ++arr_i; + } else if (value->IsArray()) { + // Empty array means we need to backtrack and go to next array or row + value = NextArrayOrRow(value, &path_i, &arr_i); + } else if (value->HasMember(path[path_i].c_str())) { + value = &(*value)[path[path_i].c_str()]; + ++path_i; + } else { + return &kNullJsonSingleton; + } + } + + // Return value + return value; + } + + protected: + const std::vector<rapidjson::Document>& rows; + std::vector<std::string> path; + int64_t array_levels; + size_t row_i = -1; // index of current row + + // Info about array position for one array level in array stack + struct ArrayPosition { + const rapidjson::Value* array_node; + int64_t path_index; + rapidjson::SizeType index; + }; + std::vector<ArrayPosition> array_stack; +}; + +class JsonValueConverter { + public: + explicit JsonValueConverter(const std::vector<rapidjson::Document>& rows) + : rows_(rows), array_levels_(0) {} + + JsonValueConverter(const std::vector<rapidjson::Document>& rows, + const std::vector<std::string>& root_path, int64_t array_levels) + : rows_(rows), root_path_(root_path), array_levels_(array_levels) {} + + /// \brief For field passed in, append corresponding values to builder + arrow::Status Convert(const arrow::Field& field, arrow::ArrayBuilder* builder) { + return Convert(field, field.name(), builder); + } + + /// \brief For field passed in, append corresponding values to builder + arrow::Status Convert(const arrow::Field& field, const std::string& field_name, + arrow::ArrayBuilder* builder) { + field_name_ = field_name; + builder_ = builder; + 
ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*field.type().get(), this)); + return arrow::Status::OK(); + } + + // Default implementation + arrow::Status Visit(const arrow::DataType& type) { + return arrow::Status::NotImplemented( + "Can not convert to json document for array of type ", type.ToString()); Review Comment: Yup, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org