ARROW-373: [C++] JSON serialization format for testing C++ version of ARROW-372
Author: Wes McKinney <[email protected]> Closes #202 from wesm/ARROW-373 and squashes the following commits: d13a05f [Wes McKinney] Compiler warning 72c24fe [Wes McKinney] Add a minimal literal JSON example a2cf47b [Wes McKinney] cpplint 3d9fcc2 [Wes McKinney] Complete round trip json file test with multiple record batches 2753449 [Wes McKinney] Complete draft json roundtrip implementation. tests not complete yet 3d6bbbd [Wes McKinney] Start high level writer scaffold 6bbd669 [Wes McKinney] Tweaks e2e86b5 [Wes McKinney] Test JSON array roundtrip for numeric types, strings, lists, structs 82f108b [Wes McKinney] Refactoring. Array test scaffold 0891378 [Wes McKinney] Declare loop variables 6566343 [Wes McKinney] Recursively construct children for list/struct 35c2f85 [Wes McKinney] Refactoring. Start drafting string/list reader f26402a [Wes McKinney] Install type_traits.h. cpplint 4fc7294 [Wes McKinney] Refactoring, type attribute consistency. Array reader compiles 2c93cce [Wes McKinney] WIP JSON array reader code path 932ba7a [Wes McKinney] Add ArrayVisitor methods, add enough metaprogramming to detect presence of c_type type member 15c1094 [Wes McKinney] Add type traits, refactoring, drafting json array writing. not working yet 209ba48 [Wes McKinney] More types refactoring. Strange linker error in pyarrow 379da3c [Wes McKinney] Implement union metadata JSON serialization 5fbea41 [Wes McKinney] Implement some more json types and add convenience factory functions 1c08233 [Wes McKinney] JSON schema roundtrip passing for many types 86c9559 [Wes McKinney] Add convenience factory functions for common types 3b9d14e [Wes McKinney] Add type-specific JSON metadata to schema writer 820b0f2 [Wes McKinney] Drafting JSON schema read/write 68ee7ab [Wes McKinney] Move forward declarations into type_fwd.h 1edf2a9 [Wes McKinney] Prototyping out visitor pattern for json serialization 24c1d5d [Wes McKinney] Some Types refactoring, add TypeVisitor abstract class. 
Add RapidJSON as external project Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ed6ec3b7 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ed6ec3b7 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ed6ec3b7 Branch: refs/heads/master Commit: ed6ec3b76e1ac27fab85cd4bc74fbd61e8dfb27f Parents: 8417096 Author: Wes McKinney <[email protected]> Authored: Fri Nov 18 14:58:46 2016 -0500 Committer: Wes McKinney <[email protected]> Committed: Fri Nov 18 14:58:46 2016 -0500 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 19 + cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/array.cc | 15 + cpp/src/arrow/array.h | 12 + cpp/src/arrow/column-test.cc | 1 + cpp/src/arrow/io/hdfs.cc | 8 +- cpp/src/arrow/io/libhdfs_shim.cc | 26 +- cpp/src/arrow/ipc/CMakeLists.txt | 7 + cpp/src/arrow/ipc/adapter.cc | 2 +- cpp/src/arrow/ipc/ipc-json-test.cc | 353 +++++++++ cpp/src/arrow/ipc/json-internal.cc | 1113 ++++++++++++++++++++++++++++ cpp/src/arrow/ipc/json-internal.h | 111 +++ cpp/src/arrow/ipc/json.cc | 219 ++++++ cpp/src/arrow/ipc/json.h | 92 +++ cpp/src/arrow/ipc/test-common.h | 14 +- cpp/src/arrow/schema-test.cc | 52 +- cpp/src/arrow/schema.cc | 15 + cpp/src/arrow/schema.h | 12 +- cpp/src/arrow/test-util.h | 51 +- cpp/src/arrow/type.cc | 122 ++- cpp/src/arrow/type.h | 338 +++++++-- cpp/src/arrow/type_fwd.h | 157 ++++ cpp/src/arrow/type_traits.h | 197 +++++ cpp/src/arrow/types/CMakeLists.txt | 1 - cpp/src/arrow/types/collection.h | 41 - cpp/src/arrow/types/datetime.h | 37 +- cpp/src/arrow/types/decimal.h | 14 +- cpp/src/arrow/types/list-test.cc | 2 +- cpp/src/arrow/types/list.cc | 4 + cpp/src/arrow/types/list.h | 8 +- cpp/src/arrow/types/primitive-test.cc | 36 +- cpp/src/arrow/types/primitive.cc | 97 ++- cpp/src/arrow/types/primitive.h | 190 ++--- cpp/src/arrow/types/string-test.cc | 12 +- cpp/src/arrow/types/string.cc | 16 +- cpp/src/arrow/types/string.h | 24 +- 
cpp/src/arrow/types/struct-test.cc | 2 +- cpp/src/arrow/types/struct.cc | 4 + cpp/src/arrow/types/struct.h | 4 + cpp/src/arrow/types/test-common.h | 16 + cpp/src/arrow/types/union.cc | 23 +- cpp/src/arrow/types/union.h | 21 - cpp/src/arrow/util/logging.h | 4 +- format/Metadata.md | 5 + 44 files changed, 3049 insertions(+), 450 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6f95483..0bff752 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -545,6 +545,25 @@ if(ARROW_BUILD_BENCHMARKS) endif() endif() +# RapidJSON, header only dependency +if("$ENV{RAPIDJSON_HOME}" STREQUAL "") + ExternalProject_Add(rapidjson_ep + PREFIX "${CMAKE_BINARY_DIR}" + URL "https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz" + URL_MD5 "badd12c511e081fec6c89c43a7027bce" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + BUILD_IN_SOURCE 1 + INSTALL_COMMAND "") + + ExternalProject_Get_Property(rapidjson_ep SOURCE_DIR) + set(RAPIDJSON_INCLUDE_DIR "${SOURCE_DIR}/include") +else() + set(RAPIDJSON_INCLUDE_DIR "$ENV{RAPIDJSON_HOME}/include") +endif() +message(STATUS "RapidJSON include dir: ${RAPIDJSON_INCLUDE_DIR}") +include_directories(SYSTEM ${RAPIDJSON_INCLUDE_DIR}) + ## Google PerfTools ## ## Disabled with TSAN/ASAN as well as with gold+dynamic linking (see comment http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index a9b2fec..81851bc 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -24,6 +24,8 @@ install(FILES schema.h table.h type.h + type_fwd.h + type_traits.h test-util.h DESTINATION include/arrow) 
http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/array.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index e432a53..3262425 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -18,6 +18,7 @@ #include "arrow/array.h" #include <cstdint> +#include <cstring> #include "arrow/util/bit-util.h" #include "arrow/util/buffer.h" @@ -25,6 +26,16 @@ namespace arrow { +Status GetEmptyBitmap( + MemoryPool* pool, int32_t length, std::shared_ptr<MutableBuffer>* result) { + auto buffer = std::make_shared<PoolBuffer>(pool); + RETURN_NOT_OK(buffer->Resize(BitUtil::BytesForBits(length))); + memset(buffer->mutable_data(), 0, buffer->size()); + + *result = buffer; + return Status::OK(); +} + // ---------------------------------------------------------------------- // Base array class @@ -66,4 +77,8 @@ bool NullArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_st return true; } +Status NullArray::Accept(ArrayVisitor* visitor) const { + return visitor->Visit(*this); +} + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/array.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index ff37323..ff2b70e 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -29,6 +29,8 @@ namespace arrow { class Buffer; +class MemoryPool; +class MutableBuffer; class Status; // Immutable data array with some logical type and some length. Any memory is @@ -70,6 +72,8 @@ class ARROW_EXPORT Array { // returning Status::OK. This can be an expensive check. 
virtual Status Validate() const; + virtual Status Accept(ArrayVisitor* visitor) const = 0; + protected: std::shared_ptr<DataType> type_; int32_t null_count_; @@ -86,6 +90,8 @@ class ARROW_EXPORT Array { // Degenerate null type Array class ARROW_EXPORT NullArray : public Array { public: + using TypeClass = NullType; + NullArray(const std::shared_ptr<DataType>& type, int32_t length) : Array(type, length, length, nullptr) {} @@ -94,9 +100,15 @@ class ARROW_EXPORT NullArray : public Array { bool Equals(const std::shared_ptr<Array>& arr) const override; bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_index, const std::shared_ptr<Array>& arr) const override; + + Status Accept(ArrayVisitor* visitor) const override; }; typedef std::shared_ptr<Array> ArrayPtr; + +Status ARROW_EXPORT GetEmptyBitmap( + MemoryPool* pool, int32_t length, std::shared_ptr<MutableBuffer>* result); + } // namespace arrow #endif http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/column-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column-test.cc b/cpp/src/arrow/column-test.cc index 1edf313..ac3636d 100644 --- a/cpp/src/arrow/column-test.cc +++ b/cpp/src/arrow/column-test.cc @@ -22,6 +22,7 @@ #include "gtest/gtest.h" +#include "arrow/array.h" #include "arrow/column.h" #include "arrow/schema.h" #include "arrow/test-util.h" http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 6490a75..13491e7 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -289,13 +289,9 @@ class HdfsClient::HdfsClientImpl { // connect to HDFS with the builder object hdfsBuilder* builder = hdfsNewBuilder(); - if (!config->host.empty()) { - hdfsBuilderSetNameNode(builder, config->host.c_str()); - } + if (!config->host.empty()) { 
hdfsBuilderSetNameNode(builder, config->host.c_str()); } hdfsBuilderSetNameNodePort(builder, config->port); - if (!config->user.empty()) { - hdfsBuilderSetUserName(builder, config->user.c_str()); - } + if (!config->user.empty()) { hdfsBuilderSetUserName(builder, config->user.c_str()); } if (!config->kerb_ticket.empty()) { hdfsBuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str()); } http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/io/libhdfs_shim.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc index 1fee595..36b8a4e 100644 --- a/cpp/src/arrow/io/libhdfs_shim.cc +++ b/cpp/src/arrow/io/libhdfs_shim.cc @@ -74,12 +74,9 @@ static HINSTANCE libjvm_handle = NULL; // NOTE(wesm): cpplint does not like use of short and other imprecise C types static hdfsBuilder* (*ptr_hdfsNewBuilder)(void) = NULL; -static void (*ptr_hdfsBuilderSetNameNode)( - hdfsBuilder* bld, const char* nn) = NULL; -static void (*ptr_hdfsBuilderSetNameNodePort)( - hdfsBuilder* bld, tPort port) = NULL; -static void (*ptr_hdfsBuilderSetUserName)( - hdfsBuilder* bld, const char* userName) = NULL; +static void (*ptr_hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn) = NULL; +static void (*ptr_hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port) = NULL; +static void (*ptr_hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName) = NULL; static void (*ptr_hdfsBuilderSetKerbTicketCachePath)( hdfsBuilder* bld, const char* kerbTicketCachePath) = NULL; static hdfsFS (*ptr_hdfsBuilderConnect)(hdfsBuilder* bld) = NULL; @@ -173,9 +170,9 @@ void hdfsBuilderSetUserName(hdfsBuilder* bld, const char* userName) { ptr_hdfsBuilderSetUserName(bld, userName); } -void hdfsBuilderSetKerbTicketCachePath(hdfsBuilder* bld, - const char* kerbTicketCachePath) { - ptr_hdfsBuilderSetKerbTicketCachePath(bld , kerbTicketCachePath); +void hdfsBuilderSetKerbTicketCachePath( + 
hdfsBuilder* bld, const char* kerbTicketCachePath) { + ptr_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); } hdfsFS hdfsBuilderConnect(hdfsBuilder* bld) { @@ -364,7 +361,7 @@ static std::vector<fs::path> get_potential_libhdfs_paths() { std::vector<fs::path> libhdfs_potential_paths; std::string file_name; - // OS-specific file name +// OS-specific file name #ifdef __WIN32 file_name = "hdfs.dll"; #elif __APPLE__ @@ -374,10 +371,7 @@ static std::vector<fs::path> get_potential_libhdfs_paths() { #endif // Common paths - std::vector<fs::path> search_paths = { - fs::path(""), - fs::path(".") - }; + std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")}; // Path from environment variable const char* hadoop_home = std::getenv("HADOOP_HOME"); @@ -387,9 +381,7 @@ static std::vector<fs::path> get_potential_libhdfs_paths() { } const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR"); - if (libhdfs_dir != nullptr) { - search_paths.push_back(fs::path(libhdfs_dir)); - } + if (libhdfs_dir != nullptr) { search_paths.push_back(fs::path(libhdfs_dir)); } // All paths with file name for (auto& path : search_paths) { http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index d2db339..6955bcb 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -34,6 +34,8 @@ set(ARROW_IPC_TEST_LINK_LIBS set(ARROW_IPC_SRCS adapter.cc file.cc + json.cc + json-internal.cc metadata.cc metadata-internal.cc ) @@ -79,6 +81,10 @@ ADD_ARROW_TEST(ipc-metadata-test) ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test ${ARROW_IPC_TEST_LINK_LIBS}) +ADD_ARROW_TEST(ipc-json-test) +ARROW_TEST_LINK_LIBRARIES(ipc-json-test + ${ARROW_IPC_TEST_LINK_LIBS}) + # make clean will delete the generated file set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE) @@ 
-114,6 +120,7 @@ add_dependencies(arrow_objlib metadata_fbs) install(FILES adapter.h file.h + json.h metadata.h DESTINATION include/arrow/ipc) http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 74786bf..da718c0 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -106,7 +106,7 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes buffers->push_back(binary_arr->data()); } else if (arr->type_enum() == Type::LIST) { const auto list_arr = static_cast<const ListArray*>(arr); - buffers->push_back(list_arr->offset_buffer()); + buffers->push_back(list_arr->offsets()); RETURN_NOT_OK(VisitArray( list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1)); } else if (arr->type_enum() == Type::STRUCT) { http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/ipc-json-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc new file mode 100644 index 0000000..a51371c --- /dev/null +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -0,0 +1,353 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <cstdio> +#include <cstring> +#include <iostream> +#include <memory> +#include <string> +#include <vector> + +#include "gtest/gtest.h" + +#include "arrow/array.h" +#include "arrow/ipc/json-internal.h" +#include "arrow/ipc/json.h" +#include "arrow/table.h" +#include "arrow/test-util.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/types/struct.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace ipc { + +void TestSchemaRoundTrip(const Schema& schema) { + rj::StringBuffer sb; + rj::Writer<rj::StringBuffer> writer(sb); + + ASSERT_OK(WriteJsonSchema(schema, &writer)); + + rj::Document d; + d.Parse(sb.GetString()); + + std::shared_ptr<Schema> out; + ASSERT_OK(ReadJsonSchema(d, &out)); + + ASSERT_TRUE(schema.Equals(out)); +} + +void TestArrayRoundTrip(const Array& array) { + static std::string name = "dummy"; + + rj::StringBuffer sb; + rj::Writer<rj::StringBuffer> writer(sb); + + ASSERT_OK(WriteJsonArray(name, array, &writer)); + + std::string array_as_json = sb.GetString(); + + rj::Document d; + d.Parse(array_as_json); + + if (d.HasParseError()) { FAIL() << "JSON parsing failed"; } + + std::shared_ptr<Array> out; + ASSERT_OK(ReadJsonArray(default_memory_pool(), d, array.type(), &out)); + + ASSERT_TRUE(array.Equals(out)) << array_as_json; +} + +template <typename T, typename ValueType> +void CheckPrimitive(const std::shared_ptr<DataType>& type, + const std::vector<bool>& 
is_valid, const std::vector<ValueType>& values) { + MemoryPool* pool = default_memory_pool(); + typename TypeTraits<T>::BuilderType builder(pool, type); + + for (size_t i = 0; i < values.size(); ++i) { + if (is_valid[i]) { + ASSERT_OK(builder.Append(values[i])); + } else { + ASSERT_OK(builder.AppendNull()); + } + } + + std::shared_ptr<Array> array; + ASSERT_OK(builder.Finish(&array)); + TestArrayRoundTrip(*array.get()); +} + +template <typename TYPE, typename C_TYPE> +void MakeArray(const std::shared_ptr<DataType>& type, const std::vector<bool>& is_valid, + const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) { + std::shared_ptr<Buffer> values_buffer; + std::shared_ptr<Buffer> values_bitmap; + + ASSERT_OK(test::CopyBufferFromVector(values, &values_buffer)); + ASSERT_OK(test::GetBitmapFromBoolVector(is_valid, &values_bitmap)); + + using ArrayType = typename TypeTraits<TYPE>::ArrayType; + + int32_t null_count = 0; + for (bool val : is_valid) { + if (!val) { ++null_count; } + } + + *out = std::make_shared<ArrayType>(type, static_cast<int32_t>(values.size()), + values_buffer, null_count, values_bitmap); +} + +TEST(TestJsonSchemaWriter, FlatTypes) { + std::vector<std::shared_ptr<Field>> fields = {field("f0", int8()), + field("f1", int16(), false), field("f2", int32()), field("f3", int64(), false), + field("f4", uint8()), field("f5", uint16()), field("f6", uint32()), + field("f7", uint64()), field("f8", float32()), field("f9", float64()), + field("f10", utf8()), field("f11", binary()), field("f12", list(int32())), + field("f13", struct_({field("s1", int32()), field("s2", utf8())})), + field("f14", date()), field("f15", timestamp(TimeUnit::NANO)), + field("f16", time(TimeUnit::MICRO)), + field("f17", union_({field("u1", int8()), field("u2", time(TimeUnit::MILLI))}, + {0, 1}, UnionMode::DENSE))}; + + Schema schema(fields); + TestSchemaRoundTrip(schema); +} + +template <typename T> +void PrimitiveTypesCheckOne() { + using c_type = typename T::c_type; + + 
std::vector<bool> is_valid = {true, false, true, true, true, false, true, true}; + std::vector<c_type> values = {0, 1, 2, 3, 4, 5, 6, 7}; + CheckPrimitive<T, c_type>(std::make_shared<T>(), is_valid, values); +} + +TEST(TestJsonArrayWriter, PrimitiveTypes) { + PrimitiveTypesCheckOne<Int8Type>(); + PrimitiveTypesCheckOne<Int16Type>(); + PrimitiveTypesCheckOne<Int32Type>(); + PrimitiveTypesCheckOne<Int64Type>(); + PrimitiveTypesCheckOne<UInt8Type>(); + PrimitiveTypesCheckOne<UInt16Type>(); + PrimitiveTypesCheckOne<UInt32Type>(); + PrimitiveTypesCheckOne<UInt64Type>(); + PrimitiveTypesCheckOne<FloatType>(); + PrimitiveTypesCheckOne<DoubleType>(); + + std::vector<bool> is_valid = {true, false, true, true, true, false, true, true}; + std::vector<std::string> values = {"foo", "bar", "", "baz", "qux", "foo", "a", "1"}; + + CheckPrimitive<StringType, std::string>(utf8(), is_valid, values); + CheckPrimitive<BinaryType, std::string>(binary(), is_valid, values); +} + +TEST(TestJsonArrayWriter, NestedTypes) { + auto value_type = int32(); + + std::vector<bool> values_is_valid = {true, false, true, true, false, true, true}; + std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6}; + + std::shared_ptr<Array> values_array; + MakeArray<Int32Type, int32_t>(int32(), values_is_valid, values, &values_array); + + // List + std::vector<bool> list_is_valid = {true, false, true, true, true}; + std::vector<int32_t> offsets = {0, 0, 0, 1, 4, 7}; + + std::shared_ptr<Buffer> list_bitmap; + ASSERT_OK(test::GetBitmapFromBoolVector(list_is_valid, &list_bitmap)); + std::shared_ptr<Buffer> offsets_buffer = test::GetBufferFromVector(offsets); + + ListArray list_array(list(value_type), 5, offsets_buffer, values_array, 1, list_bitmap); + + TestArrayRoundTrip(list_array); + + // Struct + std::vector<bool> struct_is_valid = {true, false, true, true, true, false, true}; + std::shared_ptr<Buffer> struct_bitmap; + ASSERT_OK(test::GetBitmapFromBoolVector(struct_is_valid, &struct_bitmap)); + + auto struct_type = 
+ struct_({field("f1", int32()), field("f2", int32()), field("f3", int32())}); + + std::vector<std::shared_ptr<Array>> fields = {values_array, values_array, values_array}; + StructArray struct_array( + struct_type, static_cast<int>(struct_is_valid.size()), fields, 2, struct_bitmap); + TestArrayRoundTrip(struct_array); +} + +// Data generation for test case below +void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows, + std::vector<std::shared_ptr<Array>>* arrays) { + std::vector<bool> is_valid; + test::random_is_valid(num_rows, 0.25, &is_valid); + + std::vector<int8_t> v1_values; + std::vector<int32_t> v2_values; + + test::randint<int8_t>(num_rows, 0, 100, &v1_values); + test::randint<int32_t>(num_rows, 0, 100, &v2_values); + + std::shared_ptr<Array> v1; + MakeArray<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1); + + std::shared_ptr<Array> v2; + MakeArray<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2); + + static const int kBufferSize = 10; + static uint8_t buffer[kBufferSize]; + static uint32_t seed = 0; + StringBuilder string_builder(default_memory_pool(), utf8()); + for (int i = 0; i < num_rows; ++i) { + if (!is_valid[i]) { + string_builder.AppendNull(); + } else { + test::random_ascii(kBufferSize, seed++, buffer); + string_builder.Append(buffer, kBufferSize); + } + } + std::shared_ptr<Array> v3; + ASSERT_OK(string_builder.Finish(&v3)); + + arrays->emplace_back(v1); + arrays->emplace_back(v2); + arrays->emplace_back(v3); +} + +TEST(TestJsonFileReadWrite, BasicRoundTrip) { + auto v1_type = int8(); + auto v2_type = int32(); + auto v3_type = utf8(); + + std::shared_ptr<Schema> schema( + new Schema({field("f1", v1_type), field("f2", v2_type), field("f3", v3_type)})); + + std::unique_ptr<JsonWriter> writer; + ASSERT_OK(JsonWriter::Open(schema, &writer)); + + const int nbatches = 3; + std::vector<std::shared_ptr<RecordBatch>> batches; + for (int i = 0; i < nbatches; ++i) { + int32_t num_rows = 5 + i 
* 5; + std::vector<std::shared_ptr<Array>> arrays; + + MakeBatchArrays(schema, num_rows, &arrays); + batches.emplace_back(std::make_shared<RecordBatch>(schema, num_rows, arrays)); + ASSERT_OK(writer->WriteRecordBatch(arrays, num_rows)); + } + + std::string result; + ASSERT_OK(writer->Finish(&result)); + + std::unique_ptr<JsonReader> reader; + + auto buffer = std::make_shared<Buffer>( + reinterpret_cast<const uint8_t*>(result.c_str()), static_cast<int>(result.size())); + + ASSERT_OK(JsonReader::Open(buffer, &reader)); + ASSERT_TRUE(reader->schema()->Equals(*schema.get())); + + ASSERT_EQ(nbatches, reader->num_record_batches()); + + for (int i = 0; i < nbatches; ++i) { + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(reader->GetRecordBatch(i, &batch)); + ASSERT_TRUE(batch->Equals(*batches[i].get())); + } +} + +TEST(TestJsonFileReadWrite, MinimalFormatExample) { + static const char* example = R"example( +{ + "schema": { + "fields": [ + { + "name": "foo", + "type": {"name": "int", "isSigned": true, "bitWidth": 32}, + "nullable": true, "children": [], + "typeLayout": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + }, + { + "name": "bar", + "type": {"name": "floatingpoint", "precision": "DOUBLE"}, + "nullable": true, "children": [], + "typeLayout": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 64} + ] + } + ] + }, + "batches": [ + { + "count": 5, + "columns": [ + { + "name": "foo", + "count": 5, + "DATA": [1, 2, 3, 4, 5], + "VALIDITY": [1, 0, 1, 1, 1] + }, + { + "name": "bar", + "count": 5, + "DATA": [1.0, 2.0, 3.0, 4.0, 5.0], + "VALIDITY": [1, 0, 0, 1, 1] + } + ] + } + ] +} +)example"; + + auto buffer = std::make_shared<Buffer>( + reinterpret_cast<const uint8_t*>(example), strlen(example)); + + std::unique_ptr<JsonReader> reader; + ASSERT_OK(JsonReader::Open(buffer, &reader)); + + Schema ex_schema({field("foo", int32()), field("bar", float64())}); + + 
ASSERT_TRUE(reader->schema()->Equals(ex_schema)); + ASSERT_EQ(1, reader->num_record_batches()); + + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(reader->GetRecordBatch(0, &batch)); + + std::vector<bool> foo_valid = {true, false, true, true, true}; + std::vector<int32_t> foo_values = {1, 2, 3, 4, 5}; + std::shared_ptr<Array> foo; + MakeArray<Int32Type, int32_t>(int32(), foo_valid, foo_values, &foo); + ASSERT_TRUE(batch->column(0)->Equals(foo)); + + std::vector<bool> bar_valid = {true, false, false, true, true}; + std::vector<double> bar_values = {1, 2, 3, 4, 5}; + std::shared_ptr<Array> bar; + MakeArray<DoubleType, double>(float64(), bar_valid, bar_values, &bar); + ASSERT_TRUE(batch->column(1)->Equals(bar)); +} + +} // namespace ipc +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc new file mode 100644 index 0000000..31fe35b --- /dev/null +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -0,0 +1,1113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/ipc/json-internal.h" + +#include <cstdint> +#include <memory> +#include <sstream> +#include <string> +#include <type_traits> +#include <vector> + +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +#include "arrow/array.h" +#include "arrow/schema.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/types/list.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/types/struct.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace ipc { + +using RjArray = rj::Value::ConstArray; +using RjObject = rj::Value::ConstObject; + +enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY }; + +static std::string GetBufferTypeName(BufferType type) { + switch (type) { + case BufferType::DATA: + return "DATA"; + case BufferType::OFFSET: + return "OFFSET"; + case BufferType::TYPE: + return "TYPE"; + case BufferType::VALIDITY: + return "VALIDITY"; + default: + break; + } + return "UNKNOWN"; +} + +static std::string GetFloatingPrecisionName(FloatingPointMeta::Precision precision) { + switch (precision) { + case FloatingPointMeta::HALF: + return "HALF"; + case FloatingPointMeta::SINGLE: + return "SINGLE"; + case FloatingPointMeta::DOUBLE: + return "DOUBLE"; + default: + break; + } + return "UNKNOWN"; +} + +static std::string GetTimeUnitName(TimeUnit unit) { + switch (unit) { + case TimeUnit::SECOND: + return "SECOND"; + case TimeUnit::MILLI: + return "MILLISECOND"; + case TimeUnit::MICRO: + return "MICROSECOND"; + case TimeUnit::NANO: + return "NANOSECOND"; + default: + break; + } + return "UNKNOWN"; +} + +class BufferLayout { + public: + BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {} + + BufferType type() const { return type_; } + int bit_width() const { return bit_width_; } + + private: + BufferType type_; + int bit_width_; +}; + +static const BufferLayout 
kValidityBuffer(BufferType::VALIDITY, 1); +static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32); +static const BufferLayout kTypeBuffer(BufferType::TYPE, 32); +static const BufferLayout kBooleanBuffer(BufferType::DATA, 1); +static const BufferLayout kValues64(BufferType::DATA, 64); +static const BufferLayout kValues32(BufferType::DATA, 32); +static const BufferLayout kValues16(BufferType::DATA, 16); +static const BufferLayout kValues8(BufferType::DATA, 8); + +class JsonSchemaWriter : public TypeVisitor { + public: + explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer) + : schema_(schema), writer_(writer) {} + + Status Write() { + writer_->StartObject(); + writer_->Key("fields"); + writer_->StartArray(); + for (const std::shared_ptr<Field>& field : schema_.fields()) { + RETURN_NOT_OK(VisitField(*field.get())); + } + writer_->EndArray(); + writer_->EndObject(); + return Status::OK(); + } + + Status VisitField(const Field& field) { + writer_->StartObject(); + + writer_->Key("name"); + writer_->String(field.name.c_str()); + + writer_->Key("nullable"); + writer_->Bool(field.nullable); + + // Visit the type + RETURN_NOT_OK(field.type->Accept(this)); + writer_->EndObject(); + + return Status::OK(); + } + + void SetNoChildren() { + writer_->Key("children"); + writer_->StartArray(); + writer_->EndArray(); + } + + template <typename T> + typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value || + std::is_base_of<BooleanType, T>::value || + std::is_base_of<NullType, T>::value, + void>::type + WriteTypeMetadata(const T& type) {} + + template <typename T> + typename std::enable_if<std::is_base_of<IntegerMeta, T>::value, void>::type + WriteTypeMetadata(const T& type) { + writer_->Key("bitWidth"); + writer_->Int(type.bit_width()); + writer_->Key("isSigned"); + writer_->Bool(type.is_signed()); + } + + template <typename T> + typename std::enable_if<std::is_base_of<FloatingPointMeta, T>::value, void>::type + WriteTypeMetadata(const T& type) { + 
writer_->Key("precision"); + writer_->String(GetFloatingPrecisionName(type.precision())); + } + + template <typename T> + typename std::enable_if<std::is_base_of<IntervalType, T>::value, void>::type + WriteTypeMetadata(const T& type) { + writer_->Key("unit"); + switch (type.unit) { + case IntervalType::Unit::YEAR_MONTH: + writer_->String("YEAR_MONTH"); + break; + case IntervalType::Unit::DAY_TIME: + writer_->String("DAY_TIME"); + break; + } + } + + template <typename T> + typename std::enable_if<std::is_base_of<TimeType, T>::value || + std::is_base_of<TimestampType, T>::value, + void>::type + WriteTypeMetadata(const T& type) { + writer_->Key("unit"); + writer_->String(GetTimeUnitName(type.unit)); + } + + template <typename T> + typename std::enable_if<std::is_base_of<DecimalType, T>::value, void>::type + WriteTypeMetadata(const T& type) { + writer_->Key("precision"); + writer_->Int(type.precision); + writer_->Key("scale"); + writer_->Int(type.scale); + } + + template <typename T> + typename std::enable_if<std::is_base_of<UnionType, T>::value, void>::type + WriteTypeMetadata(const T& type) { + writer_->Key("mode"); + switch (type.mode) { + case UnionMode::SPARSE: + writer_->String("SPARSE"); + break; + case UnionMode::DENSE: + writer_->String("DENSE"); + break; + } + + // Write type ids + writer_->Key("typeIds"); + writer_->StartArray(); + for (size_t i = 0; i < type.type_ids.size(); ++i) { + writer_->Uint(type.type_ids[i]); + } + writer_->EndArray(); + } + + // TODO(wesm): Other Type metadata + + template <typename T> + void WriteName(const std::string& typeclass, const T& type) { + writer_->Key("type"); + writer_->StartObject(); + writer_->Key("name"); + writer_->String(typeclass); + WriteTypeMetadata(type); + writer_->EndObject(); + } + + template <typename T> + Status WritePrimitive(const std::string& typeclass, const T& type, + const std::vector<BufferLayout>& buffer_layout) { + WriteName(typeclass, type); + SetNoChildren(); + WriteBufferLayout(buffer_layout); 
+ return Status::OK(); + } + + template <typename T> + Status WriteVarBytes(const std::string& typeclass, const T& type) { + WriteName(typeclass, type); + SetNoChildren(); + WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8}); + return Status::OK(); + } + + void WriteBufferLayout(const std::vector<BufferLayout>& buffer_layout) { + writer_->Key("typeLayout"); + writer_->StartArray(); + + for (const BufferLayout& buffer : buffer_layout) { + writer_->StartObject(); + writer_->Key("type"); + writer_->String(GetBufferTypeName(buffer.type())); + + writer_->Key("typeBitWidth"); + writer_->Int(buffer.bit_width()); + + writer_->EndObject(); + } + writer_->EndArray(); + } + + Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) { + writer_->Key("children"); + writer_->StartArray(); + for (const std::shared_ptr<Field>& field : children) { + RETURN_NOT_OK(VisitField(*field.get())); + } + writer_->EndArray(); + return Status::OK(); + } + + Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); } + + Status Visit(const BooleanType& type) override { + return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer}); + } + + Status Visit(const Int8Type& type) override { + return WritePrimitive("int", type, {kValidityBuffer, kValues8}); + } + + Status Visit(const Int16Type& type) override { + return WritePrimitive("int", type, {kValidityBuffer, kValues16}); + } + + Status Visit(const Int32Type& type) override { + return WritePrimitive("int", type, {kValidityBuffer, kValues32}); + } + + Status Visit(const Int64Type& type) override { + return WritePrimitive("int", type, {kValidityBuffer, kValues64}); + } + + Status Visit(const UInt8Type& type) override { + return WritePrimitive("int", type, {kValidityBuffer, kValues8}); + } + + Status Visit(const UInt16Type& type) override { + return WritePrimitive("int", type, {kValidityBuffer, kValues16}); + } + + Status Visit(const UInt32Type& type) override { + return 
WritePrimitive("int", type, {kValidityBuffer, kValues32}); + } + + Status Visit(const UInt64Type& type) override { + return WritePrimitive("int", type, {kValidityBuffer, kValues64}); + } + + Status Visit(const HalfFloatType& type) override { + return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16}); + } + + Status Visit(const FloatType& type) override { + return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32}); + } + + Status Visit(const DoubleType& type) override { + return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64}); + } + + Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); } + + Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); } + + Status Visit(const DateType& type) override { + return WritePrimitive("date", type, {kValidityBuffer, kValues64}); + } + + Status Visit(const TimeType& type) override { + return WritePrimitive("time", type, {kValidityBuffer, kValues64}); + } + + Status Visit(const TimestampType& type) override { + return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64}); + } + + Status Visit(const IntervalType& type) override { + return WritePrimitive("interval", type, {kValidityBuffer, kValues64}); + } + + Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); } + + Status Visit(const ListType& type) override { + WriteName("list", type); + RETURN_NOT_OK(WriteChildren(type.children())); + WriteBufferLayout({kValidityBuffer, kOffsetBuffer}); + return Status::OK(); + } + + Status Visit(const StructType& type) override { + WriteName("struct", type); + WriteChildren(type.children()); + WriteBufferLayout({kValidityBuffer, kTypeBuffer}); + return Status::OK(); + } + + Status Visit(const UnionType& type) override { + WriteName("union", type); + WriteChildren(type.children()); + + if (type.mode == UnionMode::SPARSE) { + WriteBufferLayout({kValidityBuffer, kTypeBuffer}); + } else { + 
WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer}); + } + return Status::OK(); + } + + private: + const Schema& schema_; + RjWriter* writer_; +}; + +class JsonArrayWriter : public ArrayVisitor { + public: + explicit JsonArrayWriter(const std::string& name, const Array& array, RjWriter* writer) + : name_(name), array_(array), writer_(writer) {} + + Status Write() { return VisitArray(name_, array_); } + + Status VisitArray(const std::string& name, const Array& arr) { + writer_->StartObject(); + writer_->Key("name"); + writer_->String(name); + + writer_->Key("count"); + writer_->Int(arr.length()); + + RETURN_NOT_OK(arr.Accept(this)); + + writer_->EndObject(); + return Status::OK(); + } + + template <typename T> + typename std::enable_if<IsSignedInt<T>::value, void>::type WriteDataValues( + const T& arr) { + const auto data = arr.raw_data(); + for (int i = 0; i < arr.length(); ++i) { + writer_->Int64(data[i]); + } + } + + template <typename T> + typename std::enable_if<IsUnsignedInt<T>::value, void>::type WriteDataValues( + const T& arr) { + const auto data = arr.raw_data(); + for (int i = 0; i < arr.length(); ++i) { + writer_->Uint64(data[i]); + } + } + + template <typename T> + typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues( + const T& arr) { + const auto data = arr.raw_data(); + for (int i = 0; i < arr.length(); ++i) { + writer_->Double(data[i]); + } + } + + // String (Utf8), Binary + template <typename T> + typename std::enable_if<std::is_base_of<BinaryArray, T>::value, void>::type + WriteDataValues(const T& arr) { + for (int i = 0; i < arr.length(); ++i) { + int32_t length; + const char* buf = reinterpret_cast<const char*>(arr.GetValue(i, &length)); + writer_->String(buf, length); + } + } + + template <typename T> + typename std::enable_if<std::is_base_of<BooleanArray, T>::value, void>::type + WriteDataValues(const T& arr) { + for (int i = 0; i < arr.length(); ++i) { + writer_->Bool(arr.Value(i)); + } + } + + template 
<typename T> + void WriteDataField(const T& arr) { + writer_->Key("DATA"); + writer_->StartArray(); + WriteDataValues(arr); + writer_->EndArray(); + } + + template <typename T> + void WriteOffsetsField(const T* offsets, int32_t length) { + writer_->Key("OFFSETS"); + writer_->StartArray(); + for (int i = 0; i < length; ++i) { + writer_->Int64(offsets[i]); + } + writer_->EndArray(); + } + + void WriteValidityField(const Array& arr) { + writer_->Key("VALIDITY"); + writer_->StartArray(); + if (arr.null_count() > 0) { + for (int i = 0; i < arr.length(); ++i) { + writer_->Int(arr.IsNull(i) ? 0 : 1); + } + } else { + for (int i = 0; i < arr.length(); ++i) { + writer_->Int(1); + } + } + writer_->EndArray(); + } + + void SetNoChildren() { + writer_->Key("children"); + writer_->StartArray(); + writer_->EndArray(); + } + + template <typename T> + Status WritePrimitive(const T& array) { + WriteValidityField(array); + WriteDataField(array); + SetNoChildren(); + return Status::OK(); + } + + template <typename T> + Status WriteVarBytes(const T& array) { + WriteValidityField(array); + WriteOffsetsField(array.raw_offsets(), array.length() + 1); + WriteDataField(array); + SetNoChildren(); + return Status::OK(); + } + + Status WriteChildren(const std::vector<std::shared_ptr<Field>>& fields, + const std::vector<std::shared_ptr<Array>>& arrays) { + writer_->Key("children"); + writer_->StartArray(); + for (size_t i = 0; i < fields.size(); ++i) { + RETURN_NOT_OK(VisitArray(fields[i]->name, *arrays[i].get())); + } + writer_->EndArray(); + return Status::OK(); + } + + Status Visit(const NullArray& array) override { + SetNoChildren(); + return Status::OK(); + } + + Status Visit(const BooleanArray& array) override { return WritePrimitive(array); } + + Status Visit(const Int8Array& array) override { return WritePrimitive(array); } + + Status Visit(const Int16Array& array) override { return WritePrimitive(array); } + + Status Visit(const Int32Array& array) override { return 
WritePrimitive(array); } + + Status Visit(const Int64Array& array) override { return WritePrimitive(array); } + + Status Visit(const UInt8Array& array) override { return WritePrimitive(array); } + + Status Visit(const UInt16Array& array) override { return WritePrimitive(array); } + + Status Visit(const UInt32Array& array) override { return WritePrimitive(array); } + + Status Visit(const UInt64Array& array) override { return WritePrimitive(array); } + + Status Visit(const HalfFloatArray& array) override { return WritePrimitive(array); } + + Status Visit(const FloatArray& array) override { return WritePrimitive(array); } + + Status Visit(const DoubleArray& array) override { return WritePrimitive(array); } + + Status Visit(const StringArray& array) override { return WriteVarBytes(array); } + + Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); } + + Status Visit(const DateArray& array) override { return Status::NotImplemented("date"); } + + Status Visit(const TimeArray& array) override { return Status::NotImplemented("time"); } + + Status Visit(const TimestampArray& array) override { + return Status::NotImplemented("timestamp"); + } + + Status Visit(const IntervalArray& array) override { + return Status::NotImplemented("interval"); + } + + Status Visit(const DecimalArray& array) override { + return Status::NotImplemented("decimal"); + } + + Status Visit(const ListArray& array) override { + WriteValidityField(array); + WriteOffsetsField(array.raw_offsets(), array.length() + 1); + auto type = static_cast<const ListType*>(array.type().get()); + return WriteChildren(type->children(), {array.values()}); + } + + Status Visit(const StructArray& array) override { + WriteValidityField(array); + auto type = static_cast<const StructType*>(array.type().get()); + return WriteChildren(type->children(), array.fields()); + } + + Status Visit(const UnionArray& array) override { + return Status::NotImplemented("union"); + } + + private: + const std::string& 
name_; + const Array& array_; + RjWriter* writer_; +}; + +class JsonSchemaReader { + public: + explicit JsonSchemaReader(const rj::Value& json_schema) : json_schema_(json_schema) {} + + Status GetSchema(std::shared_ptr<Schema>* schema) { + const auto& obj_schema = json_schema_.GetObject(); + + const auto& json_fields = obj_schema.FindMember("fields"); + RETURN_NOT_ARRAY("fields", json_fields, obj_schema); + + std::vector<std::shared_ptr<Field>> fields; + RETURN_NOT_OK(GetFieldsFromArray(json_fields->value, &fields)); + + *schema = std::make_shared<Schema>(fields); + return Status::OK(); + } + + Status GetFieldsFromArray( + const rj::Value& obj, std::vector<std::shared_ptr<Field>>* fields) { + const auto& values = obj.GetArray(); + + fields->resize(values.Size()); + for (size_t i = 0; i < fields->size(); ++i) { + RETURN_NOT_OK(GetField(values[i], &(*fields)[i])); + } + return Status::OK(); + } + + Status GetField(const rj::Value& obj, std::shared_ptr<Field>* field) { + if (!obj.IsObject()) { return Status::Invalid("Field was not a JSON object"); } + const auto& json_field = obj.GetObject(); + + const auto& json_name = json_field.FindMember("name"); + RETURN_NOT_STRING("name", json_name, json_field); + + const auto& json_nullable = json_field.FindMember("nullable"); + RETURN_NOT_BOOL("nullable", json_nullable, json_field); + + const auto& json_type = json_field.FindMember("type"); + RETURN_NOT_OBJECT("type", json_type, json_field); + + const auto& json_children = json_field.FindMember("children"); + RETURN_NOT_ARRAY("children", json_children, json_field); + + std::vector<std::shared_ptr<Field>> children; + RETURN_NOT_OK(GetFieldsFromArray(json_children->value, &children)); + + std::shared_ptr<DataType> type; + RETURN_NOT_OK(GetType(json_type->value.GetObject(), children, &type)); + + *field = std::make_shared<Field>( + json_name->value.GetString(), type, json_nullable->value.GetBool()); + return Status::OK(); + } + + Status GetInteger( + const rj::Value::ConstObject& 
json_type, std::shared_ptr<DataType>* type) { + const auto& json_bit_width = json_type.FindMember("bitWidth"); + RETURN_NOT_INT("bitWidth", json_bit_width, json_type); + + const auto& json_is_signed = json_type.FindMember("isSigned"); + RETURN_NOT_BOOL("isSigned", json_is_signed, json_type); + + bool is_signed = json_is_signed->value.GetBool(); + int bit_width = json_bit_width->value.GetInt(); + + switch (bit_width) { + case 8: + *type = is_signed ? int8() : uint8(); + break; + case 16: + *type = is_signed ? int16() : uint16(); + break; + case 32: + *type = is_signed ? int32() : uint32(); + break; + case 64: + *type = is_signed ? int64() : uint64(); + break; + default: + std::stringstream ss; + ss << "Invalid bit width: " << bit_width; + return Status::Invalid(ss.str()); + } + return Status::OK(); + } + + Status GetFloatingPoint(const RjObject& json_type, std::shared_ptr<DataType>* type) { + const auto& json_precision = json_type.FindMember("precision"); + RETURN_NOT_STRING("precision", json_precision, json_type); + + std::string precision = json_precision->value.GetString(); + + if (precision == "DOUBLE") { + *type = float64(); + } else if (precision == "SINGLE") { + *type = float32(); + } else if (precision == "HALF") { + *type = float16(); + } else { + std::stringstream ss; + ss << "Invalid precision: " << precision; + return Status::Invalid(ss.str()); + } + return Status::OK(); + } + + template <typename T> + Status GetTimeLike(const RjObject& json_type, std::shared_ptr<DataType>* type) { + const auto& json_unit = json_type.FindMember("unit"); + RETURN_NOT_STRING("unit", json_unit, json_type); + + std::string unit_str = json_unit->value.GetString(); + + TimeUnit unit; + + if (unit_str == "SECOND") { + unit = TimeUnit::SECOND; + } else if (unit_str == "MILLISECOND") { + unit = TimeUnit::MILLI; + } else if (unit_str == "MICROSECOND") { + unit = TimeUnit::MICRO; + } else if (unit_str == "NANOSECOND") { + unit = TimeUnit::NANO; + } else { + std::stringstream ss; + 
ss << "Invalid time unit: " << unit_str; + return Status::Invalid(ss.str()); + } + + *type = std::make_shared<T>(unit); + + return Status::OK(); + } + + Status GetUnion(const RjObject& json_type, + const std::vector<std::shared_ptr<Field>>& children, + std::shared_ptr<DataType>* type) { + const auto& json_mode = json_type.FindMember("mode"); + RETURN_NOT_STRING("mode", json_mode, json_type); + + std::string mode_str = json_mode->value.GetString(); + UnionMode mode; + + if (mode_str == "SPARSE") { + mode = UnionMode::SPARSE; + } else if (mode_str == "DENSE") { + mode = UnionMode::DENSE; + } else { + std::stringstream ss; + ss << "Invalid union mode: " << mode_str; + return Status::Invalid(ss.str()); + } + + const auto& json_type_ids = json_type.FindMember("typeIds"); + RETURN_NOT_ARRAY("typeIds", json_type_ids, json_type); + + std::vector<uint8_t> type_ids; + const auto& id_array = json_type_ids->value.GetArray(); + for (const rj::Value& val : id_array) { + DCHECK(val.IsUint()); + type_ids.push_back(val.GetUint()); + } + + *type = union_(children, type_ids, mode); + + return Status::OK(); + } + + Status GetType(const RjObject& json_type, + const std::vector<std::shared_ptr<Field>>& children, + std::shared_ptr<DataType>* type) { + const auto& json_type_name = json_type.FindMember("name"); + RETURN_NOT_STRING("name", json_type_name, json_type); + + std::string type_name = json_type_name->value.GetString(); + + if (type_name == "int") { + return GetInteger(json_type, type); + } else if (type_name == "floatingpoint") { + return GetFloatingPoint(json_type, type); + } else if (type_name == "bool") { + *type = boolean(); + } else if (type_name == "utf8") { + *type = utf8(); + } else if (type_name == "binary") { + *type = binary(); + } else if (type_name == "null") { + *type = null(); + } else if (type_name == "date") { + *type = date(); + } else if (type_name == "time") { + return GetTimeLike<TimeType>(json_type, type); + } else if (type_name == "timestamp") { + return 
GetTimeLike<TimestampType>(json_type, type); + } else if (type_name == "list") { + *type = list(children[0]); + } else if (type_name == "struct") { + *type = struct_(children); + } else { + return GetUnion(json_type, children, type); + } + return Status::OK(); + } + + private: + const rj::Value& json_schema_; +}; + +class JsonArrayReader { + public: + explicit JsonArrayReader(MemoryPool* pool) : pool_(pool) {} + + Status GetValidityBuffer(const std::vector<bool>& is_valid, int32_t* null_count, + std::shared_ptr<Buffer>* validity_buffer) { + int length = static_cast<int>(is_valid.size()); + + std::shared_ptr<MutableBuffer> out_buffer; + RETURN_NOT_OK(GetEmptyBitmap(pool_, length, &out_buffer)); + uint8_t* bitmap = out_buffer->mutable_data(); + + *null_count = 0; + for (int i = 0; i < length; ++i) { + if (!is_valid[i]) { + ++(*null_count); + continue; + } + BitUtil::SetBit(bitmap, i); + } + + *validity_buffer = out_buffer; + return Status::OK(); + } + + template <typename T> + typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value || + std::is_base_of<BooleanType, T>::value, + Status>::type + ReadArray(const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { + typename TypeTraits<T>::BuilderType builder(pool_, type); + + const auto& json_data = json_array.FindMember("DATA"); + RETURN_NOT_ARRAY("DATA", json_data, json_array); + + const auto& json_data_arr = json_data->value.GetArray(); + + DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length); + for (int i = 0; i < length; ++i) { + if (!is_valid[i]) { + builder.AppendNull(); + continue; + } + + const rj::Value& val = json_data_arr[i]; + if (IsSignedInt<T>::value) { + DCHECK(val.IsInt()); + builder.Append(val.GetInt64()); + } else if (IsUnsignedInt<T>::value) { + DCHECK(val.IsUint()); + builder.Append(val.GetUint64()); + } else if (IsFloatingPoint<T>::value) { + DCHECK(val.IsFloat()); + 
builder.Append(val.GetFloat()); + } else if (std::is_base_of<BooleanType, T>::value) { + DCHECK(val.IsBool()); + builder.Append(val.GetBool()); + } else { + // We are in the wrong function + return Status::Invalid(type->ToString()); + } + } + + return builder.Finish(array); + } + + template <typename T> + typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type ReadArray( + const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { + typename TypeTraits<T>::BuilderType builder(pool_, type); + + const auto& json_data = json_array.FindMember("DATA"); + RETURN_NOT_ARRAY("DATA", json_data, json_array); + + const auto& json_data_arr = json_data->value.GetArray(); + + DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length); + for (int i = 0; i < length; ++i) { + if (!is_valid[i]) { + builder.AppendNull(); + continue; + } + + const rj::Value& val = json_data_arr[i]; + DCHECK(val.IsString()); + builder.Append(val.GetString()); + } + + return builder.Finish(array); + } + + template <typename T> + typename std::enable_if<std::is_base_of<ListType, T>::value, Status>::type ReadArray( + const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { + const auto& json_offsets = json_array.FindMember("OFFSETS"); + RETURN_NOT_ARRAY("OFFSETS", json_offsets, json_array); + const auto& json_offsets_arr = json_offsets->value.GetArray(); + + int32_t null_count = 0; + std::shared_ptr<Buffer> validity_buffer; + RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer)); + + auto offsets_buffer = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(offsets_buffer->Resize((length + 1) * sizeof(int32_t))); + int32_t* offsets = reinterpret_cast<int32_t*>(offsets_buffer->mutable_data()); + + for (int i = 0; i < length + 1; ++i) { + const rj::Value& val = 
json_offsets_arr[i]; + DCHECK(val.IsInt()); + offsets[i] = val.GetInt(); + } + + std::vector<std::shared_ptr<Array>> children; + RETURN_NOT_OK(GetChildren(json_array, type, &children)); + DCHECK_EQ(children.size(), 1); + + *array = std::make_shared<ListArray>( + type, length, offsets_buffer, children[0], null_count, validity_buffer); + + return Status::OK(); + } + + template <typename T> + typename std::enable_if<std::is_base_of<StructType, T>::value, Status>::type ReadArray( + const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { + int32_t null_count = 0; + std::shared_ptr<Buffer> validity_buffer; + RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer)); + + std::vector<std::shared_ptr<Array>> fields; + RETURN_NOT_OK(GetChildren(json_array, type, &fields)); + + *array = + std::make_shared<StructArray>(type, length, fields, null_count, validity_buffer); + + return Status::OK(); + } + + template <typename T> + typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type ReadArray( + const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { + *array = std::make_shared<NullArray>(type, length); + return Status::OK(); + } + + Status GetChildren(const RjObject& json_array, const std::shared_ptr<DataType>& type, + std::vector<std::shared_ptr<Array>>* array) { + const auto& json_children = json_array.FindMember("children"); + RETURN_NOT_ARRAY("children", json_children, json_array); + const auto& json_children_arr = json_children->value.GetArray(); + + if (type->num_children() != static_cast<int>(json_children_arr.Size())) { + std::stringstream ss; + ss << "Expected " << type->num_children() << " children, but got " + << json_children_arr.Size(); + return Status::Invalid(ss.str()); + } + + for (int i = 0; i < 
static_cast<int>(json_children_arr.Size()); ++i) { + const rj::Value& json_child = json_children_arr[i]; + DCHECK(json_child.IsObject()); + + std::shared_ptr<Field> child_field = type->child(i); + + auto it = json_child.FindMember("name"); + RETURN_NOT_STRING("name", it, json_child); + + DCHECK_EQ(it->value.GetString(), child_field->name); + std::shared_ptr<Array> child; + RETURN_NOT_OK(GetArray(json_children_arr[i], child_field->type, &child)); + array->emplace_back(child); + } + + return Status::OK(); + } + + Status GetArray(const rj::Value& obj, const std::shared_ptr<DataType>& type, + std::shared_ptr<Array>* array) { + if (!obj.IsObject()) { + return Status::Invalid("Array element was not a JSON object"); + } + const auto& json_array = obj.GetObject(); + + const auto& json_length = json_array.FindMember("count"); + RETURN_NOT_INT("count", json_length, json_array); + int32_t length = json_length->value.GetInt(); + + const auto& json_valid_iter = json_array.FindMember("VALIDITY"); + RETURN_NOT_ARRAY("VALIDITY", json_valid_iter, json_array); + + const auto& json_validity = json_valid_iter->value.GetArray(); + + DCHECK_EQ(static_cast<int>(json_validity.Size()), length); + + std::vector<bool> is_valid; + for (const rj::Value& val : json_validity) { + DCHECK(val.IsInt()); + is_valid.push_back(static_cast<bool>(val.GetInt())); + } + +#define TYPE_CASE(TYPE) \ + case TYPE::type_id: \ + return ReadArray<TYPE>(json_array, length, is_valid, type, array); + +#define NOT_IMPLEMENTED_CASE(TYPE_ENUM) \ + case Type::TYPE_ENUM: { \ + std::stringstream ss; \ + ss << type->ToString(); \ + return Status::NotImplemented(ss.str()); \ + } + + switch (type->type) { + TYPE_CASE(NullType); + TYPE_CASE(BooleanType); + TYPE_CASE(UInt8Type); + TYPE_CASE(Int8Type); + TYPE_CASE(UInt16Type); + TYPE_CASE(Int16Type); + TYPE_CASE(UInt32Type); + TYPE_CASE(Int32Type); + TYPE_CASE(UInt64Type); + TYPE_CASE(Int64Type); + TYPE_CASE(HalfFloatType); + TYPE_CASE(FloatType); + TYPE_CASE(DoubleType); + 
TYPE_CASE(StringType); + TYPE_CASE(BinaryType); + NOT_IMPLEMENTED_CASE(DATE); + NOT_IMPLEMENTED_CASE(TIMESTAMP); + NOT_IMPLEMENTED_CASE(TIME); + NOT_IMPLEMENTED_CASE(INTERVAL); + TYPE_CASE(ListType); + TYPE_CASE(StructType); + NOT_IMPLEMENTED_CASE(UNION); + default: + std::stringstream ss; + ss << type->ToString(); + return Status::NotImplemented(ss.str()); + } + +#undef TYPE_CASE +#undef NOT_IMPLEMENTED_CASE + + return Status::OK(); + } + + private: + MemoryPool* pool_; +}; + +Status WriteJsonSchema(const Schema& schema, RjWriter* json_writer) { + JsonSchemaWriter converter(schema, json_writer); + return converter.Write(); +} + +Status ReadJsonSchema(const rj::Value& json_schema, std::shared_ptr<Schema>* schema) { + JsonSchemaReader converter(json_schema); + return converter.GetSchema(schema); +} + +Status WriteJsonArray( + const std::string& name, const Array& array, RjWriter* json_writer) { + JsonArrayWriter converter(name, array, json_writer); + return converter.Write(); +} + +Status ReadJsonArray(MemoryPool* pool, const rj::Value& json_array, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { + JsonArrayReader converter(pool); + return converter.GetArray(json_array, type, array); +} + +Status ReadJsonArray(MemoryPool* pool, const rj::Value& json_array, const Schema& schema, + std::shared_ptr<Array>* array) { + if (!json_array.IsObject()) { return Status::Invalid("Element was not a JSON object"); } + + const auto& json_obj = json_array.GetObject(); + + const auto& json_name = json_obj.FindMember("name"); + RETURN_NOT_STRING("name", json_name, json_obj); + + std::string name = json_name->value.GetString(); + + std::shared_ptr<Field> result = nullptr; + for (const std::shared_ptr<Field>& field : schema.fields()) { + if (field->name == name) { + result = field; + break; + } + } + + if (result == nullptr) { + std::stringstream ss; + ss << "Field named " << name << " not found in schema"; + return Status::KeyError(ss.str()); + } + + return 
ReadJsonArray(pool, json_array, result->type, array); +} + +} // namespace ipc +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-internal.h b/cpp/src/arrow/ipc/json-internal.h new file mode 100644 index 0000000..0c167a4 --- /dev/null +++ b/cpp/src/arrow/ipc/json-internal.h @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_IPC_JSON_INTERNAL_H +#define ARROW_IPC_JSON_INTERNAL_H + +#define RAPIDJSON_HAS_STDSTRING 1 +#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1 +#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1 + +#include <memory> +#include <sstream> +#include <string> + +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +#include "arrow/type_fwd.h" +#include "arrow/util/visibility.h" + +namespace rj = rapidjson; +using RjWriter = rj::Writer<rj::StringBuffer>; + +#define RETURN_NOT_FOUND(TOK, NAME, PARENT) \ + if (NAME == PARENT.MemberEnd()) { \ + std::stringstream ss; \ + ss << "field " << TOK << " not found"; \ + return Status::Invalid(ss.str()); \ + } + +#define RETURN_NOT_STRING(TOK, NAME, PARENT) \ + RETURN_NOT_FOUND(TOK, NAME, PARENT); \ + if (!NAME->value.IsString()) { \ + std::stringstream ss; \ + ss << "field was not a string" \ + << " line " << __LINE__; \ + return Status::Invalid(ss.str()); \ + } + +#define RETURN_NOT_BOOL(TOK, NAME, PARENT) \ + RETURN_NOT_FOUND(TOK, NAME, PARENT); \ + if (!NAME->value.IsBool()) { \ + std::stringstream ss; \ + ss << "field was not a boolean" \ + << " line " << __LINE__; \ + return Status::Invalid(ss.str()); \ + } + +#define RETURN_NOT_INT(TOK, NAME, PARENT) \ + RETURN_NOT_FOUND(TOK, NAME, PARENT); \ + if (!NAME->value.IsInt()) { \ + std::stringstream ss; \ + ss << "field was not an int" \ + << " line " << __LINE__; \ + return Status::Invalid(ss.str()); \ + } + +#define RETURN_NOT_ARRAY(TOK, NAME, PARENT) \ + RETURN_NOT_FOUND(TOK, NAME, PARENT); \ + if (!NAME->value.IsArray()) { \ + std::stringstream ss; \ + ss << "field was not an array" \ + << " line " << __LINE__; \ + return Status::Invalid(ss.str()); \ + } + +#define RETURN_NOT_OBJECT(TOK, NAME, PARENT) \ + RETURN_NOT_FOUND(TOK, NAME, PARENT); \ + if (!NAME->value.IsObject()) { \ + std::stringstream ss; \ + ss << "field was not an object" \ + << " line " << __LINE__; \ + return Status::Invalid(ss.str()); \ + } + +namespace arrow { 
+namespace ipc { + +// TODO(wesm): Only exporting these because arrow_ipc does not have a static +// library at the moment. Better to not export +Status ARROW_EXPORT WriteJsonSchema(const Schema& schema, RjWriter* json_writer); +Status ARROW_EXPORT WriteJsonArray( + const std::string& name, const Array& array, RjWriter* json_writer); + +Status ARROW_EXPORT ReadJsonSchema( + const rj::Value& json_obj, std::shared_ptr<Schema>* schema); +Status ARROW_EXPORT ReadJsonArray(MemoryPool* pool, const rj::Value& json_obj, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array); + +Status ARROW_EXPORT ReadJsonArray(MemoryPool* pool, const rj::Value& json_obj, + const Schema& schema, std::shared_ptr<Array>* array); + +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_JSON_INTERNAL_H http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc new file mode 100644 index 0000000..2281611 --- /dev/null +++ b/cpp/src/arrow/ipc/json.cc @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "arrow/ipc/json.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/ipc/json-internal.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+// ----------------------------------------------------------------------
+// Writer implementation
+
+// Builds the JSON document in an in-memory RapidJSON string buffer. Layout:
+//   {"schema": ..., "batches": [{"count": N, "columns": [...]}, ...]}
+class JsonWriter::JsonWriterImpl {
+ public:
+  explicit JsonWriterImpl(const std::shared_ptr<Schema>& schema) : schema_(schema) {
+    writer_.reset(new RjWriter(string_buffer_));
+  }
+
+  // Opens the top-level object, writes the schema and opens the "batches"
+  // array that WriteRecordBatch appends to. Called from JsonWriter::Open.
+  Status Start() {
+    writer_->StartObject();
+
+    writer_->Key("schema");
+    RETURN_NOT_OK(WriteJsonSchema(*schema_, writer_.get()));
+
+    // Record batches
+    writer_->Key("batches");
+    writer_->StartArray();
+    return Status::OK();
+  }
+
+  // Closes the "batches" array and the top-level object, then copies the
+  // serialized document into *result.
+  Status Finish(std::string* result) {
+    writer_->EndArray();  // Record batches
+    writer_->EndObject();
+
+    *result = string_buffer_.GetString();
+    return Status::OK();
+  }
+
+  // Appends one record batch. The column count and every column length are
+  // validated before anything is emitted, so a rejected batch does not leave
+  // a half-written batch object in the document.
+  Status WriteRecordBatch(
+      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
+    if (static_cast<int>(columns.size()) != schema_->num_fields()) {
+      return Status::Invalid("Number of columns did not match schema");
+    }
+    for (const std::shared_ptr<Array>& column : columns) {
+      if (column->length() != num_rows) {
+        return Status::Invalid("Array length did not match record batch length");
+      }
+    }
+
+    writer_->StartObject();
+    writer_->Key("count");
+    writer_->Int(num_rows);
+
+    writer_->Key("columns");
+    writer_->StartArray();
+
+    // Column i is serialized under the name of schema field i.
+    for (int i = 0; i < schema_->num_fields(); ++i) {
+      RETURN_NOT_OK(
+          WriteJsonArray(schema_->field(i)->name, *columns[i], writer_.get()));
+    }
+
+    writer_->EndArray();
+    writer_->EndObject();
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Schema> schema_;
+
+  rj::StringBuffer string_buffer_;
+  std::unique_ptr<RjWriter> writer_;
+};
+
+JsonWriter::JsonWriter(const std::shared_ptr<Schema>& schema) {
+  impl_.reset(new JsonWriterImpl(schema));
+}
+
+JsonWriter::~JsonWriter() {}
+
+Status JsonWriter::Open(
+    const std::shared_ptr<Schema>& schema, std::unique_ptr<JsonWriter>* writer) {
+  *writer = std::unique_ptr<JsonWriter>(new JsonWriter(schema));
+  return (*writer)->impl_->Start();
+}
+
+Status JsonWriter::Finish(std::string* result) {
+  return impl_->Finish(result);
+}
+
+Status JsonWriter::WriteRecordBatch(
+    const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
+  return impl_->WriteRecordBatch(columns, num_rows);
+}
+
+// ----------------------------------------------------------------------
+// Reader implementation
+
+// Parses the whole document up front and keeps it (and the source buffer)
+// alive so record batches can be materialized on demand.
+class JsonReader::JsonReaderImpl {
+ public:
+  JsonReaderImpl(MemoryPool* pool, const std::shared_ptr<Buffer>& data)
+      : pool_(pool), data_(data), record_batches_(nullptr) {}
+
+  // Parses data_, reads the "schema" member and locates the "batches" array.
+  Status ParseAndReadSchema() {
+    doc_.Parse(reinterpret_cast<const rj::Document::Ch*>(data_->data()),
+        static_cast<size_t>(data_->size()));
+    if (doc_.HasParseError()) { return Status::IOError("JSON parsing failed"); }
+
+    auto it = doc_.FindMember("schema");
+    RETURN_NOT_OBJECT("schema", it, doc_);
+    RETURN_NOT_OK(ReadJsonSchema(it->value, &schema_));
+
+    it = doc_.FindMember("batches");
+    RETURN_NOT_ARRAY("batches", it, doc_);
+    record_batches_ = &it->value;
+
+    return Status::OK();
+  }
+
+  // Builds record batch i from the parsed document. The document is external
+  // input, so malformed content yields Status::Invalid instead of relying on
+  // debug-only checks.
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+    if (record_batches_ == nullptr) {
+      return Status::Invalid("JSON document has no parsed record batches");
+    }
+    const auto& json_batches = record_batches_->GetArray();
+    if (i < 0 || i >= static_cast<int>(json_batches.Size())) {
+      return Status::Invalid("Record batch index out of bounds");
+    }
+
+    const auto& batch_val = json_batches[static_cast<rj::SizeType>(i)];
+    if (!batch_val.IsObject()) {
+      return Status::Invalid("Record batch was not a JSON object");
+    }
+
+    const auto& batch_obj = batch_val.GetObject();
+
+    auto it = batch_obj.FindMember("count");
+    RETURN_NOT_INT("count", it, batch_obj);
+    int32_t num_rows = static_cast<int32_t>(it->value.GetInt());
+
+    it = batch_obj.FindMember("columns");
+    RETURN_NOT_ARRAY("columns", it, batch_obj);
+    const auto& json_columns = it->value.GetArray();
+
+    // Schema::field() does not bounds-check, so the column count must match
+    // the schema before any field is looked up.
+    if (static_cast<int>(json_columns.Size()) != schema_->num_fields()) {
+      return Status::Invalid("Number of columns did not match schema");
+    }
+
+    std::vector<std::shared_ptr<Array>> columns(json_columns.Size());
+    for (rj::SizeType col = 0; col < json_columns.Size(); ++col) {
+      std::shared_ptr<DataType> type = schema_->field(static_cast<int>(col))->type;
+      RETURN_NOT_OK(ReadJsonArray(pool_, json_columns[col], type, &columns[col]));
+      if (columns[col]->length() != num_rows) {
+        return Status::Invalid("Array length did not match record batch length");
+      }
+    }
+
+    *batch = std::make_shared<RecordBatch>(schema_, num_rows, columns);
+    return Status::OK();
+  }
+
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+  // Returns 0 if the document was not parsed successfully.
+  int num_record_batches() const {
+    if (record_batches_ == nullptr) { return 0; }
+    return static_cast<int>(record_batches_->GetArray().Size());
+  }
+
+ private:
+  MemoryPool* pool_;
+  std::shared_ptr<Buffer> data_;
+  rj::Document doc_;
+
+  // Points into doc_; null until ParseAndReadSchema succeeds.
+  const rj::Value* record_batches_;
+
+  std::shared_ptr<Schema> schema_;
+};
+
+JsonReader::JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data) {
+  impl_.reset(new JsonReaderImpl(pool, data));
+}
+
+JsonReader::~JsonReader() {}
+
+Status JsonReader::Open(
+    const std::shared_ptr<Buffer>& data, std::unique_ptr<JsonReader>* reader) {
+  return Open(default_memory_pool(), data, reader);
+}
+
+Status JsonReader::Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
+    std::unique_ptr<JsonReader>* reader) {
+  *reader = std::unique_ptr<JsonReader>(new JsonReader(pool, data));
+  return (*reader)->impl_->ParseAndReadSchema();
+}
+
+std::shared_ptr<Schema> JsonReader::schema() const {
+  return impl_->schema();
+}
+
+int JsonReader::num_record_batches() const {
+  return impl_->num_record_batches();
+}
+
+Status JsonReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+  return impl_->GetRecordBatch(i, batch);
+}
+
+}  // namespace ipc
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h
new file mode 100644
index 0000000..7395be4
--- /dev/null
+++ b/cpp/src/arrow/ipc/json.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license
agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Implement Arrow JSON serialization format
+
+#ifndef ARROW_IPC_JSON_H
+#define ARROW_IPC_JSON_H
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace io {
+
+class OutputStream;
+class ReadableFileInterface;
+
+}  // namespace io
+
+namespace ipc {
+
+// Serializes a schema and a sequence of record batches into one JSON document
+// held in memory: a top-level "schema" member plus a "batches" array with one
+// {"count", "columns"} object per record batch.
+class ARROW_EXPORT JsonWriter {
+ public:
+  ~JsonWriter();
+
+  // Creates a writer for `schema`, immediately emits the schema and opens the
+  // "batches" array. Returns the status of writing the schema.
+  static Status Open(
+      const std::shared_ptr<Schema>& schema, std::unique_ptr<JsonWriter>* out);
+
+  // TODO(wesm): Write dictionaries
+
+  // Appends one record batch. `columns` holds one array per schema field, in
+  // schema order (column i is written under field i's name), each expected to
+  // have `num_rows` elements.
+  Status WriteRecordBatch(
+      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
+
+  // Closes the JSON document and copies the serialized text into *result.
+  Status Finish(std::string* result);
+
+ private:
+  explicit JsonWriter(const std::shared_ptr<Schema>& schema);
+
+  // Hide RapidJSON details from public API
+  class JsonWriterImpl;
+  std::unique_ptr<JsonWriterImpl> impl_;
+};
+
+// Reads a JSON document in the layout produced by JsonWriter.
+// TODO(wesm): Read from a file stream rather than an in-memory buffer
+class ARROW_EXPORT JsonReader {
+ public:
+  ~JsonReader();
+
+  // Parses the whole document in `data` and reads its schema. Arrays of the
+  // record batches are allocated from `pool` later, by GetRecordBatch.
+  static Status Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
+      std::unique_ptr<JsonReader>* reader);
+
+  // Use the default memory pool
+  static Status Open(
+      const std::shared_ptr<Buffer>& data, std::unique_ptr<JsonReader>* reader);
+
+  // Schema read from the document's "schema" member during Open.
+  std::shared_ptr<Schema> schema() const;
+
+  // Number of entries in the document's "batches" array.
+  int num_record_batches() const;
+
+  // Read record batch `i` (0 <= i < num_record_batches()) from the parsed
+  // in-memory document
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
+
+ private:
+  JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data);
+
+  // Hide RapidJSON details from public API
+  class JsonReaderImpl;
+  std::unique_ptr<JsonReaderImpl> impl_;
+};
+
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_JSON_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 784e238..9abc20d 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -27,6 +27,7 @@
 #include "arrow/array.h"
 #include "arrow/table.h"
 #include "arrow/test-util.h"
