ARROW-373: [C++] JSON serialization format for testing

C++ version of ARROW-372

Author: Wes McKinney <[email protected]>

Closes #202 from wesm/ARROW-373 and squashes the following commits:

d13a05f [Wes McKinney] Compiler warning
72c24fe [Wes McKinney] Add a minimal literal JSON example
a2cf47b [Wes McKinney] cpplint
3d9fcc2 [Wes McKinney] Complete round trip json file test with multiple record batches
2753449 [Wes McKinney] Complete draft json roundtrip implementation. tests not complete yet
3d6bbbd [Wes McKinney] Start high level writer scaffold
6bbd669 [Wes McKinney] Tweaks
e2e86b5 [Wes McKinney] Test JSON array roundtrip for numeric types, strings, lists, structs
82f108b [Wes McKinney] Refactoring. Array test scaffold
0891378 [Wes McKinney] Declare loop variables
6566343 [Wes McKinney] Recursively construct children for list/struct
35c2f85 [Wes McKinney] Refactoring. Start drafting string/list reader
f26402a [Wes McKinney] Install type_traits.h. cpplint
4fc7294 [Wes McKinney] Refactoring, type attribute consistency. Array reader compiles
2c93cce [Wes McKinney] WIP JSON array reader code path
932ba7a [Wes McKinney] Add ArrayVisitor methods, add enough metaprogramming to detect presence of c_type type member
15c1094 [Wes McKinney] Add type traits, refactoring, drafting json array writing. not working yet
209ba48 [Wes McKinney] More types refactoring. Strange linker error in pyarrow
379da3c [Wes McKinney] Implement union metadata JSON serialization
5fbea41 [Wes McKinney] Implement some more json types and add convenience factory functions
1c08233 [Wes McKinney] JSON schema roundtrip passing for many types
86c9559 [Wes McKinney] Add convenience factory functions for common types
3b9d14e [Wes McKinney] Add type-specific JSON metadata to schema writer
820b0f2 [Wes McKinney] Drafting JSON schema read/write
68ee7ab [Wes McKinney] Move forward declarations into type_fwd.h
1edf2a9 [Wes McKinney] Prototyping out visitor pattern for json serialization
24c1d5d [Wes McKinney] Some Types refactoring, add TypeVisitor abstract class. Add RapidJSON as external project


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ed6ec3b7
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ed6ec3b7
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ed6ec3b7

Branch: refs/heads/master
Commit: ed6ec3b76e1ac27fab85cd4bc74fbd61e8dfb27f
Parents: 8417096
Author: Wes McKinney <[email protected]>
Authored: Fri Nov 18 14:58:46 2016 -0500
Committer: Wes McKinney <[email protected]>
Committed: Fri Nov 18 14:58:46 2016 -0500

----------------------------------------------------------------------
 cpp/CMakeLists.txt                    |   19 +
 cpp/src/arrow/CMakeLists.txt          |    2 +
 cpp/src/arrow/array.cc                |   15 +
 cpp/src/arrow/array.h                 |   12 +
 cpp/src/arrow/column-test.cc          |    1 +
 cpp/src/arrow/io/hdfs.cc              |    8 +-
 cpp/src/arrow/io/libhdfs_shim.cc      |   26 +-
 cpp/src/arrow/ipc/CMakeLists.txt      |    7 +
 cpp/src/arrow/ipc/adapter.cc          |    2 +-
 cpp/src/arrow/ipc/ipc-json-test.cc    |  353 +++++++++
 cpp/src/arrow/ipc/json-internal.cc    | 1113 ++++++++++++++++++++++++++++
 cpp/src/arrow/ipc/json-internal.h     |  111 +++
 cpp/src/arrow/ipc/json.cc             |  219 ++++++
 cpp/src/arrow/ipc/json.h              |   92 +++
 cpp/src/arrow/ipc/test-common.h       |   14 +-
 cpp/src/arrow/schema-test.cc          |   52 +-
 cpp/src/arrow/schema.cc               |   15 +
 cpp/src/arrow/schema.h                |   12 +-
 cpp/src/arrow/test-util.h             |   51 +-
 cpp/src/arrow/type.cc                 |  122 ++-
 cpp/src/arrow/type.h                  |  338 +++++++--
 cpp/src/arrow/type_fwd.h              |  157 ++++
 cpp/src/arrow/type_traits.h           |  197 +++++
 cpp/src/arrow/types/CMakeLists.txt    |    1 -
 cpp/src/arrow/types/collection.h      |   41 -
 cpp/src/arrow/types/datetime.h        |   37 +-
 cpp/src/arrow/types/decimal.h         |   14 +-
 cpp/src/arrow/types/list-test.cc      |    2 +-
 cpp/src/arrow/types/list.cc           |    4 +
 cpp/src/arrow/types/list.h            |    8 +-
 cpp/src/arrow/types/primitive-test.cc |   36 +-
 cpp/src/arrow/types/primitive.cc      |   97 ++-
 cpp/src/arrow/types/primitive.h       |  190 ++---
 cpp/src/arrow/types/string-test.cc    |   12 +-
 cpp/src/arrow/types/string.cc         |   16 +-
 cpp/src/arrow/types/string.h          |   24 +-
 cpp/src/arrow/types/struct-test.cc    |    2 +-
 cpp/src/arrow/types/struct.cc         |    4 +
 cpp/src/arrow/types/struct.h          |    4 +
 cpp/src/arrow/types/test-common.h     |   16 +
 cpp/src/arrow/types/union.cc          |   23 +-
 cpp/src/arrow/types/union.h           |   21 -
 cpp/src/arrow/util/logging.h          |    4 +-
 format/Metadata.md                    |    5 +
 44 files changed, 3049 insertions(+), 450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 6f95483..0bff752 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -545,6 +545,25 @@ if(ARROW_BUILD_BENCHMARKS)
   endif()
 endif()
 
+# RapidJSON, header only dependency
+if("$ENV{RAPIDJSON_HOME}" STREQUAL "")
+  ExternalProject_Add(rapidjson_ep
+    PREFIX "${CMAKE_BINARY_DIR}"
+    URL "https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz"
+    URL_MD5 "badd12c511e081fec6c89c43a7027bce"
+    CONFIGURE_COMMAND ""
+    BUILD_COMMAND ""
+    BUILD_IN_SOURCE 1
+    INSTALL_COMMAND "")
+
+  ExternalProject_Get_Property(rapidjson_ep SOURCE_DIR)
+  set(RAPIDJSON_INCLUDE_DIR "${SOURCE_DIR}/include")
+else()
+  set(RAPIDJSON_INCLUDE_DIR "$ENV{RAPIDJSON_HOME}/include")
+endif()
+message(STATUS "RapidJSON include dir: ${RAPIDJSON_INCLUDE_DIR}")
+include_directories(SYSTEM ${RAPIDJSON_INCLUDE_DIR})
+
 ## Google PerfTools
 ##
 ## Disabled with TSAN/ASAN as well as with gold+dynamic linking (see comment

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index a9b2fec..81851bc 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -24,6 +24,8 @@ install(FILES
   schema.h
   table.h
   type.h
+  type_fwd.h
+  type_traits.h
   test-util.h
   DESTINATION include/arrow)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index e432a53..3262425 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -18,6 +18,7 @@
 #include "arrow/array.h"
 
 #include <cstdint>
+#include <cstring>
 
 #include "arrow/util/bit-util.h"
 #include "arrow/util/buffer.h"
@@ -25,6 +26,16 @@
 
 namespace arrow {
 
+Status GetEmptyBitmap(
+    MemoryPool* pool, int32_t length, std::shared_ptr<MutableBuffer>* result) {
+  auto buffer = std::make_shared<PoolBuffer>(pool);
+  RETURN_NOT_OK(buffer->Resize(BitUtil::BytesForBits(length)));
+  memset(buffer->mutable_data(), 0, buffer->size());
+
+  *result = buffer;
+  return Status::OK();
+}
+
 // ----------------------------------------------------------------------
 // Base array class
 
@@ -66,4 +77,8 @@ bool NullArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_st
   return true;
 }
 