+#include "arrow/type.h"
 #include "arrow/types/list.h"
 #include "arrow/types/primitive.h"
 #include "arrow/types/string.h"
@@ -39,15 +40,14 @@ namespace arrow {
 namespace ipc {
 
 const auto kInt32 = std::make_shared<Int32Type>();
-const auto kListInt32 = std::make_shared<ListType>(kInt32);
-const auto kListListInt32 = std::make_shared<ListType>(kListInt32);
+const auto kListInt32 = list(kInt32);
+const auto kListListInt32 = list(kListInt32);
 
 Status MakeRandomInt32Array(
     int32_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) {
   std::shared_ptr<PoolBuffer> data;
   test::MakeRandomInt32PoolBuffer(length, pool, &data);
-  const auto kInt32 = std::make_shared<Int32Type>();
-  Int32Builder builder(pool, kInt32);
+  Int32Builder builder(pool, int32());
   if (include_nulls) {
     std::shared_ptr<PoolBuffer> valid_bytes;
     test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes);
@@ -134,8 +134,8 @@ Status MakeRandomBinaryArray(
 
 Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
   const int32_t length = 500;
-  auto string_type = std::make_shared<StringType>();
-  auto binary_type = std::make_shared<BinaryType>();
+  auto string_type = utf8();
+  auto binary_type = binary();
   auto f0 = std::make_shared<Field>("f0", string_type);
   auto f1 = std::make_shared<Field>("f1", binary_type);
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
@@ -233,7 +233,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
   const bool include_nulls = true;
   RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array));
   for (int i = 0; i < 63; ++i) {
-    type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+    type = std::static_pointer_cast<DataType>(list(type));
     RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array));
   }
 
http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/schema-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema-test.cc b/cpp/src/arrow/schema-test.cc
index 8cc80be..4826199 100644
--- a/cpp/src/arrow/schema-test.cc
+++ b/cpp/src/arrow/schema-test.cc
@@ -29,23 +29,21 @@ using std::vector;
 
 namespace arrow {
 
-const auto INT32 = std::make_shared<Int32Type>();
-
 TEST(TestField, Basics) {
-  Field f0("f0", INT32);
-  Field f0_nn("f0", INT32, false);
+  Field f0("f0", int32());
+  Field f0_nn("f0", int32(), false);
 
   ASSERT_EQ(f0.name, "f0");
-  ASSERT_EQ(f0.type->ToString(), INT32->ToString());
+  ASSERT_EQ(f0.type->ToString(), int32()->ToString());
 
   ASSERT_TRUE(f0.nullable);
   ASSERT_FALSE(f0_nn.nullable);
 }
 
 TEST(TestField, Equals) {
-  Field f0("f0", INT32);
-  Field f0_nn("f0", INT32, false);
-  Field f0_other("f0", INT32);
+  Field f0("f0", int32());
+  Field f0_nn("f0", int32(), false);
+  Field f0_other("f0", int32());
 
   ASSERT_EQ(f0, f0_other);
   ASSERT_NE(f0, f0_nn);
@@ -57,11 +55,11 @@ class TestSchema : public ::testing::Test {
 };
 
 TEST_F(TestSchema, Basics) {
-  auto f0 = std::make_shared<Field>("f0", INT32);
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<UInt8Type>(), false);
-  auto f1_optional = std::make_shared<Field>("f1", std::make_shared<UInt8Type>());
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", uint8(), false);
+  auto f1_optional = field("f1", uint8());
 
-  auto f2 = std::make_shared<Field>("f2", std::make_shared<StringType>());
+  auto f2 = field("f2", utf8());
 
   vector<shared_ptr<Field>> fields = {f0, f1, f2};
   auto schema = std::make_shared<Schema>(fields);
@@ -83,11 +81,10 @@ TEST_F(TestSchema, Basics) {
 }
 
 TEST_F(TestSchema, ToString) {
-  auto f0 = std::make_shared<Field>("f0", INT32);
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<UInt8Type>(), false);
-  auto f2 = std::make_shared<Field>("f2", std::make_shared<StringType>());
-  auto f3 = std::make_shared<Field>(
-      "f3", std::make_shared<ListType>(std::make_shared<Int16Type>()));
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", uint8(), false);
+  auto f2 = field("f2", utf8());
+  auto f3 = field("f3", list(int16()));
 
   vector<shared_ptr<Field>> fields = {f0, f1, f2, f3};
   auto schema = std::make_shared<Schema>(fields);
@@ -101,4 +98,25 @@ f3: list<item: int16>)";
   ASSERT_EQ(expected, result);
 }
 