+Status NullArray::Accept(ArrayVisitor* visitor) const {
+  return visitor->Visit(*this);
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index ff37323..ff2b70e 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -29,6 +29,8 @@
 namespace arrow {
 
 class Buffer;
+class MemoryPool;
+class MutableBuffer;
 class Status;
 
 // Immutable data array with some logical type and some length. Any memory is
@@ -70,6 +72,8 @@ class ARROW_EXPORT Array {
   // returning Status::OK.  This can be an expensive check.
   virtual Status Validate() const;
 
+  virtual Status Accept(ArrayVisitor* visitor) const = 0;
+
  protected:
   std::shared_ptr<DataType> type_;
   int32_t null_count_;
@@ -86,6 +90,8 @@ class ARROW_EXPORT Array {
 // Degenerate null type Array
 class ARROW_EXPORT NullArray : public Array {
  public:
+  using TypeClass = NullType;
+
   NullArray(const std::shared_ptr<DataType>& type, int32_t length)
       : Array(type, length, length, nullptr) {}
 
@@ -94,9 +100,15 @@ class ARROW_EXPORT NullArray : public Array {
   bool Equals(const std::shared_ptr<Array>& arr) const override;
   bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_index,
       const std::shared_ptr<Array>& arr) const override;
+
+  Status Accept(ArrayVisitor* visitor) const override;
 };
 
 typedef std::shared_ptr<Array> ArrayPtr;
+
+Status ARROW_EXPORT GetEmptyBitmap(
+    MemoryPool* pool, int32_t length, std::shared_ptr<MutableBuffer>* result);
+
 }  // namespace arrow
 
 #endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/column-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column-test.cc b/cpp/src/arrow/column-test.cc
index 1edf313..ac3636d 100644
--- a/cpp/src/arrow/column-test.cc
+++ b/cpp/src/arrow/column-test.cc
@@ -22,6 +22,7 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/array.h"
 #include "arrow/column.h"
 #include "arrow/schema.h"
 #include "arrow/test-util.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 6490a75..13491e7 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -289,13 +289,9 @@ class HdfsClient::HdfsClientImpl {
 
     // connect to HDFS with the builder object
     hdfsBuilder* builder = hdfsNewBuilder();
-    if (!config->host.empty()) {
-      hdfsBuilderSetNameNode(builder, config->host.c_str());
-    }
+    if (!config->host.empty()) { hdfsBuilderSetNameNode(builder, config->host.c_str()); }
     hdfsBuilderSetNameNodePort(builder, config->port);
-    if (!config->user.empty()) {
-      hdfsBuilderSetUserName(builder, config->user.c_str());
-    }
+    if (!config->user.empty()) { hdfsBuilderSetUserName(builder, config->user.c_str()); }
     if (!config->kerb_ticket.empty()) {
       hdfsBuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
     }

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/io/libhdfs_shim.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc
index 1fee595..36b8a4e 100644
--- a/cpp/src/arrow/io/libhdfs_shim.cc
+++ b/cpp/src/arrow/io/libhdfs_shim.cc
@@ -74,12 +74,9 @@ static HINSTANCE libjvm_handle = NULL;
 // NOTE(wesm): cpplint does not like use of short and other imprecise C types
 
 static hdfsBuilder* (*ptr_hdfsNewBuilder)(void) = NULL;
-static void (*ptr_hdfsBuilderSetNameNode)(
-    hdfsBuilder* bld, const char* nn) = NULL;
-static void (*ptr_hdfsBuilderSetNameNodePort)(
-    hdfsBuilder* bld, tPort port) = NULL;
-static void (*ptr_hdfsBuilderSetUserName)(
-    hdfsBuilder* bld, const char* userName) = NULL;
+static void (*ptr_hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn) = NULL;
+static void (*ptr_hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port) = NULL;
+static void (*ptr_hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName) = NULL;
 static void (*ptr_hdfsBuilderSetKerbTicketCachePath)(
     hdfsBuilder* bld, const char* kerbTicketCachePath) = NULL;
 static hdfsFS (*ptr_hdfsBuilderConnect)(hdfsBuilder* bld) = NULL;
@@ -173,9 +170,9 @@ void hdfsBuilderSetUserName(hdfsBuilder* bld, const char* userName) {
   ptr_hdfsBuilderSetUserName(bld, userName);
 }
 
-void hdfsBuilderSetKerbTicketCachePath(hdfsBuilder* bld,
-    const char* kerbTicketCachePath) {
-  ptr_hdfsBuilderSetKerbTicketCachePath(bld , kerbTicketCachePath);
+void hdfsBuilderSetKerbTicketCachePath(
+    hdfsBuilder* bld, const char* kerbTicketCachePath) {
+  ptr_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
 }
 
 hdfsFS hdfsBuilderConnect(hdfsBuilder* bld) {
@@ -364,7 +361,7 @@ static std::vector<fs::path> get_potential_libhdfs_paths() {
   std::vector<fs::path> libhdfs_potential_paths;
   std::string file_name;
 
-  // OS-specific file name
+// OS-specific file name
 #ifdef __WIN32
   file_name = "hdfs.dll";
 #elif __APPLE__
@@ -374,10 +371,7 @@ static std::vector<fs::path> get_potential_libhdfs_paths() {
 #endif
 
   // Common paths
-  std::vector<fs::path> search_paths = {
-      fs::path(""),
-      fs::path(".")
-  };
+  std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
 
   // Path from environment variable
   const char* hadoop_home = std::getenv("HADOOP_HOME");
@@ -387,9 +381,7 @@ static std::vector<fs::path> get_potential_libhdfs_paths() {
   }
 
   const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR");
-  if (libhdfs_dir != nullptr) {
-    search_paths.push_back(fs::path(libhdfs_dir));
-  }
+  if (libhdfs_dir != nullptr) { search_paths.push_back(fs::path(libhdfs_dir)); }
 
   // All paths with file name
   for (auto& path : search_paths) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index d2db339..6955bcb 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -34,6 +34,8 @@ set(ARROW_IPC_TEST_LINK_LIBS
 set(ARROW_IPC_SRCS
   adapter.cc
   file.cc
+  json.cc
+  json-internal.cc
   metadata.cc
   metadata-internal.cc
 )
@@ -79,6 +81,10 @@ ADD_ARROW_TEST(ipc-metadata-test)
 ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test
   ${ARROW_IPC_TEST_LINK_LIBS})
 
+ADD_ARROW_TEST(ipc-json-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-json-test
+  ${ARROW_IPC_TEST_LINK_LIBS})
+
 # make clean will delete the generated file
 set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE)
 
@@ -114,6 +120,7 @@ add_dependencies(arrow_objlib metadata_fbs)
 install(FILES
   adapter.h
   file.h
+  json.h
   metadata.h
   DESTINATION include/arrow/ipc)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 74786bf..da718c0 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -106,7 +106,7 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
     buffers->push_back(binary_arr->data());
   } else if (arr->type_enum() == Type::LIST) {
     const auto list_arr = static_cast<const ListArray*>(arr);
-    buffers->push_back(list_arr->offset_buffer());
+    buffers->push_back(list_arr->offsets());
     RETURN_NOT_OK(VisitArray(
         list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1));
   } else if (arr->type_enum() == Type::STRUCT) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
new file mode 100644
index 0000000..a51371c
--- /dev/null
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/array.h"
+#include "arrow/ipc/json-internal.h"
+#include "arrow/ipc/json.h"
+#include "arrow/table.h"
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/types/struct.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+void TestSchemaRoundTrip(const Schema& schema) {
+  rj::StringBuffer sb;
+  rj::Writer<rj::StringBuffer> writer(sb);
+
+  ASSERT_OK(WriteJsonSchema(schema, &writer));
+
+  rj::Document d;
+  d.Parse(sb.GetString());
+
+  std::shared_ptr<Schema> out;
+  ASSERT_OK(ReadJsonSchema(d, &out));
+
+  ASSERT_TRUE(schema.Equals(out));
+}
+
+void TestArrayRoundTrip(const Array& array) {
+  static std::string name = "dummy";
+
+  rj::StringBuffer sb;
+  rj::Writer<rj::StringBuffer> writer(sb);
+
+  ASSERT_OK(WriteJsonArray(name, array, &writer));
+
+  std::string array_as_json = sb.GetString();
+
+  rj::Document d;
+  d.Parse(array_as_json);
+
+  if (d.HasParseError()) { FAIL() << "JSON parsing failed"; }
+
+  std::shared_ptr<Array> out;
+  ASSERT_OK(ReadJsonArray(default_memory_pool(), d, array.type(), &out));
+
+  ASSERT_TRUE(array.Equals(out)) << array_as_json;
+}
+
+template <typename T, typename ValueType>
+void CheckPrimitive(const std::shared_ptr<DataType>& type,
+    const std::vector<bool>& is_valid, const std::vector<ValueType>& values) {
+  MemoryPool* pool = default_memory_pool();
+  typename TypeTraits<T>::BuilderType builder(pool, type);
+
+  for (size_t i = 0; i < values.size(); ++i) {
+    if (is_valid[i]) {
+      ASSERT_OK(builder.Append(values[i]));
+    } else {
+      ASSERT_OK(builder.AppendNull());
+    }
+  }
+
+  std::shared_ptr<Array> array;
+  ASSERT_OK(builder.Finish(&array));
+  TestArrayRoundTrip(*array.get());
+}
+
+template <typename TYPE, typename C_TYPE>
+void MakeArray(const std::shared_ptr<DataType>& type, const std::vector<bool>& is_valid,
+    const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) {
+  std::shared_ptr<Buffer> values_buffer;
+  std::shared_ptr<Buffer> values_bitmap;
+
+  ASSERT_OK(test::CopyBufferFromVector(values, &values_buffer));
+  ASSERT_OK(test::GetBitmapFromBoolVector(is_valid, &values_bitmap));
+
+  using ArrayType = typename TypeTraits<TYPE>::ArrayType;
+
+  int32_t null_count = 0;
+  for (bool val : is_valid) {
+    if (!val) { ++null_count; }
+  }
+
+  *out = std::make_shared<ArrayType>(type, static_cast<int32_t>(values.size()),
+      values_buffer, null_count, values_bitmap);
+}
+
+TEST(TestJsonSchemaWriter, FlatTypes) {
+  std::vector<std::shared_ptr<Field>> fields = {field("f0", int8()),
+      field("f1", int16(), false), field("f2", int32()), field("f3", int64(), false),
+      field("f4", uint8()), field("f5", uint16()), field("f6", uint32()),
+      field("f7", uint64()), field("f8", float32()), field("f9", float64()),
+      field("f10", utf8()), field("f11", binary()), field("f12", list(int32())),
+      field("f13", struct_({field("s1", int32()), field("s2", utf8())})),
+      field("f14", date()), field("f15", timestamp(TimeUnit::NANO)),
+      field("f16", time(TimeUnit::MICRO)),
+      field("f17", union_({field("u1", int8()), field("u2", time(TimeUnit::MILLI))},
+                       {0, 1}, UnionMode::DENSE))};
+
+  Schema schema(fields);
+  TestSchemaRoundTrip(schema);
+}
+
+template <typename T>
+void PrimitiveTypesCheckOne() {
+  using c_type = typename T::c_type;
+
+  std::vector<bool> is_valid = {true, false, true, true, true, false, true, true};
+  std::vector<c_type> values = {0, 1, 2, 3, 4, 5, 6, 7};
+  CheckPrimitive<T, c_type>(std::make_shared<T>(), is_valid, values);
+}
+
+TEST(TestJsonArrayWriter, PrimitiveTypes) {
+  PrimitiveTypesCheckOne<Int8Type>();
+  PrimitiveTypesCheckOne<Int16Type>();
+  PrimitiveTypesCheckOne<Int32Type>();
+  PrimitiveTypesCheckOne<Int64Type>();
+  PrimitiveTypesCheckOne<UInt8Type>();
+  PrimitiveTypesCheckOne<UInt16Type>();
+  PrimitiveTypesCheckOne<UInt32Type>();
+  PrimitiveTypesCheckOne<UInt64Type>();
+  PrimitiveTypesCheckOne<FloatType>();
+  PrimitiveTypesCheckOne<DoubleType>();
+
+  std::vector<bool> is_valid = {true, false, true, true, true, false, true, true};
+  std::vector<std::string> values = {"foo", "bar", "", "baz", "qux", "foo", "a", "1"};
+
+  CheckPrimitive<StringType, std::string>(utf8(), is_valid, values);
+  CheckPrimitive<BinaryType, std::string>(binary(), is_valid, values);
+}
+
+TEST(TestJsonArrayWriter, NestedTypes) {
+  auto value_type = int32();
+
+  std::vector<bool> values_is_valid = {true, false, true, true, false, true, true};
+  std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
+
+  std::shared_ptr<Array> values_array;
+  MakeArray<Int32Type, int32_t>(int32(), values_is_valid, values, &values_array);
+
+  // List
+  std::vector<bool> list_is_valid = {true, false, true, true, true};
+  std::vector<int32_t> offsets = {0, 0, 0, 1, 4, 7};
+
+  std::shared_ptr<Buffer> list_bitmap;
+  ASSERT_OK(test::GetBitmapFromBoolVector(list_is_valid, &list_bitmap));
+  std::shared_ptr<Buffer> offsets_buffer = test::GetBufferFromVector(offsets);
+
+  ListArray list_array(list(value_type), 5, offsets_buffer, values_array, 1, list_bitmap);
+
+  TestArrayRoundTrip(list_array);
+
+  // Struct
+  std::vector<bool> struct_is_valid = {true, false, true, true, true, false, true};
+  std::shared_ptr<Buffer> struct_bitmap;
+  ASSERT_OK(test::GetBitmapFromBoolVector(struct_is_valid, &struct_bitmap));
+
+  auto struct_type =
+      struct_({field("f1", int32()), field("f2", int32()), field("f3", int32())});
+
+  std::vector<std::shared_ptr<Array>> fields = {values_array, values_array, values_array};
+  StructArray struct_array(
+      struct_type, static_cast<int>(struct_is_valid.size()), fields, 2, struct_bitmap);
+  TestArrayRoundTrip(struct_array);
+}
+
+// Data generation for test case below
+void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows,
+    std::vector<std::shared_ptr<Array>>* arrays) {
+  std::vector<bool> is_valid;
+  test::random_is_valid(num_rows, 0.25, &is_valid);
+
+  std::vector<int8_t> v1_values;
+  std::vector<int32_t> v2_values;
+
+  test::randint<int8_t>(num_rows, 0, 100, &v1_values);
+  test::randint<int32_t>(num_rows, 0, 100, &v2_values);
+
+  std::shared_ptr<Array> v1;
+  MakeArray<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1);
+
+  std::shared_ptr<Array> v2;
+  MakeArray<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2);
+
+  static const int kBufferSize = 10;
+  static uint8_t buffer[kBufferSize];
+  static uint32_t seed = 0;
+  StringBuilder string_builder(default_memory_pool(), utf8());
+  for (int i = 0; i < num_rows; ++i) {
+    if (!is_valid[i]) {
+      string_builder.AppendNull();
+    } else {
+      test::random_ascii(kBufferSize, seed++, buffer);
+      string_builder.Append(buffer, kBufferSize);
+    }
+  }
+  std::shared_ptr<Array> v3;
+  ASSERT_OK(string_builder.Finish(&v3));
+
+  arrays->emplace_back(v1);
+  arrays->emplace_back(v2);
+  arrays->emplace_back(v3);
+}
+
+TEST(TestJsonFileReadWrite, BasicRoundTrip) {
+  auto v1_type = int8();
+  auto v2_type = int32();
+  auto v3_type = utf8();
+
+  std::shared_ptr<Schema> schema(
+      new Schema({field("f1", v1_type), field("f2", v2_type), field("f3", v3_type)}));
+
+  std::unique_ptr<JsonWriter> writer;
+  ASSERT_OK(JsonWriter::Open(schema, &writer));
+
+  const int nbatches = 3;
+  std::vector<std::shared_ptr<RecordBatch>> batches;
+  for (int i = 0; i < nbatches; ++i) {
+    int32_t num_rows = 5 + i * 5;
+    std::vector<std::shared_ptr<Array>> arrays;
+
+    MakeBatchArrays(schema, num_rows, &arrays);
+    batches.emplace_back(std::make_shared<RecordBatch>(schema, num_rows, arrays));
+    ASSERT_OK(writer->WriteRecordBatch(arrays, num_rows));
+  }
+
+  std::string result;
+  ASSERT_OK(writer->Finish(&result));
+
+  std::unique_ptr<JsonReader> reader;
+
+  auto buffer = std::make_shared<Buffer>(
+      reinterpret_cast<const uint8_t*>(result.c_str()), static_cast<int>(result.size()));
+
+  ASSERT_OK(JsonReader::Open(buffer, &reader));
+  ASSERT_TRUE(reader->schema()->Equals(*schema.get()));
+
+  ASSERT_EQ(nbatches, reader->num_record_batches());
+
+  for (int i = 0; i < nbatches; ++i) {
+    std::shared_ptr<RecordBatch> batch;
+    ASSERT_OK(reader->GetRecordBatch(i, &batch));
+    ASSERT_TRUE(batch->Equals(*batches[i].get()));
+  }
+}
+
+TEST(TestJsonFileReadWrite, MinimalFormatExample) {
+  static const char* example = R"example(
+{
+  "schema": {
+    "fields": [
+      {
+        "name": "foo",
+        "type": {"name": "int", "isSigned": true, "bitWidth": 32},
+        "nullable": true, "children": [],
+        "typeLayout": [
+          {"type": "VALIDITY", "typeBitWidth": 1},
+          {"type": "DATA", "typeBitWidth": 32}
+        ]
+      },
+      {
+        "name": "bar",
+        "type": {"name": "floatingpoint", "precision": "DOUBLE"},
+        "nullable": true, "children": [],
+        "typeLayout": [
+          {"type": "VALIDITY", "typeBitWidth": 1},
+          {"type": "DATA", "typeBitWidth": 64}
+        ]
+      }
+    ]
+  },
+  "batches": [
+    {
+      "count": 5,
+      "columns": [
+        {
+          "name": "foo",
+          "count": 5,
+          "DATA": [1, 2, 3, 4, 5],
+          "VALIDITY": [1, 0, 1, 1, 1]
+        },
+        {
+          "name": "bar",
+          "count": 5,
+          "DATA": [1.0, 2.0, 3.0, 4.0, 5.0],
+          "VALIDITY": [1, 0, 0, 1, 1]
+        }
+      ]
+    }
+  ]
+}
+)example";
+
+  auto buffer = std::make_shared<Buffer>(
+      reinterpret_cast<const uint8_t*>(example), strlen(example));
+
+  std::unique_ptr<JsonReader> reader;
+  ASSERT_OK(JsonReader::Open(buffer, &reader));
+
+  Schema ex_schema({field("foo", int32()), field("bar", float64())});
+
+  ASSERT_TRUE(reader->schema()->Equals(ex_schema));
+  ASSERT_EQ(1, reader->num_record_batches());
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(reader->GetRecordBatch(0, &batch));
+
+  std::vector<bool> foo_valid = {true, false, true, true, true};
+  std::vector<int32_t> foo_values = {1, 2, 3, 4, 5};
+  std::shared_ptr<Array> foo;
+  MakeArray<Int32Type, int32_t>(int32(), foo_valid, foo_values, &foo);
+  ASSERT_TRUE(batch->column(0)->Equals(foo));
+
+  std::vector<bool> bar_valid = {true, false, false, true, true};
+  std::vector<double> bar_values = {1, 2, 3, 4, 5};
+  std::shared_ptr<Array> bar;
+  MakeArray<DoubleType, double>(float64(), bar_valid, bar_values, &bar);
+  ASSERT_TRUE(batch->column(1)->Equals(bar));
+}
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
new file mode 100644
index 0000000..31fe35b
--- /dev/null
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -0,0 +1,1113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/json-internal.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include "arrow/array.h"
+#include "arrow/schema.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/types/list.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/types/struct.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+// Shorthand for rapidjson's read-only array and object views.
+using RjArray = rj::Value::ConstArray;
+using RjObject = rj::Value::ConstObject;
+
+// Kinds of buffer that can be named in a "typeLayout" entry; GetBufferTypeName
+// maps each enumerator to the string written to JSON.
+enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
+
+// Maps a BufferType enumerator to its JSON spelling. Any value that is not
+// one of the four enumerators yields "UNKNOWN".
+static std::string GetBufferTypeName(BufferType type) {
+  if (type == BufferType::DATA) { return "DATA"; }
+  if (type == BufferType::OFFSET) { return "OFFSET"; }
+  if (type == BufferType::TYPE) { return "TYPE"; }
+  if (type == BufferType::VALIDITY) { return "VALIDITY"; }
+  return "UNKNOWN";
+}
+
+// Maps a floating point precision to the string stored under the "precision"
+// key of a "floatingpoint" type; unrecognized values yield "UNKNOWN".
+static std::string GetFloatingPrecisionName(FloatingPointMeta::Precision precision) {
+  std::string result = "UNKNOWN";
+  switch (precision) {
+    case FloatingPointMeta::HALF:
+      result = "HALF";
+      break;
+    case FloatingPointMeta::SINGLE:
+      result = "SINGLE";
+      break;
+    case FloatingPointMeta::DOUBLE:
+      result = "DOUBLE";
+      break;
+    default:
+      break;
+  }
+  return result;
+}
+
+// Maps a TimeUnit to the string stored under the "unit" key of time-like
+// types; unrecognized values yield "UNKNOWN".
+static std::string GetTimeUnitName(TimeUnit unit) {
+  if (unit == TimeUnit::SECOND) { return "SECOND"; }
+  if (unit == TimeUnit::MILLI) { return "MILLISECOND"; }
+  if (unit == TimeUnit::MICRO) { return "MICROSECOND"; }
+  if (unit == TimeUnit::NANO) { return "NANOSECOND"; }
+  return "UNKNOWN";
+}
+
+// Pairs a buffer kind with a bit width; one instance corresponds to one entry
+// of a field's "typeLayout" array.
+class BufferLayout {
+ public:
+  BufferLayout(BufferType type, int bit_width) {
+    type_ = type;
+    bit_width_ = bit_width;
+  }
+
+  // Kind of buffer (written as "type").
+  BufferType type() const { return type_; }
+
+  // Bit width (written as "typeBitWidth").
+  int bit_width() const { return bit_width_; }
+
+ private:
+  BufferType type_;
+  int bit_width_;
+};
+
+// Shared layout descriptors used by JsonSchemaWriter when emitting
+// "typeLayout". The second constructor argument is the bit width.
+static const BufferLayout kValidityBuffer(BufferType::VALIDITY, 1);
+static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32);
+static const BufferLayout kTypeBuffer(BufferType::TYPE, 32);
+static const BufferLayout kBooleanBuffer(BufferType::DATA, 1);
+// DATA buffers for fixed-width values of 64, 32, 16 and 8 bits.
+static const BufferLayout kValues64(BufferType::DATA, 64);
+static const BufferLayout kValues32(BufferType::DATA, 32);
+static const BufferLayout kValues16(BufferType::DATA, 16);
+static const BufferLayout kValues8(BufferType::DATA, 8);
+
+// Serializes a Schema to JSON through a rapidjson writer. The output has the
+// shape
+//
+//   {"fields": [{"name": ..., "nullable": ..., "type": {"name": ..., ...},
+//                "children": [...], "typeLayout": [...]}, ...]}
+//
+// VisitField opens the object for one field; the Visit overrides then add the
+// "type", "children" and "typeLayout" members for that field's data type.
+class JsonSchemaWriter : public TypeVisitor {
+ public:
+  // `schema` is held by reference and `writer` by pointer; both must outlive
+  // this object.
+  explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer)
+      : schema_(schema), writer_(writer) {}
+
+  // Writes the whole schema object. Returns the first non-OK status produced
+  // while visiting a field.
+  Status Write() {
+    writer_->StartObject();
+    writer_->Key("fields");
+    writer_->StartArray();
+    for (const std::shared_ptr<Field>& field : schema_.fields()) {
+      RETURN_NOT_OK(VisitField(*field));
+    }
+    writer_->EndArray();
+    writer_->EndObject();
+    return Status::OK();
+  }
+
+  // Writes one field object: name, nullability, then whatever the type's
+  // Visit overload emits.
+  Status VisitField(const Field& field) {
+    writer_->StartObject();
+
+    writer_->Key("name");
+    writer_->String(field.name.c_str());
+
+    writer_->Key("nullable");
+    writer_->Bool(field.nullable);
+
+    // Visit the type
+    RETURN_NOT_OK(field.type->Accept(this));
+    writer_->EndObject();
+
+    return Status::OK();
+  }
+
+  // Writes an empty "children" array.
+  void SetNoChildren() {
+    writer_->Key("children");
+    writer_->StartArray();
+    writer_->EndArray();
+  }
+
+  // WriteTypeMetadata overloads add the type-specific members inside the
+  // "type" object; the overload is selected by the type's base class.
+
+  // Types without extra metadata, booleans and nulls: nothing to add.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value ||
+                              std::is_base_of<BooleanType, T>::value ||
+                              std::is_base_of<NullType, T>::value,
+      void>::type
+  WriteTypeMetadata(const T& type) {}
+
+  // Integers: "bitWidth" and "isSigned".
+  template <typename T>
+  typename std::enable_if<std::is_base_of<IntegerMeta, T>::value, void>::type
+  WriteTypeMetadata(const T& type) {
+    writer_->Key("bitWidth");
+    writer_->Int(type.bit_width());
+    writer_->Key("isSigned");
+    writer_->Bool(type.is_signed());
+  }
+
+  // Floating point: "precision" as HALF / SINGLE / DOUBLE.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<FloatingPointMeta, T>::value, void>::type
+  WriteTypeMetadata(const T& type) {
+    writer_->Key("precision");
+    writer_->String(GetFloatingPrecisionName(type.precision()));
+  }
+
+  // Intervals: "unit" as YEAR_MONTH / DAY_TIME.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<IntervalType, T>::value, void>::type
+  WriteTypeMetadata(const T& type) {
+    writer_->Key("unit");
+    switch (type.unit) {
+      case IntervalType::Unit::YEAR_MONTH:
+        writer_->String("YEAR_MONTH");
+        break;
+      case IntervalType::Unit::DAY_TIME:
+        writer_->String("DAY_TIME");
+        break;
+    }
+  }
+
+  // Time and timestamp: "unit" as returned by GetTimeUnitName.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<TimeType, T>::value ||
+                              std::is_base_of<TimestampType, T>::value,
+      void>::type
+  WriteTypeMetadata(const T& type) {
+    writer_->Key("unit");
+    writer_->String(GetTimeUnitName(type.unit));
+  }
+
+  // Decimals: "precision" and "scale".
+  template <typename T>
+  typename std::enable_if<std::is_base_of<DecimalType, T>::value, void>::type
+  WriteTypeMetadata(const T& type) {
+    writer_->Key("precision");
+    writer_->Int(type.precision);
+    writer_->Key("scale");
+    writer_->Int(type.scale);
+  }
+
+  // Unions: "mode" as SPARSE / DENSE and the "typeIds" array.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<UnionType, T>::value, void>::type
+  WriteTypeMetadata(const T& type) {
+    writer_->Key("mode");
+    switch (type.mode) {
+      case UnionMode::SPARSE:
+        writer_->String("SPARSE");
+        break;
+      case UnionMode::DENSE:
+        writer_->String("DENSE");
+        break;
+    }
+
+    // Write type ids
+    writer_->Key("typeIds");
+    writer_->StartArray();
+    for (size_t i = 0; i < type.type_ids.size(); ++i) {
+      writer_->Uint(type.type_ids[i]);
+    }
+    writer_->EndArray();
+  }
+
+  // TODO(wesm): Other Type metadata
+
+  // Writes the "type" object: {"name": typeclass, <type metadata>}.
+  template <typename T>
+  void WriteName(const std::string& typeclass, const T& type) {
+    writer_->Key("type");
+    writer_->StartObject();
+    writer_->Key("name");
+    writer_->String(typeclass);
+    WriteTypeMetadata(type);
+    writer_->EndObject();
+  }
+
+  // Writes the members shared by all childless types: "type", an empty
+  // "children" array and the given "typeLayout".
+  template <typename T>
+  Status WritePrimitive(const std::string& typeclass, const T& type,
+      const std::vector<BufferLayout>& buffer_layout) {
+    WriteName(typeclass, type);
+    SetNoChildren();
+    WriteBufferLayout(buffer_layout);
+    return Status::OK();
+  }
+
+  // Same as WritePrimitive with a fixed validity / offset / 8-bit data layout.
+  template <typename T>
+  Status WriteVarBytes(const std::string& typeclass, const T& type) {
+    WriteName(typeclass, type);
+    SetNoChildren();
+    WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8});
+    return Status::OK();
+  }
+
+  // Writes "typeLayout" as an array of {"type", "typeBitWidth"} objects.
+  void WriteBufferLayout(const std::vector<BufferLayout>& buffer_layout) {
+    writer_->Key("typeLayout");
+    writer_->StartArray();
+
+    for (const BufferLayout& buffer : buffer_layout) {
+      writer_->StartObject();
+      writer_->Key("type");
+      writer_->String(GetBufferTypeName(buffer.type()));
+
+      writer_->Key("typeBitWidth");
+      writer_->Int(buffer.bit_width());
+
+      writer_->EndObject();
+    }
+    writer_->EndArray();
+  }
+
+  // Writes "children" by recursing into VisitField for each child field.
+  Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) {
+    writer_->Key("children");
+    writer_->StartArray();
+    for (const std::shared_ptr<Field>& field : children) {
+      RETURN_NOT_OK(VisitField(*field));
+    }
+    writer_->EndArray();
+    return Status::OK();
+  }
+
+  Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); }
+
+  Status Visit(const BooleanType& type) override {
+    return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer});
+  }
+
+  Status Visit(const Int8Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues8});
+  }
+
+  Status Visit(const Int16Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues16});
+  }
+
+  Status Visit(const Int32Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues32});
+  }
+
+  Status Visit(const Int64Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues64});
+  }
+
+  Status Visit(const UInt8Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues8});
+  }
+
+  Status Visit(const UInt16Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues16});
+  }
+
+  Status Visit(const UInt32Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues32});
+  }
+
+  Status Visit(const UInt64Type& type) override {
+    return WritePrimitive("int", type, {kValidityBuffer, kValues64});
+  }
+
+  Status Visit(const HalfFloatType& type) override {
+    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16});
+  }
+
+  Status Visit(const FloatType& type) override {
+    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32});
+  }
+
+  Status Visit(const DoubleType& type) override {
+    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64});
+  }
+
+  Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); }
+
+  Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); }
+
+  Status Visit(const DateType& type) override {
+    return WritePrimitive("date", type, {kValidityBuffer, kValues64});
+  }
+
+  Status Visit(const TimeType& type) override {
+    return WritePrimitive("time", type, {kValidityBuffer, kValues64});
+  }
+
+  Status Visit(const TimestampType& type) override {
+    return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64});
+  }
+
+  Status Visit(const IntervalType& type) override {
+    return WritePrimitive("interval", type, {kValidityBuffer, kValues64});
+  }
+
+  Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); }
+
+  Status Visit(const ListType& type) override {
+    WriteName("list", type);
+    RETURN_NOT_OK(WriteChildren(type.children()));
+    WriteBufferLayout({kValidityBuffer, kOffsetBuffer});
+    return Status::OK();
+  }
+
+  Status Visit(const StructType& type) override {
+    WriteName("struct", type);
+    // Propagate failures from nested fields (e.g. an unimplemented child type)
+    // instead of reporting success for a partially written schema.
+    RETURN_NOT_OK(WriteChildren(type.children()));
+    // NOTE(review): a TYPE buffer is declared for structs here; confirm that
+    // this is the intended layout and not a copy of the union layout.
+    WriteBufferLayout({kValidityBuffer, kTypeBuffer});
+    return Status::OK();
+  }
+
+  Status Visit(const UnionType& type) override {
+    WriteName("union", type);
+    // Propagate failures from nested fields, as for lists and structs.
+    RETURN_NOT_OK(WriteChildren(type.children()));
+
+    // Dense unions carry an additional offsets buffer.
+    if (type.mode == UnionMode::SPARSE) {
+      WriteBufferLayout({kValidityBuffer, kTypeBuffer});
+    } else {
+      WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer});
+    }
+    return Status::OK();
+  }
+
+ private:
+  const Schema& schema_;
+  RjWriter* writer_;
+};
+
+// Serializes one Array (and, recursively, its children) to JSON through a
+// rapidjson writer. Each array becomes an object with "name" and "count"
+// followed by the members emitted by the matching Visit overload
+// ("VALIDITY", "OFFSETS", "DATA", "children").
+class JsonArrayWriter : public ArrayVisitor {
+ public:
+  // `array` is held by reference and `writer` by pointer; both must outlive
+  // this object. `name` is copied.
+  explicit JsonArrayWriter(const std::string& name, const Array& array, RjWriter* writer)
+      : name_(name), array_(array), writer_(writer) {}
+
+  // Writes the array passed to the constructor.
+  Status Write() { return VisitArray(name_, array_); }
+
+  // Writes one array object; the type-specific members come from arr.Accept.
+  Status VisitArray(const std::string& name, const Array& arr) {
+    writer_->StartObject();
+    writer_->Key("name");
+    writer_->String(name);
+
+    writer_->Key("count");
+    writer_->Int(arr.length());
+
+    RETURN_NOT_OK(arr.Accept(this));
+
+    writer_->EndObject();
+    return Status::OK();
+  }
+
+  // WriteDataValues overloads emit every slot of the array (null slots
+  // included) as a JSON value; the overload is selected by array type.
+
+  // Signed integers are written with Int64.
+  template <typename T>
+  typename std::enable_if<IsSignedInt<T>::value, void>::type WriteDataValues(
+      const T& arr) {
+    const auto data = arr.raw_data();
+    for (int i = 0; i < arr.length(); ++i) {
+      writer_->Int64(data[i]);
+    }
+  }
+
+  // Unsigned integers are written with Uint64.
+  template <typename T>
+  typename std::enable_if<IsUnsignedInt<T>::value, void>::type WriteDataValues(
+      const T& arr) {
+    const auto data = arr.raw_data();
+    for (int i = 0; i < arr.length(); ++i) {
+      writer_->Uint64(data[i]);
+    }
+  }
+
+  // Floating point values are written with Double.
+  template <typename T>
+  typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues(
+      const T& arr) {
+    const auto data = arr.raw_data();
+    for (int i = 0; i < arr.length(); ++i) {
+      writer_->Double(data[i]);
+    }
+  }
+
+  // String (Utf8), Binary
+  template <typename T>
+  typename std::enable_if<std::is_base_of<BinaryArray, T>::value, void>::type
+  WriteDataValues(const T& arr) {
+    for (int i = 0; i < arr.length(); ++i) {
+      int32_t length;
+      const char* buf = reinterpret_cast<const char*>(arr.GetValue(i, &length));
+      writer_->String(buf, length);
+    }
+  }
+
+  // Booleans are written as JSON true / false.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<BooleanArray, T>::value, void>::type
+  WriteDataValues(const T& arr) {
+    for (int i = 0; i < arr.length(); ++i) {
+      writer_->Bool(arr.Value(i));
+    }
+  }
+
+  // Writes "DATA": [ ...values... ].
+  template <typename T>
+  void WriteDataField(const T& arr) {
+    writer_->Key("DATA");
+    writer_->StartArray();
+    WriteDataValues(arr);
+    writer_->EndArray();
+  }
+
+  // Writes "OFFSETS" with `length` entries read from `offsets`.
+  // NOTE(review): the key is "OFFSETS" while the schema's typeLayout names the
+  // buffer "OFFSET"; the reader in this file looks up "OFFSETS", so any rename
+  // has to be made on both sides together.
+  template <typename T>
+  void WriteOffsetsField(const T* offsets, int32_t length) {
+    writer_->Key("OFFSETS");
+    writer_->StartArray();
+    for (int i = 0; i < length; ++i) {
+      writer_->Int64(offsets[i]);
+    }
+    writer_->EndArray();
+  }
+
+  // Writes "VALIDITY" with one entry per slot: 1 for valid, 0 for null. When
+  // null_count() is zero every entry is 1 and IsNull is not consulted.
+  void WriteValidityField(const Array& arr) {
+    writer_->Key("VALIDITY");
+    writer_->StartArray();
+    if (arr.null_count() > 0) {
+      for (int i = 0; i < arr.length(); ++i) {
+        writer_->Int(arr.IsNull(i) ? 0 : 1);
+      }
+    } else {
+      for (int i = 0; i < arr.length(); ++i) {
+        writer_->Int(1);
+      }
+    }
+    writer_->EndArray();
+  }
+
+  // Writes an empty "children" array.
+  void SetNoChildren() {
+    writer_->Key("children");
+    writer_->StartArray();
+    writer_->EndArray();
+  }
+
+  // Fixed-width arrays: VALIDITY, DATA, empty children.
+  template <typename T>
+  Status WritePrimitive(const T& array) {
+    WriteValidityField(array);
+    WriteDataField(array);
+    SetNoChildren();
+    return Status::OK();
+  }
+
+  // Variable-length arrays: VALIDITY, OFFSETS (length + 1 entries), DATA,
+  // empty children.
+  template <typename T>
+  Status WriteVarBytes(const T& array) {
+    WriteValidityField(array);
+    WriteOffsetsField(array.raw_offsets(), array.length() + 1);
+    WriteDataField(array);
+    SetNoChildren();
+    return Status::OK();
+  }
+
+  // Writes "children", pairing fields[i] (for the name) with arrays[i]. The
+  // caller must pass at least as many arrays as fields.
+  Status WriteChildren(const std::vector<std::shared_ptr<Field>>& fields,
+      const std::vector<std::shared_ptr<Array>>& arrays) {
+    writer_->Key("children");
+    writer_->StartArray();
+    for (size_t i = 0; i < fields.size(); ++i) {
+      RETURN_NOT_OK(VisitArray(fields[i]->name, *arrays[i]));
+    }
+    writer_->EndArray();
+    return Status::OK();
+  }
+
+  // Null arrays carry only an empty "children" array (no VALIDITY / DATA).
+  Status Visit(const NullArray& array) override {
+    SetNoChildren();
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanArray& array) override { return WritePrimitive(array); }
+
+  Status Visit(const Int8Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const Int16Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const Int32Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const Int64Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const UInt8Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const UInt16Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const UInt32Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const UInt64Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const HalfFloatArray& array) override { return WritePrimitive(array); }
+
+  Status Visit(const FloatArray& array) override { return WritePrimitive(array); }
+
+  Status Visit(const DoubleArray& array) override { return WritePrimitive(array); }
+
+  Status Visit(const StringArray& array) override { return WriteVarBytes(array); }
+
+  Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); }
+
+  Status Visit(const DateArray& array) override { return Status::NotImplemented("date"); }
+
+  Status Visit(const TimeArray& array) override { return Status::NotImplemented("time"); }
+
+  Status Visit(const TimestampArray& array) override {
+    return Status::NotImplemented("timestamp");
+  }
+
+  Status Visit(const IntervalArray& array) override {
+    return Status::NotImplemented("interval");
+  }
+
+  Status Visit(const DecimalArray& array) override {
+    return Status::NotImplemented("decimal");
+  }
+
+  // Lists: VALIDITY, OFFSETS, then the values array as the single child.
+  Status Visit(const ListArray& array) override {
+    WriteValidityField(array);
+    WriteOffsetsField(array.raw_offsets(), array.length() + 1);
+    auto type = static_cast<const ListType*>(array.type().get());
+    return WriteChildren(type->children(), {array.values()});
+  }
+
+  // Structs: VALIDITY, then one child per field.
+  Status Visit(const StructArray& array) override {
+    WriteValidityField(array);
+    auto type = static_cast<const StructType*>(array.type().get());
+    return WriteChildren(type->children(), array.fields());
+  }
+
+  Status Visit(const UnionArray& array) override {
+    return Status::NotImplemented("union");
+  }
+
+ private:
+  // Stored by value: a reference member would dangle if the writer were
+  // constructed from a temporary string.
+  const std::string name_;
+  const Array& array_;
+  RjWriter* writer_;
+};
+
+// Reconstructs a Schema from the JSON produced by JsonSchemaWriter. Malformed
+// input is reported through Status::Invalid.
+class JsonSchemaReader {
+ public:
+  // `json_schema` is held by reference and must outlive this object.
+  explicit JsonSchemaReader(const rj::Value& json_schema) : json_schema_(json_schema) {}
+
+  // Parses {"fields": [...]} into `*schema`.
+  Status GetSchema(std::shared_ptr<Schema>* schema) {
+    // GetObject() on a non-object value is not a recoverable error in
+    // rapidjson, so reject it up front.
+    if (!json_schema_.IsObject()) { return Status::Invalid("Schema was not a JSON object"); }
+    const auto& obj_schema = json_schema_.GetObject();
+
+    const auto& json_fields = obj_schema.FindMember("fields");
+    RETURN_NOT_ARRAY("fields", json_fields, obj_schema);
+
+    std::vector<std::shared_ptr<Field>> fields;
+    RETURN_NOT_OK(GetFieldsFromArray(json_fields->value, &fields));
+
+    *schema = std::make_shared<Schema>(fields);
+    return Status::OK();
+  }
+
+  // Parses every element of the JSON array `obj` into `*fields`. The caller
+  // must already have checked that `obj` is an array.
+  Status GetFieldsFromArray(
+      const rj::Value& obj, std::vector<std::shared_ptr<Field>>* fields) {
+    const auto& values = obj.GetArray();
+
+    fields->resize(values.Size());
+    for (size_t i = 0; i < fields->size(); ++i) {
+      RETURN_NOT_OK(GetField(values[i], &(*fields)[i]));
+    }
+    return Status::OK();
+  }
+
+  // Parses one field object; requires "name", "nullable", "type" and
+  // "children" members. Children are parsed first because nested types are
+  // constructed from them.
+  Status GetField(const rj::Value& obj, std::shared_ptr<Field>* field) {
+    if (!obj.IsObject()) { return Status::Invalid("Field was not a JSON object"); }
+    const auto& json_field = obj.GetObject();
+
+    const auto& json_name = json_field.FindMember("name");
+    RETURN_NOT_STRING("name", json_name, json_field);
+
+    const auto& json_nullable = json_field.FindMember("nullable");
+    RETURN_NOT_BOOL("nullable", json_nullable, json_field);
+
+    const auto& json_type = json_field.FindMember("type");
+    RETURN_NOT_OBJECT("type", json_type, json_field);
+
+    const auto& json_children = json_field.FindMember("children");
+    RETURN_NOT_ARRAY("children", json_children, json_field);
+
+    std::vector<std::shared_ptr<Field>> children;
+    RETURN_NOT_OK(GetFieldsFromArray(json_children->value, &children));
+
+    std::shared_ptr<DataType> type;
+    RETURN_NOT_OK(GetType(json_type->value.GetObject(), children, &type));
+
+    *field = std::make_shared<Field>(
+        json_name->value.GetString(), type, json_nullable->value.GetBool());
+    return Status::OK();
+  }
+
+  // Builds an integer type from "bitWidth" (8, 16, 32 or 64) and "isSigned".
+  Status GetInteger(
+      const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
+    const auto& json_bit_width = json_type.FindMember("bitWidth");
+    RETURN_NOT_INT("bitWidth", json_bit_width, json_type);
+
+    const auto& json_is_signed = json_type.FindMember("isSigned");
+    RETURN_NOT_BOOL("isSigned", json_is_signed, json_type);
+
+    bool is_signed = json_is_signed->value.GetBool();
+    int bit_width = json_bit_width->value.GetInt();
+
+    switch (bit_width) {
+      case 8:
+        *type = is_signed ? int8() : uint8();
+        break;
+      case 16:
+        *type = is_signed ? int16() : uint16();
+        break;
+      case 32:
+        *type = is_signed ? int32() : uint32();
+        break;
+      case 64:
+        *type = is_signed ? int64() : uint64();
+        break;
+      default:
+        std::stringstream ss;
+        ss << "Invalid bit width: " << bit_width;
+        return Status::Invalid(ss.str());
+    }
+    return Status::OK();
+  }
+
+  // Builds a floating point type from "precision" (HALF, SINGLE or DOUBLE).
+  Status GetFloatingPoint(const RjObject& json_type, std::shared_ptr<DataType>* type) {
+    const auto& json_precision = json_type.FindMember("precision");
+    RETURN_NOT_STRING("precision", json_precision, json_type);
+
+    std::string precision = json_precision->value.GetString();
+
+    if (precision == "DOUBLE") {
+      *type = float64();
+    } else if (precision == "SINGLE") {
+      *type = float32();
+    } else if (precision == "HALF") {
+      *type = float16();
+    } else {
+      std::stringstream ss;
+      ss << "Invalid precision: " << precision;
+      return Status::Invalid(ss.str());
+    }
+    return Status::OK();
+  }
+
+  // Builds a T (constructed from a TimeUnit) from the "unit" member.
+  template <typename T>
+  Status GetTimeLike(const RjObject& json_type, std::shared_ptr<DataType>* type) {
+    const auto& json_unit = json_type.FindMember("unit");
+    RETURN_NOT_STRING("unit", json_unit, json_type);
+
+    std::string unit_str = json_unit->value.GetString();
+
+    TimeUnit unit;
+
+    if (unit_str == "SECOND") {
+      unit = TimeUnit::SECOND;
+    } else if (unit_str == "MILLISECOND") {
+      unit = TimeUnit::MILLI;
+    } else if (unit_str == "MICROSECOND") {
+      unit = TimeUnit::MICRO;
+    } else if (unit_str == "NANOSECOND") {
+      unit = TimeUnit::NANO;
+    } else {
+      std::stringstream ss;
+      ss << "Invalid time unit: " << unit_str;
+      return Status::Invalid(ss.str());
+    }
+
+    *type = std::make_shared<T>(unit);
+
+    return Status::OK();
+  }
+
+  // Builds a union type from "mode" (SPARSE or DENSE), "typeIds" and the
+  // already-parsed child fields.
+  Status GetUnion(const RjObject& json_type,
+      const std::vector<std::shared_ptr<Field>>& children,
+      std::shared_ptr<DataType>* type) {
+    const auto& json_mode = json_type.FindMember("mode");
+    RETURN_NOT_STRING("mode", json_mode, json_type);
+
+    std::string mode_str = json_mode->value.GetString();
+    UnionMode mode;
+
+    if (mode_str == "SPARSE") {
+      mode = UnionMode::SPARSE;
+    } else if (mode_str == "DENSE") {
+      mode = UnionMode::DENSE;
+    } else {
+      std::stringstream ss;
+      ss << "Invalid union mode: " << mode_str;
+      return Status::Invalid(ss.str());
+    }
+
+    const auto& json_type_ids = json_type.FindMember("typeIds");
+    RETURN_NOT_ARRAY("typeIds", json_type_ids, json_type);
+
+    std::vector<uint8_t> type_ids;
+    const auto& id_array = json_type_ids->value.GetArray();
+    for (const rj::Value& val : id_array) {
+      // Ids are stored as uint8_t; reject anything that would not fit rather
+      // than silently truncating it.
+      if (!val.IsUint() || val.GetUint() > UINT8_MAX) {
+        return Status::Invalid("Union type id was not an unsigned 8-bit integer");
+      }
+      type_ids.push_back(static_cast<uint8_t>(val.GetUint()));
+    }
+
+    *type = union_(children, type_ids, mode);
+
+    return Status::OK();
+  }
+
+  // Dispatches on the type object's "name" member. Names that are not handled
+  // here are reported as Status::Invalid.
+  Status GetType(const RjObject& json_type,
+      const std::vector<std::shared_ptr<Field>>& children,
+      std::shared_ptr<DataType>* type) {
+    const auto& json_type_name = json_type.FindMember("name");
+    RETURN_NOT_STRING("name", json_type_name, json_type);
+
+    std::string type_name = json_type_name->value.GetString();
+
+    if (type_name == "int") {
+      return GetInteger(json_type, type);
+    } else if (type_name == "floatingpoint") {
+      return GetFloatingPoint(json_type, type);
+    } else if (type_name == "bool") {
+      *type = boolean();
+    } else if (type_name == "utf8") {
+      *type = utf8();
+    } else if (type_name == "binary") {
+      *type = binary();
+    } else if (type_name == "null") {
+      *type = null();
+    } else if (type_name == "date") {
+      *type = date();
+    } else if (type_name == "time") {
+      return GetTimeLike<TimeType>(json_type, type);
+    } else if (type_name == "timestamp") {
+      return GetTimeLike<TimestampType>(json_type, type);
+    } else if (type_name == "list") {
+      // A list is built from its single value field; indexing an empty
+      // children vector would be undefined behavior.
+      if (children.size() != 1) {
+        std::stringstream ss;
+        ss << "List must have exactly one child, but got " << children.size();
+        return Status::Invalid(ss.str());
+      }
+      *type = list(children[0]);
+    } else if (type_name == "struct") {
+      *type = struct_(children);
+    } else if (type_name == "union") {
+      return GetUnion(json_type, children, type);
+    } else {
+      std::stringstream ss;
+      ss << "Unrecognized type name: " << type_name;
+      return Status::Invalid(ss.str());
+    }
+    return Status::OK();
+  }
+
+ private:
+  const rj::Value& json_schema_;
+};
+
+// Reconstructs Arrays from the JSON produced by JsonArrayWriter. GetArray is
+// the entry point; it reads "count" and "VALIDITY" and dispatches to the
+// ReadArray overload for the requested type. Malformed input is reported
+// through Status::Invalid.
+class JsonArrayReader {
+ public:
+  // Memory for the arrays is taken from `pool`.
+  explicit JsonArrayReader(MemoryPool* pool) : pool_(pool) {}
+
+  // Packs `is_valid` into a bitmap (bit set == valid) and counts the nulls.
+  Status GetValidityBuffer(const std::vector<bool>& is_valid, int32_t* null_count,
+      std::shared_ptr<Buffer>* validity_buffer) {
+    int length = static_cast<int>(is_valid.size());
+
+    std::shared_ptr<MutableBuffer> out_buffer;
+    RETURN_NOT_OK(GetEmptyBitmap(pool_, length, &out_buffer));
+    uint8_t* bitmap = out_buffer->mutable_data();
+
+    *null_count = 0;
+    for (int i = 0; i < length; ++i) {
+      if (!is_valid[i]) {
+        ++(*null_count);
+        continue;
+      }
+      BitUtil::SetBit(bitmap, i);
+    }
+
+    *validity_buffer = out_buffer;
+    return Status::OK();
+  }
+
+  // Fixed-width and boolean arrays: reads "DATA" through the type's builder.
+  // Values in null slots are ignored. `is_valid` must have `length` entries.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value ||
+                              std::is_base_of<BooleanType, T>::value,
+      Status>::type
+  ReadArray(const RjObject& json_array, int32_t length,
+      const std::vector<bool>& is_valid, const std::shared_ptr<DataType>& type,
+      std::shared_ptr<Array>* array) {
+    typename TypeTraits<T>::BuilderType builder(pool_, type);
+
+    const auto& json_data = json_array.FindMember("DATA");
+    RETURN_NOT_ARRAY("DATA", json_data, json_array);
+
+    const auto& json_data_arr = json_data->value.GetArray();
+    RETURN_NOT_OK(CheckSize("DATA", json_data_arr.Size(), length));
+
+    for (int i = 0; i < length; ++i) {
+      if (!is_valid[i]) {
+        RETURN_NOT_OK(builder.AppendNull());
+        continue;
+      }
+
+      const rj::Value& val = json_data_arr[i];
+      if (IsSignedInt<T>::value) {
+        // IsInt64, not IsInt: 64-bit columns hold values outside 32 bits.
+        DCHECK(val.IsInt64());
+        RETURN_NOT_OK(builder.Append(val.GetInt64()));
+      } else if (IsUnsignedInt<T>::value) {
+        DCHECK(val.IsUint64());
+        RETURN_NOT_OK(builder.Append(val.GetUint64()));
+      } else if (IsFloatingPoint<T>::value) {
+        // Read as double: GetFloat() would round float64 columns to single
+        // precision. Narrower builders convert on Append.
+        DCHECK(val.IsDouble());
+        RETURN_NOT_OK(builder.Append(val.GetDouble()));
+      } else if (std::is_base_of<BooleanType, T>::value) {
+        DCHECK(val.IsBool());
+        RETURN_NOT_OK(builder.Append(val.GetBool()));
+      } else {
+        // We are in the wrong function
+        return Status::Invalid(type->ToString());
+      }
+    }
+
+    return builder.Finish(array);
+  }
+
+  // String and binary arrays: reads "DATA" as JSON strings through the type's
+  // builder. "OFFSETS" is not consulted here.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type ReadArray(
+      const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
+      const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
+    typename TypeTraits<T>::BuilderType builder(pool_, type);
+
+    const auto& json_data = json_array.FindMember("DATA");
+    RETURN_NOT_ARRAY("DATA", json_data, json_array);
+
+    const auto& json_data_arr = json_data->value.GetArray();
+    RETURN_NOT_OK(CheckSize("DATA", json_data_arr.Size(), length));
+
+    for (int i = 0; i < length; ++i) {
+      if (!is_valid[i]) {
+        RETURN_NOT_OK(builder.AppendNull());
+        continue;
+      }
+
+      const rj::Value& val = json_data_arr[i];
+      DCHECK(val.IsString());
+      RETURN_NOT_OK(builder.Append(val.GetString()));
+    }
+
+    return builder.Finish(array);
+  }
+
+  // List arrays: reads "OFFSETS" (length + 1 entries) into a new buffer and
+  // the single child array from "children".
+  template <typename T>
+  typename std::enable_if<std::is_base_of<ListType, T>::value, Status>::type ReadArray(
+      const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
+      const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
+    const auto& json_offsets = json_array.FindMember("OFFSETS");
+    RETURN_NOT_ARRAY("OFFSETS", json_offsets, json_array);
+    const auto& json_offsets_arr = json_offsets->value.GetArray();
+    RETURN_NOT_OK(
+        CheckSize("OFFSETS", json_offsets_arr.Size(), static_cast<int64_t>(length) + 1));
+
+    int32_t null_count = 0;
+    std::shared_ptr<Buffer> validity_buffer;
+    RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer));
+
+    auto offsets_buffer = std::make_shared<PoolBuffer>(pool_);
+    RETURN_NOT_OK(offsets_buffer->Resize((length + 1) * sizeof(int32_t)));
+    int32_t* offsets = reinterpret_cast<int32_t*>(offsets_buffer->mutable_data());
+
+    for (int i = 0; i < length + 1; ++i) {
+      const rj::Value& val = json_offsets_arr[i];
+      if (!val.IsInt()) { return Status::Invalid("OFFSETS entry was not an integer"); }
+      offsets[i] = val.GetInt();
+    }
+
+    std::vector<std::shared_ptr<Array>> children;
+    RETURN_NOT_OK(GetChildren(json_array, type, &children));
+    DCHECK_EQ(children.size(), 1);
+
+    *array = std::make_shared<ListArray>(
+        type, length, offsets_buffer, children[0], null_count, validity_buffer);
+
+    return Status::OK();
+  }
+
+  // Struct arrays: a validity bitmap plus one child array per field.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<StructType, T>::value, Status>::type ReadArray(
+      const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
+      const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
+    int32_t null_count = 0;
+    std::shared_ptr<Buffer> validity_buffer;
+    RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer));
+
+    std::vector<std::shared_ptr<Array>> fields;
+    RETURN_NOT_OK(GetChildren(json_array, type, &fields));
+
+    *array =
+        std::make_shared<StructArray>(type, length, fields, null_count, validity_buffer);
+
+    return Status::OK();
+  }
+
+  // Null arrays: only the length is used.
+  template <typename T>
+  typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type ReadArray(
+      const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
+      const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
+    *array = std::make_shared<NullArray>(type, length);
+    return Status::OK();
+  }
+
+  // Reads "children" and appends one array per child field of `type` to
+  // `*array`. The number of JSON children must equal type->num_children().
+  Status GetChildren(const RjObject& json_array, const std::shared_ptr<DataType>& type,
+      std::vector<std::shared_ptr<Array>>* array) {
+    const auto& json_children = json_array.FindMember("children");
+    RETURN_NOT_ARRAY("children", json_children, json_array);
+    const auto& json_children_arr = json_children->value.GetArray();
+
+    if (type->num_children() != static_cast<int>(json_children_arr.Size())) {
+      std::stringstream ss;
+      ss << "Expected " << type->num_children() << " children, but got "
+         << json_children_arr.Size();
+      return Status::Invalid(ss.str());
+    }
+
+    for (int i = 0; i < static_cast<int>(json_children_arr.Size()); ++i) {
+      const rj::Value& json_child = json_children_arr[i];
+      // FindMember below requires an object.
+      if (!json_child.IsObject()) {
+        return Status::Invalid("Array element was not a JSON object");
+      }
+
+      std::shared_ptr<Field> child_field = type->child(i);
+
+      auto it = json_child.FindMember("name");
+      RETURN_NOT_STRING("name", it, json_child);
+
+      DCHECK_EQ(it->value.GetString(), child_field->name);
+      std::shared_ptr<Array> child;
+      RETURN_NOT_OK(GetArray(json_child, child_field->type, &child));
+      array->emplace_back(child);
+    }
+
+    return Status::OK();
+  }
+
+  // Reads one array object of the given type. Requires "count" and a
+  // "VALIDITY" array with exactly "count" integer entries (non-zero == valid).
+  // NOTE(review): the writer in this file emits no "VALIDITY" member for null
+  // arrays, so they are rejected here; confirm which side should change.
+  Status GetArray(const rj::Value& obj, const std::shared_ptr<DataType>& type,
+      std::shared_ptr<Array>* array) {
+    if (!obj.IsObject()) {
+      return Status::Invalid("Array element was not a JSON object");
+    }
+    const auto& json_array = obj.GetObject();
+
+    const auto& json_length = json_array.FindMember("count");
+    RETURN_NOT_INT("count", json_length, json_array);
+    int32_t length = json_length->value.GetInt();
+    if (length < 0) {
+      std::stringstream ss;
+      ss << "Invalid count: " << length;
+      return Status::Invalid(ss.str());
+    }
+
+    const auto& json_valid_iter = json_array.FindMember("VALIDITY");
+    RETURN_NOT_ARRAY("VALIDITY", json_valid_iter, json_array);
+
+    const auto& json_validity = json_valid_iter->value.GetArray();
+
+    // The ReadArray overloads index is_valid up to `length`, so a mismatch
+    // must be rejected here.
+    RETURN_NOT_OK(CheckSize("VALIDITY", json_validity.Size(), length));
+
+    std::vector<bool> is_valid;
+    is_valid.reserve(length);
+    for (const rj::Value& val : json_validity) {
+      if (!val.IsInt()) { return Status::Invalid("VALIDITY entry was not an integer"); }
+      is_valid.push_back(static_cast<bool>(val.GetInt()));
+    }
+
+#define TYPE_CASE(TYPE) \
+  case TYPE::type_id:   \
+    return ReadArray<TYPE>(json_array, length, is_valid, type, array);
+
+#define NOT_IMPLEMENTED_CASE(TYPE_ENUM)      \
+  case Type::TYPE_ENUM: {                    \
+    std::stringstream ss;                    \
+    ss << type->ToString();                  \
+    return Status::NotImplemented(ss.str()); \
+  }
+
+    switch (type->type) {
+      TYPE_CASE(NullType);
+      TYPE_CASE(BooleanType);
+      TYPE_CASE(UInt8Type);
+      TYPE_CASE(Int8Type);
+      TYPE_CASE(UInt16Type);
+      TYPE_CASE(Int16Type);
+      TYPE_CASE(UInt32Type);
+      TYPE_CASE(Int32Type);
+      TYPE_CASE(UInt64Type);
+      TYPE_CASE(Int64Type);
+      TYPE_CASE(HalfFloatType);
+      TYPE_CASE(FloatType);
+      TYPE_CASE(DoubleType);
+      TYPE_CASE(StringType);
+      TYPE_CASE(BinaryType);
+      NOT_IMPLEMENTED_CASE(DATE);
+      NOT_IMPLEMENTED_CASE(TIMESTAMP);
+      NOT_IMPLEMENTED_CASE(TIME);
+      NOT_IMPLEMENTED_CASE(INTERVAL);
+      TYPE_CASE(ListType);
+      TYPE_CASE(StructType);
+      NOT_IMPLEMENTED_CASE(UNION);
+      default:
+        std::stringstream ss;
+        ss << type->ToString();
+        return Status::NotImplemented(ss.str());
+    }
+
+#undef TYPE_CASE
+#undef NOT_IMPLEMENTED_CASE
+
+    return Status::OK();
+  }
+
+ private:
+  // Returns Status::Invalid unless the JSON array stored under `key` has
+  // exactly `expected` entries.
+  static Status CheckSize(const char* key, int64_t actual, int64_t expected) {
+    if (actual != expected) {
+      std::stringstream ss;
+      ss << "Expected " << expected << " " << key << " values, but got " << actual;
+      return Status::Invalid(ss.str());
+    }
+    return Status::OK();
+  }
+
+  MemoryPool* pool_;
+};
+
+// Writes `schema` as a JSON object to `json_writer`.
+Status WriteJsonSchema(const Schema& schema, RjWriter* json_writer) {
+  return JsonSchemaWriter(schema, json_writer).Write();
+}
+
+// Parses the JSON value `json_schema` into `*schema`.
+Status ReadJsonSchema(const rj::Value& json_schema, std::shared_ptr<Schema>* schema) {
+  return JsonSchemaReader(json_schema).GetSchema(schema);
+}
+
+// Writes `array` to `json_writer` as a JSON object labelled with `name`.
+Status WriteJsonArray(
+    const std::string& name, const Array& array, RjWriter* json_writer) {
+  return JsonArrayWriter(name, array, json_writer).Write();
+}
+
+// Parses the JSON value `json_array` as an array of the given `type`,
+// allocating from `pool`.
+Status ReadJsonArray(MemoryPool* pool, const rj::Value& json_array,
+    const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
+  return JsonArrayReader(pool).GetArray(json_array, type, array);
+}
+
+// Parses the JSON value `json_array`, taking its type from the first field of
+// `schema` whose name equals the object's "name" member. Returns KeyError
+// when no field has that name.
+Status ReadJsonArray(MemoryPool* pool, const rj::Value& json_array,
+    const Schema& schema, std::shared_ptr<Array>* array) {
+  if (!json_array.IsObject()) { return Status::Invalid("Element was not a JSON object"); }
+
+  const auto& json_obj = json_array.GetObject();
+
+  const auto& name_member = json_obj.FindMember("name");
+  RETURN_NOT_STRING("name", name_member, json_obj);
+
+  const std::string wanted = name_member->value.GetString();
+
+  // Linear scan; the first match wins.
+  const auto& candidates = schema.fields();
+  for (size_t i = 0; i < candidates.size(); ++i) {
+    if (candidates[i]->name == wanted) {
+      return ReadJsonArray(pool, json_array, candidates[i]->type, array);
+    }
+  }
+
+  std::stringstream ss;
+  ss << "Field named " << wanted << " not found in schema";
+  return Status::KeyError(ss.str());
+}
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.h 
b/cpp/src/arrow/ipc/json-internal.h
new file mode 100644
index 0000000..0c167a4
--- /dev/null
+++ b/cpp/src/arrow/ipc/json-internal.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IPC_JSON_INTERNAL_H
+#define ARROW_IPC_JSON_INTERNAL_H
+
+#define RAPIDJSON_HAS_STDSTRING 1
+#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1
+#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1
+
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include "arrow/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace rj = rapidjson;
+using RjWriter = rj::Writer<rj::StringBuffer>;
+
+// Validation helpers for rapidjson member lookups. In the call sites visible
+// in this patch, NAME is the iterator returned by PARENT.FindMember(TOK).
+//
+// Each macro makes the *enclosing function* return Status::Invalid when the
+// check fails, so they may only be used inside functions returning Status.
+// They expand to bare `if` statements (not do/while), so do not use them as
+// the unbraced body of another `if`/`else`.
+
+// Fail if the member TOK was not found in PARENT.
+#define RETURN_NOT_FOUND(TOK, NAME, PARENT)  \
+  if ((NAME) == (PARENT).MemberEnd()) {      \
+    std::stringstream ss;                    \
+    ss << "field " << TOK << " not found";   \
+    return Status::Invalid(ss.str());        \
+  }
+
+// Fail if the member TOK is missing or is not a JSON string.
+#define RETURN_NOT_STRING(TOK, NAME, PARENT)      \
+  RETURN_NOT_FOUND(TOK, NAME, PARENT);            \
+  if (!(NAME)->value.IsString()) {                \
+    std::stringstream ss;                         \
+    ss << "field " << TOK << " was not a string"  \
+       << " line " << __LINE__;                   \
+    return Status::Invalid(ss.str());             \
+  }
+
+// Fail if the member TOK is missing or is not a JSON boolean.
+#define RETURN_NOT_BOOL(TOK, NAME, PARENT)         \
+  RETURN_NOT_FOUND(TOK, NAME, PARENT);             \
+  if (!(NAME)->value.IsBool()) {                   \
+    std::stringstream ss;                          \
+    ss << "field " << TOK << " was not a boolean"  \
+       << " line " << __LINE__;                    \
+    return Status::Invalid(ss.str());              \
+  }
+
+// Fail if the member TOK is missing or does not satisfy rapidjson's IsInt().
+#define RETURN_NOT_INT(TOK, NAME, PARENT)       \
+  RETURN_NOT_FOUND(TOK, NAME, PARENT);          \
+  if (!(NAME)->value.IsInt()) {                 \
+    std::stringstream ss;                       \
+    ss << "field " << TOK << " was not an int"  \
+       << " line " << __LINE__;                 \
+    return Status::Invalid(ss.str());           \
+  }
+
+// Fail if the member TOK is missing or is not a JSON array.
+#define RETURN_NOT_ARRAY(TOK, NAME, PARENT)       \
+  RETURN_NOT_FOUND(TOK, NAME, PARENT);            \
+  if (!(NAME)->value.IsArray()) {                 \
+    std::stringstream ss;                         \
+    ss << "field " << TOK << " was not an array"  \
+       << " line " << __LINE__;                   \
+    return Status::Invalid(ss.str());             \
+  }
+
+// Fail if the member TOK is missing or is not a JSON object.
+#define RETURN_NOT_OBJECT(TOK, NAME, PARENT)       \
+  RETURN_NOT_FOUND(TOK, NAME, PARENT);             \
+  if (!(NAME)->value.IsObject()) {                 \
+    std::stringstream ss;                          \
+    ss << "field " << TOK << " was not an object"  \
+       << " line " << __LINE__;                    \
+    return Status::Invalid(ss.str());              \
+  }
+
+namespace arrow {
+namespace ipc {
+
+// TODO(wesm): Only exporting these because arrow_ipc does not have a static
+// library at the moment. Better to not export
+
+// Write `schema` as JSON through `json_writer`.
+Status ARROW_EXPORT WriteJsonSchema(const Schema& schema, RjWriter* 
json_writer);
+
+// Write `array` as JSON through `json_writer`, labelled with the column `name`.
+Status ARROW_EXPORT WriteJsonArray(
+    const std::string& name, const Array& array, RjWriter* json_writer);
+
+// Reconstruct a Schema from its JSON representation; stored in `*schema`.
+Status ARROW_EXPORT ReadJsonSchema(
+    const rj::Value& json_obj, std::shared_ptr<Schema>* schema);
+
+// Reconstruct an Array of the given `type` from its JSON representation.
+Status ARROW_EXPORT ReadJsonArray(MemoryPool* pool, const rj::Value& json_obj,
+    const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array);
+
+// As above, but the type is taken from the field in `schema` whose name equals
+// the JSON object's "name" member. Returns KeyError if no such field exists.
+Status ARROW_EXPORT ReadJsonArray(MemoryPool* pool, const rj::Value& json_obj,
+    const Schema& schema, std::shared_ptr<Array>* array);
+
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_JSON_INTERNAL_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
new file mode 100644
index 0000000..2281611
--- /dev/null
+++ b/cpp/src/arrow/ipc/json.cc
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/json.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/ipc/json-internal.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+// ----------------------------------------------------------------------
+// Writer implementation
+
+// Private implementation of JsonWriter. The document is accumulated in an
+// in-memory rapidjson string buffer with this layout:
+//
+//   { "schema": <schema>, "batches": [ {"count": N, "columns": [...]}, ... ] }
+class JsonWriter::JsonWriterImpl {
+ public:
+  // string_buffer_ is declared before writer_, so it is already constructed
+  // when the rapidjson writer is bound to it here.
+  explicit JsonWriterImpl(const std::shared_ptr<Schema>& schema)
+      : schema_(schema), writer_(new RjWriter(string_buffer_)) {}
+
+  // Open the top-level object, write the schema and open the "batches" array.
+  Status Start() {
+    writer_->StartObject();
+
+    writer_->Key("schema");
+    RETURN_NOT_OK(WriteJsonSchema(*schema_, writer_.get()));
+
+    // Record batches
+    writer_->Key("batches");
+    writer_->StartArray();
+    return Status::OK();
+  }
+
+  // Close the "batches" array and the top-level object, then copy the
+  // accumulated text into `result`.
+  Status Finish(std::string* result) {
+    writer_->EndArray();  // Record batches
+    writer_->EndObject();
+
+    *result = string_buffer_.GetString();
+    return Status::OK();
+  }
+
+  // Append one record batch object with its row count and columns.
+  //
+  // Returns Status::Invalid, without writing anything, if the number of
+  // columns differs from the number of schema fields or if any column is null
+  // or does not hold exactly `num_rows` values. These conditions were
+  // previously only DCHECKed, which left release builds indexing out of range
+  // or emitting inconsistent batches.
+  Status WriteRecordBatch(
+      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
+    const int num_fields = static_cast<int>(schema_->num_fields());
+    if (static_cast<int>(columns.size()) != num_fields) {
+      std::stringstream ss;
+      ss << "Record batch has " << columns.size() << " columns but schema has "
+         << num_fields << " fields";
+      return Status::Invalid(ss.str());
+    }
+
+    // Validate everything up front so a failure cannot leave a half-written
+    // batch object in the output.
+    for (int i = 0; i < num_fields; ++i) {
+      if (columns[i] == nullptr || columns[i]->length() != num_rows) {
+        return Status::Invalid("Array length did not match record batch length");
+      }
+    }
+
+    writer_->StartObject();
+    writer_->Key("count");
+    writer_->Int(num_rows);
+
+    writer_->Key("columns");
+    writer_->StartArray();
+
+    for (int i = 0; i < num_fields; ++i) {
+      RETURN_NOT_OK(WriteJsonArray(schema_->field(i)->name, *columns[i], writer_.get()));
+    }
+
+    writer_->EndArray();
+    writer_->EndObject();
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Schema> schema_;
+
+  rj::StringBuffer string_buffer_;
+  std::unique_ptr<RjWriter> writer_;
+};
+
+// JsonWriter forwards every operation to its private JsonWriterImpl.
+JsonWriter::JsonWriter(const std::shared_ptr<Schema>& schema)
+    : impl_(new JsonWriterImpl(schema)) {}
+
+// Defined here, where JsonWriterImpl is a complete type, so that the
+// unique_ptr member can be destroyed.
+JsonWriter::~JsonWriter() {}
+
+// Create a writer for `schema` and immediately emit the document preamble
+// (the schema and the opening of the batch list) via JsonWriterImpl::Start().
+Status JsonWriter::Open(
+    const std::shared_ptr<Schema>& schema, std::unique_ptr<JsonWriter>* writer) {
+  writer->reset(new JsonWriter(schema));
+  JsonWriterImpl* impl = (*writer)->impl_.get();
+  return impl->Start();
+}
+
+// Close the document and hand the JSON text back through `result`.
+Status JsonWriter::Finish(std::string* result) {
+  JsonWriterImpl* impl = impl_.get();
+  return impl->Finish(result);
+}
+
+// Append one record batch made of `columns` with `num_rows` rows.
+Status JsonWriter::WriteRecordBatch(
+    const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
+  JsonWriterImpl* impl = impl_.get();
+  return impl->WriteRecordBatch(columns, num_rows);
+}
+
+// ----------------------------------------------------------------------
+// Reader implementation
+
+// Private implementation of JsonReader. Parses the whole buffer into a
+// rapidjson document and materializes record batches on request.
+class JsonReader::JsonReaderImpl {
+ public:
+  JsonReaderImpl(MemoryPool* pool, const std::shared_ptr<Buffer>& data)
+      : pool_(pool), data_(data), record_batches_(nullptr) {}
+
+  // Parse the buffer, read the "schema" member and remember where the
+  // "batches" array lives.
+  //
+  // Returns Status::IOError if the text is not valid JSON and Status::Invalid
+  // if the document is not an object or lacks a well-formed "schema" object or
+  // "batches" array.
+  Status ParseAndReadSchema() {
+    doc_.Parse(reinterpret_cast<const rj::Document::Ch*>(data_->data()),
+        static_cast<size_t>(data_->size()));
+    if (doc_.HasParseError()) { return Status::IOError("JSON parsing failed"); }
+
+    // FindMember is only meaningful on an object; valid JSON such as "[1]"
+    // must be rejected before looking up members.
+    if (!doc_.IsObject()) { return Status::Invalid("JSON document was not an object"); }
+
+    auto it = doc_.FindMember("schema");
+    RETURN_NOT_OBJECT("schema", it, doc_);
+    RETURN_NOT_OK(ReadJsonSchema(it->value, &schema_));
+
+    it = doc_.FindMember("batches");
+    RETURN_NOT_ARRAY("batches", it, doc_);
+    record_batches_ = &it->value;
+
+    return Status::OK();
+  }
+
+  // Build the i-th record batch from its "count" and "columns" members.
+  //
+  // Returns Status::Invalid if `i` is out of range, if the batch is not a JSON
+  // object, if "count"/"columns" are missing or of the wrong JSON type, or if
+  // the number of JSON columns differs from the number of schema fields.
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+    if (i < 0 || i >= num_record_batches()) {
+      std::stringstream ss;
+      ss << "Record batch index " << i << " out of bounds";
+      return Status::Invalid(ss.str());
+    }
+
+    const auto& batch_val = record_batches_->GetArray()[i];
+    if (!batch_val.IsObject()) { return Status::Invalid("Record batch was not a JSON object"); }
+
+    const auto& batch_obj = batch_val.GetObject();
+
+    auto it = batch_obj.FindMember("count");
+    RETURN_NOT_INT("count", it, batch_obj);
+    int32_t num_rows = static_cast<int32_t>(it->value.GetInt());
+
+    it = batch_obj.FindMember("columns");
+    RETURN_NOT_ARRAY("columns", it, batch_obj);
+    const auto& json_columns = it->value.GetArray();
+
+    // Each JSON column is decoded with the type of the schema field at the
+    // same position, so the counts must agree before indexing the schema.
+    if (static_cast<int>(json_columns.Size()) != static_cast<int>(schema_->num_fields())) {
+      std::stringstream ss;
+      ss << "Record batch has " << json_columns.Size() << " columns but schema has "
+         << schema_->num_fields() << " fields";
+      return Status::Invalid(ss.str());
+    }
+
+    std::vector<std::shared_ptr<Array>> columns(json_columns.Size());
+    for (size_t col = 0; col < columns.size(); ++col) {
+      const std::shared_ptr<DataType>& type = schema_->field(col)->type;
+      RETURN_NOT_OK(ReadJsonArray(pool_, json_columns[col], type, &columns[col]));
+    }
+
+    *batch = std::make_shared<RecordBatch>(schema_, num_rows, columns);
+    return Status::OK();
+  }
+
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+  // Number of entries in the "batches" array; 0 if the document has not been
+  // parsed successfully.
+  int num_record_batches() const {
+    if (record_batches_ == nullptr) { return 0; }
+    return static_cast<int>(record_batches_->GetArray().Size());
+  }
+
+ private:
+  MemoryPool* pool_;
+  std::shared_ptr<Buffer> data_;
+  rj::Document doc_;
+
+  // Points into doc_; set by ParseAndReadSchema().
+  const rj::Value* record_batches_;
+
+  std::shared_ptr<Schema> schema_;
+};
+
+// JsonReader forwards every operation to its private JsonReaderImpl.
+JsonReader::JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data)
+    : impl_(new JsonReaderImpl(pool, data)) {}
+
+// Defined here, where JsonReaderImpl is a complete type, so that the
+// unique_ptr member can be destroyed.
+JsonReader::~JsonReader() {}
+
+// Convenience overload that allocates from the default memory pool.
+Status JsonReader::Open(
+    const std::shared_ptr<Buffer>& data, std::unique_ptr<JsonReader>* reader) {
+  MemoryPool* pool = default_memory_pool();
+  return Open(pool, data, reader);
+}
+
+// Create a reader over `data`, then parse the document and read its schema.
+// `*reader` is assigned even when the returned Status is an error.
+Status JsonReader::Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
+    std::unique_ptr<JsonReader>* reader) {
+  reader->reset(new JsonReader(pool, data));
+  JsonReaderImpl* impl = (*reader)->impl_.get();
+  return impl->ParseAndReadSchema();
+}
+
+std::shared_ptr<Schema> JsonReader::schema() const {
+  const JsonReaderImpl* impl = impl_.get();
+  return impl->schema();
+}
+
+int JsonReader::num_record_batches() const {
+  const JsonReaderImpl* impl = impl_.get();
+  return impl->num_record_batches();
+}
+
+Status JsonReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+  const JsonReaderImpl* impl = impl_.get();
+  return impl->GetRecordBatch(i, batch);
+}
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/json.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h
new file mode 100644
index 0000000..7395be4
--- /dev/null
+++ b/cpp/src/arrow/ipc/json.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Implement Arrow JSON serialization format
+
+#ifndef ARROW_IPC_JSON_H
+#define ARROW_IPC_JSON_H
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace io {
+
+class OutputStream;
+class ReadableFileInterface;
+
+}  // namespace io
+
+namespace ipc {
+
+// Writes a schema followed by a sequence of record batches to an in-memory
+// JSON document. Usage: Open(), WriteRecordBatch() zero or more times, then
+// Finish() to obtain the serialized text.
+class ARROW_EXPORT JsonWriter {
+ public:
+  ~JsonWriter();
+
+  // Create a writer for `schema`. The schema is written immediately, ahead of
+  // any record batches.
+  static Status Open(
+      const std::shared_ptr<Schema>& schema, std::unique_ptr<JsonWriter>* out);
+
+  // TODO(wesm): Write dictionaries
+
+  // Append one record batch. `columns` is expected to hold one array per
+  // schema field, in schema order, each with `num_rows` values.
+  Status WriteRecordBatch(
+      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
+
+  // Close the document and copy the accumulated JSON text into `result`.
+  Status Finish(std::string* result);
+
+ private:
+  // Instances are created through Open().
+  explicit JsonWriter(const std::shared_ptr<Schema>& schema);
+
+  // Hide RapidJSON details from public API
+  class JsonWriterImpl;
+  std::unique_ptr<JsonWriterImpl> impl_;
+};
+
+// Reads a schema and record batches back from a JSON document held in an
+// in-memory buffer.
+// TODO(wesm): Read from a file stream rather than an in-memory buffer
+class ARROW_EXPORT JsonReader {
+ public:
+  ~JsonReader();
+
+  // Create a reader over `data`, parse the JSON and read its schema. Arrays
+  // are later built using `pool`. Returns an error Status if the text cannot
+  // be parsed or lacks the "schema" or "batches" members.
+  static Status Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
+      std::unique_ptr<JsonReader>* reader);
+
+  // Use the default memory pool
+  static Status Open(
+      const std::shared_ptr<Buffer>& data, std::unique_ptr<JsonReader>* 
reader);
+
+  // Schema read from the document when the reader was opened.
+  std::shared_ptr<Schema> schema() const;
+
+  // Number of record batches in the document.
+  int num_record_batches() const;
+
+  // Read a record batch from the file
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
+
+ private:
+  // Instances are created through Open().
+  JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data);
+
+  // Hide RapidJSON details from public API
+  class JsonReaderImpl;
+  std::unique_ptr<JsonReaderImpl> impl_;
+};
+
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_JSON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 784e238..9abc20d 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -27,6 +27,7 @@
 #include "arrow/array.h"
 #include "arrow/table.h"
 #include "arrow/test-util.h"
+#include "arrow/type.h"
 #include "arrow/types/list.h"
 #include "arrow/types/primitive.h"
 #include "arrow/types/string.h"
@@ -39,15 +40,14 @@ namespace arrow {
 namespace ipc {
 
 const auto kInt32 = std::make_shared<Int32Type>();
-const auto kListInt32 = std::make_shared<ListType>(kInt32);
-const auto kListListInt32 = std::make_shared<ListType>(kListInt32);
+const auto kListInt32 = list(kInt32);
+const auto kListListInt32 = list(kListInt32);
 
 Status MakeRandomInt32Array(
     int32_t length, bool include_nulls, MemoryPool* pool, 
std::shared_ptr<Array>* out) {
   std::shared_ptr<PoolBuffer> data;
   test::MakeRandomInt32PoolBuffer(length, pool, &data);
-  const auto kInt32 = std::make_shared<Int32Type>();
-  Int32Builder builder(pool, kInt32);
+  Int32Builder builder(pool, int32());
   if (include_nulls) {
     std::shared_ptr<PoolBuffer> valid_bytes;
     test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes);
@@ -134,8 +134,8 @@ Status MakeRandomBinaryArray(
 
 Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
   const int32_t length = 500;
-  auto string_type = std::make_shared<StringType>();
-  auto binary_type = std::make_shared<BinaryType>();
+  auto string_type = utf8();
+  auto binary_type = binary();
   auto f0 = std::make_shared<Field>("f0", string_type);
   auto f1 = std::make_shared<Field>("f1", binary_type);
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
@@ -233,7 +233,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* 
out) {
   const bool include_nulls = true;
   RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array));
   for (int i = 0; i < 63; ++i) {
-    type = 
std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+    type = std::static_pointer_cast<DataType>(list(type));
     RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, 
pool, &array));
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/schema-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema-test.cc b/cpp/src/arrow/schema-test.cc
index 8cc80be..4826199 100644
--- a/cpp/src/arrow/schema-test.cc
+++ b/cpp/src/arrow/schema-test.cc
@@ -29,23 +29,21 @@ using std::vector;
 
 namespace arrow {
 
-const auto INT32 = std::make_shared<Int32Type>();
-
 TEST(TestField, Basics) {
-  Field f0("f0", INT32);
-  Field f0_nn("f0", INT32, false);
+  Field f0("f0", int32());
+  Field f0_nn("f0", int32(), false);
 
   ASSERT_EQ(f0.name, "f0");
-  ASSERT_EQ(f0.type->ToString(), INT32->ToString());
+  ASSERT_EQ(f0.type->ToString(), int32()->ToString());
 
   ASSERT_TRUE(f0.nullable);
   ASSERT_FALSE(f0_nn.nullable);
 }
 
 TEST(TestField, Equals) {
-  Field f0("f0", INT32);
-  Field f0_nn("f0", INT32, false);
-  Field f0_other("f0", INT32);
+  Field f0("f0", int32());
+  Field f0_nn("f0", int32(), false);
+  Field f0_other("f0", int32());
 
   ASSERT_EQ(f0, f0_other);
   ASSERT_NE(f0, f0_nn);
@@ -57,11 +55,11 @@ class TestSchema : public ::testing::Test {
 };
 
 TEST_F(TestSchema, Basics) {
-  auto f0 = std::make_shared<Field>("f0", INT32);
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<UInt8Type>(), 
false);
-  auto f1_optional = std::make_shared<Field>("f1", 
std::make_shared<UInt8Type>());
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", uint8(), false);
+  auto f1_optional = field("f1", uint8());
 
-  auto f2 = std::make_shared<Field>("f2", std::make_shared<StringType>());
+  auto f2 = field("f2", utf8());
 
   vector<shared_ptr<Field>> fields = {f0, f1, f2};
   auto schema = std::make_shared<Schema>(fields);
@@ -83,11 +81,10 @@ TEST_F(TestSchema, Basics) {
 }
 
 TEST_F(TestSchema, ToString) {
-  auto f0 = std::make_shared<Field>("f0", INT32);
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<UInt8Type>(), 
false);
-  auto f2 = std::make_shared<Field>("f2", std::make_shared<StringType>());
-  auto f3 = std::make_shared<Field>(
-      "f3", std::make_shared<ListType>(std::make_shared<Int16Type>()));
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", uint8(), false);
+  auto f2 = field("f2", utf8());
+  auto f3 = field("f3", list(int16()));
 
   vector<shared_ptr<Field>> fields = {f0, f1, f2, f3};
   auto schema = std::make_shared<Schema>(fields);
@@ -101,4 +98,25 @@ f3: list<item: int16>)";
   ASSERT_EQ(expected, result);
 }
 
+// Lookup by name returns the matching field, and nullptr for unknown names.
+TEST_F(TestSchema, GetFieldByName) {
+  const auto f0 = field("f0", int32());
+  const auto f1 = field("f1", uint8(), false);
+  const auto f2 = field("f2", utf8());
+  const auto f3 = field("f3", list(int16()));
+
+  const vector<shared_ptr<Field>> all_fields = {f0, f1, f2, f3};
+  const auto schema = std::make_shared<Schema>(all_fields);
+
+  // Fields that exist are returned as-is.
+  std::shared_ptr<Field> found = schema->GetFieldByName("f1");
+  ASSERT_TRUE(f1->Equals(found));
+
+  found = schema->GetFieldByName("f3");
+  ASSERT_TRUE(f3->Equals(found));
+
+  // A name absent from the schema yields a null pointer.
+  found = schema->GetFieldByName("not-found");
+  ASSERT_TRUE(found == nullptr);
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema.cc b/cpp/src/arrow/schema.cc
index ff3ea19..cd8256e 100644
--- a/cpp/src/arrow/schema.cc
+++ b/cpp/src/arrow/schema.cc
@@ -42,6 +42,21 @@ bool Schema::Equals(const std::shared_ptr<Schema>& other) 
const {
   return Equals(*other.get());
 }
 
+// Return the field called `name`, or nullptr if the schema has none.
+//
+// The name -> position index is built on the first call that finds it empty;
+// when several fields share a name the last one is the one recorded.
+std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
+  if (name_to_index_.empty() && !fields_.empty()) {
+    for (size_t pos = 0; pos < fields_.size(); ++pos) {
+      name_to_index_[fields_[pos]->name] = static_cast<int>(pos);
+    }
+  }
+
+  const auto entry = name_to_index_.find(name);
+  if (entry != name_to_index_.end()) { return fields_[entry->second]; }
+  return nullptr;
+}
+
 std::string Schema::ToString() const {
   std::stringstream buffer;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/schema.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema.h b/cpp/src/arrow/schema.h
index 4301968..0e1ab5c 100644
--- a/cpp/src/arrow/schema.h
+++ b/cpp/src/arrow/schema.h
@@ -20,14 +20,14 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
+#include "arrow/type.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
-struct Field;
-
 class ARROW_EXPORT Schema {
  public:
   explicit Schema(const std::vector<std::shared_ptr<Field>>& fields);
@@ -37,7 +37,12 @@ class ARROW_EXPORT Schema {
   bool Equals(const std::shared_ptr<Schema>& other) const;
 
   // Return the ith schema element. Does not boundscheck
-  const std::shared_ptr<Field>& field(int i) const { return fields_[i]; }
+  std::shared_ptr<Field> field(int i) const { return fields_[i]; }
+
+  // Returns nullptr if name not found
+  std::shared_ptr<Field> GetFieldByName(const std::string& name);
+
+  const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
 
   // Render a string representation of the schema suitable for debugging
   std::string ToString() const;
@@ -46,6 +51,7 @@ class ARROW_EXPORT Schema {
 
  private:
   std::vector<std::shared_ptr<Field>> fields_;
+  std::unordered_map<std::string, int> name_to_index_;
 };
 
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ed6ec3b7/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index ac56f5e..ab4b980 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -27,6 +27,7 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/array.h"
 #include "arrow/column.h"
 #include "arrow/schema.h"
 #include "arrow/table.h"
@@ -102,20 +103,57 @@ void random_real(int n, uint32_t seed, T min_value, T 
max_value, std::vector<T>*
 }
 
 template <typename T>
-std::shared_ptr<Buffer> to_buffer(const std::vector<T>& values) {
+std::shared_ptr<Buffer> GetBufferFromVector(const std::vector<T>& values) {
   return std::make_shared<Buffer>(
       reinterpret_cast<const uint8_t*>(values.data()), values.size() * 
sizeof(T));
 }
 
+// Copy the contents of `values` into a newly allocated PoolBuffer from the
+// default memory pool and return it through `result`.
+//
+// Returns the error from PoolBuffer::Resize if the allocation fails.
+template <typename T>
+inline Status CopyBufferFromVector(
+    const std::vector<T>& values, std::shared_ptr<Buffer>* result) {
+  // Compute the byte count in size_t and convert once; casting the element
+  // count to int first would truncate very large vectors.
+  const int64_t nbytes = static_cast<int64_t>(values.size() * sizeof(T));
+
+  auto buffer = std::make_shared<PoolBuffer>(default_memory_pool());
+  RETURN_NOT_OK(buffer->Resize(nbytes));
+
+  // memcpy requires valid pointers even for a zero-length copy, and an empty
+  // vector's data() may be null.
+  if (nbytes > 0) { memcpy(buffer->mutable_data(), values.data(), nbytes); }
+
+  *result = buffer;
+  return Status::OK();
+}
+
+// Pack `is_valid` into a bitmap obtained from GetEmptyBitmap on the default
+// memory pool: bit i is set whenever is_valid[i] is true. Other bits are left
+// as GetEmptyBitmap produced them (presumably cleared -- confirm there).
+static inline Status GetBitmapFromBoolVector(
+    const std::vector<bool>& is_valid, std::shared_ptr<Buffer>* result) {
+  const int num_bits = static_cast<int>(is_valid.size());
+
+  std::shared_ptr<MutableBuffer> bitmap_buffer;
+  RETURN_NOT_OK(GetEmptyBitmap(default_memory_pool(), num_bits, &bitmap_buffer));
+
+  uint8_t* bits = bitmap_buffer->mutable_data();
+  int position = 0;
+  while (position < num_bits) {
+    if (is_valid[position]) { BitUtil::SetBit(bits, position); }
+    ++position;
+  }
+
+  *result = bitmap_buffer;
+  return Status::OK();
+}
+
 // Sets approximately pct_null of the first n bytes in null_bytes to zero
 // and the rest to non-zero (true) values.
-void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) {
+static inline void random_null_bytes(int64_t n, double pct_null, uint8_t* 
null_bytes) {
   Random rng(random_seed());
   for (int i = 0; i < n; ++i) {
     null_bytes[i] = rng.NextDoubleFraction() > pct_null;
   }
 }
 
+// Append `n` random validity flags to `is_valid`. Each flag is true when the
+// fraction drawn from the generator exceeds `pct_null`. Nothing is appended
+// when `n` is not positive.
+static inline void random_is_valid(
+    int64_t n, double pct_null, std::vector<bool>* is_valid) {
+  Random rng(random_seed());
+  if (n > 0) { is_valid->reserve(is_valid->size() + static_cast<size_t>(n)); }
+  // The counter matches the width of `n`; an int counter could overflow
+  // before reaching a count above INT_MAX.
+  for (int64_t i = 0; i < n; ++i) {
+    is_valid->push_back(rng.NextDoubleFraction() > pct_null);
+  }
+}
+
 static inline void random_bytes(int n, uint32_t seed, uint8_t* out) {
   std::mt19937 gen(seed);
   std::uniform_int_distribution<int> d(0, 255);
@@ -125,6 +163,15 @@ static inline void random_bytes(int n, uint32_t seed, 
uint8_t* out) {
   }
 }
 
+// Fill out[0..n) with pseudo-random bytes drawn uniformly from the ASCII codes
+// 65 ('A') through 122 ('z'); this range also contains the punctuation that
+// sits between 'Z' and 'a'. The sequence depends only on `seed`.
+static inline void random_ascii(int n, uint32_t seed, uint8_t* out) {
+  std::mt19937 engine(seed);
+  std::uniform_int_distribution<int> pick(65, 122);
+
+  for (int remaining = n; remaining > 0; --remaining) {
+    *out++ = pick(engine) & 0xFF;
+  }
+}
+
 template <typename T>
 void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
   DCHECK(out);

Reply via email to