+TEST_F(TestSchema, GetFieldByName) {
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", uint8(), false);
+  auto f2 = field("f2", utf8());
+  auto f3 = field("f3", list(int16()));
+
+  vector<shared_ptr<Field>> fields = {f0, f1, f2, f3};
+  auto schema = std::make_shared<Schema>(fields);
+
+  std::shared_ptr<Field> result;
+
+  result = schema->GetFieldByName("f1");
+  ASSERT_TRUE(f1->Equals(result));
+
+  result = schema->GetFieldByName("f3");
+  ASSERT_TRUE(f3->Equals(result));
+
+  result = schema->GetFieldByName("not-found");
+  ASSERT_TRUE(result == nullptr);
+}
+
 }  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema.cc b/cpp/src/arrow/schema.cc
index ff3ea19..cd8256e
100644
--- a/cpp/src/arrow/schema.cc
+++ b/cpp/src/arrow/schema.cc
@@ -42,6 +42,21 @@ bool Schema::Equals(const std::shared_ptr<Schema>& other) const {
   return Equals(*other.get());
 }
 
+// Looks up a field by name. The name -> index map is built on first use, so
+// this call mutates the Schema and must not race with other calls on a shared
+// instance. If several fields share a name, the last one wins.
+std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
+  if (!fields_.empty() && name_to_index_.empty()) {
+    for (int i = 0; i < static_cast<int>(fields_.size()); ++i) {
+      name_to_index_[fields_[i]->name] = i;
+    }
+  }
+
+  auto it = name_to_index_.find(name);
+  if (it == name_to_index_.end()) { return nullptr; }
+  return fields_[it->second];
+}
+
 std::string Schema::ToString() const {
   std::stringstream buffer;
 
http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/schema.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema.h b/cpp/src/arrow/schema.h
index 4301968..0e1ab5c 100644
--- a/cpp/src/arrow/schema.h
+++ b/cpp/src/arrow/schema.h
@@ -20,14 +20,14 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
+#include "arrow/type.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
-struct Field;
-
 class ARROW_EXPORT Schema {
  public:
   explicit Schema(const std::vector<std::shared_ptr<Field>>& fields);
@@ -37,7 +37,12 @@ class ARROW_EXPORT Schema {
   bool Equals(const std::shared_ptr<Schema>& other) const;
 
   // Return the ith schema element. Does not boundscheck
-  const std::shared_ptr<Field>& field(int i) const { return fields_[i]; }
+  std::shared_ptr<Field> field(int i) const { return fields_[i]; }
+
+  // Returns nullptr if name not found. If several fields share a name, the
+  // last one wins. Builds an internal index on first call, so it is not safe
+  // to call concurrently on a shared Schema.
+  std::shared_ptr<Field> GetFieldByName(const std::string& name);
+
+  const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
 
   // Render a string representation of the schema suitable for debugging
   std::string ToString() const;
@@ -46,6 +51,7 @@ class ARROW_EXPORT Schema {
 
  private:
   std::vector<std::shared_ptr<Field>> fields_;
+  // Lazily populated by GetFieldByName.
+  std::unordered_map<std::string, int> name_to_index_;
 };
 
 }  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index ac56f5e..ab4b980 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -27,6 +27,7 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/array.h"
 #include "arrow/column.h"
 #include "arrow/schema.h"
 #include "arrow/table.h"
@@ -102,20 +103,57 @@ void random_real(int n, uint32_t seed, T min_value, T max_value, std::vector<T>*
 }
 
+// Wraps the vector's storage without copying; the Buffer must not outlive it.
 template <typename T>
-std::shared_ptr<Buffer> to_buffer(const std::vector<T>& values) {
+std::shared_ptr<Buffer> GetBufferFromVector(const std::vector<T>& values) {
   return std::make_shared<Buffer>(
       reinterpret_cast<const uint8_t*>(values.data()), values.size() * sizeof(T));
 }
 
+// Copies `values` into a new buffer allocated from the default memory pool.
+template <typename T>
+inline Status CopyBufferFromVector(
+    const std::vector<T>& values, std::shared_ptr<Buffer>* result) {
+  // Multiply in size_t before widening; narrowing the element count to int
+  // first would truncate the byte count for large vectors.
+  const int64_t nbytes = static_cast<int64_t>(values.size() * sizeof(T));
+
+  auto buffer = std::make_shared<PoolBuffer>(default_memory_pool());
+  RETURN_NOT_OK(buffer->Resize(nbytes));
+  // memcpy from a null pointer is undefined even for zero bytes, and an
+  // empty vector's data() may be null.
+  if (nbytes > 0) { memcpy(buffer->mutable_data(), values.data(), nbytes); }
+
+  *result = buffer;
+  return Status::OK();
+}
+
+// Builds a validity bitmap (from the default memory pool) in which bit i is
+// set iff is_valid[i] is true. Unset bits rely on GetEmptyBitmap returning a
+// zero-filled bitmap (presumed from its name; confirm).
+static inline Status GetBitmapFromBoolVector(
+    const std::vector<bool>& is_valid, std::shared_ptr<Buffer>* result) {
+  int length = static_cast<int>(is_valid.size());
+
+  std::shared_ptr<MutableBuffer> buffer;
+  RETURN_NOT_OK(GetEmptyBitmap(default_memory_pool(), length, &buffer));
+
+  uint8_t* bitmap = buffer->mutable_data();
+  for (int i = 0; i < length; ++i) {
+    if (is_valid[i]) { BitUtil::SetBit(bitmap, i); }
+  }
+
+  *result = buffer;
+  return Status::OK();
+}
+
 // Sets approximately pct_null of the first n bytes in null_bytes to zero
 // and the rest to non-zero (true) values.
-void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) {
+static inline void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) {
   Random rng(random_seed());
   for (int i = 0; i < n; ++i) {
     null_bytes[i] = rng.NextDoubleFraction() > pct_null;
   }
 }
 
+// Appends n validity flags to *is_valid; like random_null_bytes, each flag is
+// false (null) with probability about pct_null.
+static inline void random_is_valid(
+    int64_t n, double pct_null, std::vector<bool>* is_valid) {
+  Random rng(random_seed());
+  // n is int64_t, so the counter must be too to avoid int overflow.
+  for (int64_t i = 0; i < n; ++i) {
+    is_valid->push_back(rng.NextDoubleFraction() > pct_null);
+  }
+}
+
 static inline void random_bytes(int n, uint32_t seed, uint8_t* out) {
   std::mt19937 gen(seed);
   std::uniform_int_distribution<int> d(0, 255);
@@ -125,6 +163,15 @@ static inline void random_bytes(int n, uint32_t seed, uint8_t* out) {
   }
 }
 
+// Fills out[0, n) with bytes drawn uniformly from [65, 122] ('A'..'z'); the
+// range also includes the six punctuation characters '[' through '`'.
+static inline void random_ascii(int n, uint32_t seed, uint8_t* out) {
+  std::mt19937 gen(seed);
+  std::uniform_int_distribution<int> d(65, 122);
+
+  for (int i = 0; i < n; ++i) {
+    out[i] = d(gen) & 0xFF;
+  }
+}
+
 template <typename T>
 void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
   DCHECK(out);
