PARQUET-712: Add library to read into Arrow memory

At the moment this is just move of the existing code into the state where it 
compiles. Outstanding work includes:

- [x] Understand the issues with ParquetException typeid matching in the tests 
*on macOS*. @wesm We already had this problem and you fixed it somewhere. Do 
you remember the solution?
- [x] Understand why BoolenType tests break with a Thrift exception
- [ ] Add functions that read directly into Arrow memory and not intermediate 
structures.

Author: Uwe L. Korn <uw...@xhochy.com>
Author: Korn, Uwe <uwe.k...@blue-yonder.com>

Closes #158 from xhochy/PARQUET-712 and squashes the following commits:

e55ab1f [Uwe L. Korn] verbose ctest output
62f0f88 [Uwe L. Korn] Add static linkage
fc2c316 [Uwe L. Korn] Style fixes
3f3e24b [Uwe L. Korn] Fix templating problem
45de044 [Uwe L. Korn] Style fixes for IO
1d39a60 [Uwe L. Korn] Import MemoryPool instead of declaring it
251262a [Uwe L. Korn] Style fixes
e0e1518 [Uwe L. Korn] Build parquet_arrow in Travis
874b33d [Korn, Uwe] Add boost libraries for Arrow
142a364 [Korn, Uwe] PARQUET-712: Add library to read into Arrow memory


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/4a7bf117
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/4a7bf117
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/4a7bf117

Branch: refs/heads/master
Commit: 4a7bf1174419db6b2e4fc1847d866c4763b7be44
Parents: 8ef68b1
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Sep 18 14:01:41 2016 -0400
Committer: Wes McKinney <w...@apache.org>
Committed: Sun Sep 18 14:01:41 2016 -0400

----------------------------------------------------------------------
 .travis.yml                                   |   6 +-
 CMakeLists.txt                                |  35 ++
 ci/travis_script_cpp.sh                       |   4 +-
 cmake_modules/FindArrow.cmake                 |  87 ++++
 conda.recipe/build.sh                         |   1 +
 src/parquet/arrow/CMakeLists.txt              |  90 ++++
 src/parquet/arrow/arrow-io-test.cc            | 189 ++++++++
 src/parquet/arrow/arrow-reader-writer-test.cc | 502 +++++++++++++++++++++
 src/parquet/arrow/arrow-schema-test.cc        | 265 +++++++++++
 src/parquet/arrow/io.cc                       | 107 +++++
 src/parquet/arrow/io.h                        |  82 ++++
 src/parquet/arrow/reader.cc                   | 406 +++++++++++++++++
 src/parquet/arrow/reader.h                    | 148 ++++++
 src/parquet/arrow/schema.cc                   | 351 ++++++++++++++
 src/parquet/arrow/schema.h                    |  56 +++
 src/parquet/arrow/test-util.h                 | 202 +++++++++
 src/parquet/arrow/utils.h                     |  54 +++
 src/parquet/arrow/writer.cc                   | 373 +++++++++++++++
 src/parquet/arrow/writer.h                    |  78 ++++
 thirdparty/build_thirdparty.sh                |  10 +
 thirdparty/download_thirdparty.sh             |   5 +
 thirdparty/set_thirdparty_env.sh              |   1 +
 thirdparty/versions.sh                        |   4 +
 23 files changed, 3052 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 780d9f9..6dc994e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,6 +9,8 @@ addons:
     - g++-4.9
     - valgrind
     - libboost-dev
+    - libboost-filesystem-dev
+    - libboost-system-dev
     - libboost-program-options-dev
     - libboost-test-dev
     - libssl-dev
@@ -26,7 +28,7 @@ matrix:
     before_script:
     - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
     - cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_TEST_MEMCHECK=ON 
-DPARQUET_BUILD_BENCHMARKS=ON
-      -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
+      -DPARQUET_ARROW=ON -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
     - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
   - compiler: clang
     os: linux
@@ -76,7 +78,7 @@ before_install:
 
 before_script:
 - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
-- cmake -DCMAKE_CXX_FLAGS="-Werror" $TRAVIS_BUILD_DIR
+- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_ARROW=ON $TRAVIS_BUILD_DIR
 - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
 
 script:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3878056..42b10ee 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,9 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL 
"${CMAKE_CURRENT_SOURCE_DIR}")
   option(PARQUET_BUILD_EXECUTABLES
     "Build the libparquet executable CLI tools"
     ON)
+  option(PARQUET_ARROW
+    "Build the Arrow support"
+    OFF)
 endif()
 
 # If build in-source, create the latest symlink. If build out-of-source, which 
is
@@ -247,6 +250,16 @@ function(ADD_PARQUET_TEST_DEPENDENCIES REL_TEST_NAME)
   add_dependencies(${TEST_NAME} ${ARGN})
 endfunction()
 
+# A wrapper for add_dependencies() that is compatible with PARQUET_BUILD_TESTS.
+function(ADD_PARQUET_LINK_LIBRARIES REL_TEST_NAME)
+  if(NOT PARQUET_BUILD_TESTS)
+    return()
+  endif()
+  get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE)
+
+  target_link_libraries(${TEST_NAME} ${ARGN})
+endfunction()
+
 enable_testing()
 
 ############################################################
@@ -553,6 +566,12 @@ if (PARQUET_BUILD_SHARED)
     target_link_libraries(parquet_shared
       LINK_PUBLIC ${LIBPARQUET_LINK_LIBS}
       LINK_PRIVATE ${LIBPARQUET_PRIVATE_LINK_LIBS})
+    if (APPLE)
+      set_target_properties(parquet_shared
+        PROPERTIES
+        BUILD_WITH_INSTALL_RPATH ON
+        INSTALL_NAME_DIR "@rpath")
+    endif()
 endif()
 
 if (PARQUET_BUILD_STATIC)
@@ -583,6 +602,22 @@ add_dependencies(parquet_objlib parquet_thrift)
 add_subdirectory(benchmarks)
 add_subdirectory(tools)
 
+# Arrow
+if (PARQUET_ARROW)
+  find_package(Arrow REQUIRED)
+  include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
+  add_library(arrow SHARED IMPORTED)
+  set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${ARROW_SHARED_LIB})
+  add_library(arrow_io SHARED IMPORTED)
+  set_target_properties(arrow_io PROPERTIES IMPORTED_LOCATION 
${ARROW_IO_SHARED_LIB})
+  add_library(arrow_static STATIC IMPORTED)
+  set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION 
${ARROW_STATIC_LIB})
+  add_library(arrow_io_static STATIC IMPORTED)
+  set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION 
${ARROW_IO_STATIC_LIB})
+
+  add_subdirectory(src/parquet/arrow)
+endif()
+
 add_custom_target(clean-all
    COMMAND ${CMAKE_BUILD_TOOL} clean
    COMMAND ${CMAKE_COMMAND} -P cmake_modules/clean-all.cmake

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/ci/travis_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index 2e7dcdd..8794559 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -16,13 +16,13 @@ make lint
 
 if [ $TRAVIS_OS_NAME == "linux" ]; then
   make -j4 || exit 1
-  ctest -L unittest || { cat 
$TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+  ctest -VV -L unittest || { cat 
$TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
   sudo pip install cpp_coveralls
   export PARQUET_ROOT=$TRAVIS_BUILD_DIR
   $TRAVIS_BUILD_DIR/ci/upload_coverage.sh
 else
   make -j4 || exit 1
-  ctest -L unittest || { cat 
$TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+  ctest -VV -L unittest || { cat 
$TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
 fi
 
 popd

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindArrow.cmake b/cmake_modules/FindArrow.cmake
new file mode 100644
index 0000000..91d0e71
--- /dev/null
+++ b/cmake_modules/FindArrow.cmake
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# - Find ARROW (arrow/api.h, libarrow.a, libarrow.so)
+# This module defines
+#  ARROW_INCLUDE_DIR, directory containing headers
+#  ARROW_LIBS, directory containing arrow libraries
+#  ARROW_STATIC_LIB, path to libarrow.a
+#  ARROW_SHARED_LIB, path to libarrow's shared library
+#  ARROW_FOUND, whether arrow has been found
+
+set(ARROW_SEARCH_HEADER_PATHS
+  $ENV{ARROW_HOME}/include
+)
+
+set(ARROW_SEARCH_LIB_PATH
+  $ENV{ARROW_HOME}/lib
+)
+
+find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS
+  ${ARROW_SEARCH_HEADER_PATHS}
+  # make sure we don't accidentally pick up a different version
+  NO_DEFAULT_PATH
+)
+
+find_library(ARROW_LIB_PATH NAMES arrow
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+find_library(ARROW_IO_LIB_PATH NAMES arrow_io
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
+  set(ARROW_FOUND TRUE)
+  set(ARROW_LIB_NAME libarrow)
+  set(ARROW_IO_LIB_NAME libarrow_io)
+
+  set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
+  set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
+  set(ARROW_SHARED_LIB 
${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
+  set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a)
+  set(ARROW_IO_SHARED_LIB 
${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+  if (NOT Arrow_FIND_QUIETLY)
+    message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
+    message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
+  endif ()
+else ()
+  if (NOT Arrow_FIND_QUIETLY)
+    set(ARROW_ERR_MSG "Could not find the Arrow library. Looked for headers")
+    set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and 
for libs")
+    set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}")
+    if (Arrow_FIND_REQUIRED)
+      message(FATAL_ERROR "${ARROW_ERR_MSG}")
+    else (Arrow_FIND_REQUIRED)
+      message(STATUS "${ARROW_ERR_MSG}")
+    endif (Arrow_FIND_REQUIRED)
+  endif ()
+  set(ARROW_FOUND FALSE)
+endif ()
+
+mark_as_advanced(
+  ARROW_FOUND
+  ARROW_INCLUDE_DIR
+  ARROW_LIBS
+  ARROW_STATIC_LIB
+  ARROW_SHARED_LIB
+  ARROW_IO_STATIC_LIB
+  ARROW_IO_SHARED_LIB
+)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/conda.recipe/build.sh
----------------------------------------------------------------------
diff --git a/conda.recipe/build.sh b/conda.recipe/build.sh
index 7fe8f91..f77475c 100644
--- a/conda.recipe/build.sh
+++ b/conda.recipe/build.sh
@@ -53,6 +53,7 @@ cmake \
     -DCMAKE_BUILD_TYPE=debug \
     -DCMAKE_INSTALL_PREFIX=$PREFIX \
     -DPARQUET_BUILD_BENCHMARKS=off \
+    -DPARQUET_ARROW=ON \
     ..
 
 make

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/CMakeLists.txt b/src/parquet/arrow/CMakeLists.txt
new file mode 100644
index 0000000..5d923a7
--- /dev/null
+++ b/src/parquet/arrow/CMakeLists.txt
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ----------------------------------------------------------------------
+# parquet_arrow : Arrow <-> Parquet adapter
+
+set(PARQUET_ARROW_SRCS
+  io.cc
+  reader.cc
+  schema.cc
+  writer.cc
+)
+
+add_library(parquet_arrow_objlib OBJECT
+  ${PARQUET_ARROW_SRCS}
+)
+
+# SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX)
+
+if (PARQUET_BUILD_SHARED)
+    add_library(parquet_arrow_shared SHARED 
$<TARGET_OBJECTS:parquet_arrow_objlib>)
+    set_target_properties(parquet_arrow_shared
+      PROPERTIES
+      LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+      LINK_FLAGS "${SHARED_LINK_FLAGS}"
+      OUTPUT_NAME "parquet_arrow")
+    target_link_libraries(parquet_arrow_shared
+      arrow
+      arrow_io
+      parquet_shared)
+    if (APPLE)
+      set_target_properties(parquet_arrow_shared
+        PROPERTIES
+        BUILD_WITH_INSTALL_RPATH ON
+        INSTALL_NAME_DIR "@rpath")
+    endif()
+endif()
+
+if (PARQUET_BUILD_STATIC)
+    add_library(parquet_arrow_static STATIC 
$<TARGET_OBJECTS:parquet_arrow_objlib>)
+    set_target_properties(parquet_arrow_static
+      PROPERTIES
+      LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+      OUTPUT_NAME "parquet_arrow")
+  target_link_libraries(parquet_arrow_static
+      arrow_static
+      arrow_static
+      parquet_static)
+  install(TARGETS parquet_arrow_static
+      ARCHIVE DESTINATION lib
+      LIBRARY DESTINATION lib)
+endif()
+
+ADD_PARQUET_TEST(arrow-schema-test)
+ADD_PARQUET_TEST(arrow-io-test)
+ADD_PARQUET_TEST(arrow-reader-writer-test)
+
+if (PARQUET_BUILD_STATIC)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static)
+else()
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared)
+endif()
+
+# Headers: top level
+install(FILES
+  io.h
+  reader.h
+  schema.h
+  utils.h
+  writer.h
+  DESTINATION include/parquet/arrow)
+

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-io-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-io-test.cc 
b/src/parquet/arrow/arrow-io-test.cc
new file mode 100644
index 0000000..377fb97
--- /dev/null
+++ b/src/parquet/arrow/arrow-io-test.cc
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/io.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+using arrow::Status;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// Allocator tests
+
+TEST(TestParquetAllocator, DefaultCtor) {
+  ParquetAllocator allocator;
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+}
+
+// Pass through to the default memory pool
+class TrackingPool : public MemoryPool {
+ public:
+  TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
+
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(pool_->Allocate(size, out));
+    bytes_allocated_ += size;
+    return Status::OK();
+  }
+
+  void Free(uint8_t* buffer, int64_t size) override {
+    pool_->Free(buffer, size);
+    bytes_allocated_ -= size;
+  }
+
+  int64_t bytes_allocated() const override { return bytes_allocated_; }
+
+ private:
+  MemoryPool* pool_;
+  int64_t bytes_allocated_;
+};
+
+TEST(TestParquetAllocator, CustomPool) {
+  TrackingPool pool;
+
+  ParquetAllocator allocator(&pool);
+
+  ASSERT_EQ(&pool, allocator.pool());
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+  ASSERT_EQ(buffer_size, pool.bytes_allocated());
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+
+  ASSERT_EQ(0, pool.bytes_allocated());
+}
+
+// ----------------------------------------------------------------------
+// Read source tests
+
+class BufferReader : public ArrowROFile {
+ public:
+  BufferReader(const uint8_t* buffer, int buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+  Status Close() override {
+    // no-op
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) override {
+    *position = position_;
+    return Status::OK();
+  }
+
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) 
override {
+    RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, bytes_read, buffer);
+  }
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    memcpy(buffer, buffer_ + position_, nbytes);
+    *bytes_read = std::min(nbytes, buffer_size_ - position_);
+    position_ += *bytes_read;
+    return Status::OK();
+  }
+
+  Status GetSize(int64_t* size) override {
+    *size = buffer_size_;
+    return Status::OK();
+  }
+
+  Status Seek(int64_t position) override {
+    if (position < 0 || position >= buffer_size_) {
+      return Status::IOError("position out of bounds");
+    }
+
+    position_ = position;
+    return Status::OK();
+  }
+
+ private:
+  const uint8_t* buffer_;
+  int buffer_size_;
+  int64_t position_;
+};
+
+TEST(TestParquetReadSource, Basics) {
+  std::string data = "this is the data";
+  auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
+
+  ParquetAllocator allocator(default_memory_pool());
+
+  auto file = std::make_shared<BufferReader>(data_buffer, data.size());
+  auto source = std::make_shared<ParquetReadSource>(&allocator);
+
+  ASSERT_OK(source->Open(file));
+
+  ASSERT_EQ(0, source->Tell());
+  ASSERT_NO_THROW(source->Seek(5));
+  ASSERT_EQ(5, source->Tell());
+  ASSERT_NO_THROW(source->Seek(0));
+
+  // Seek out of bounds
+  ASSERT_THROW(source->Seek(100), ParquetException);
+
+  uint8_t buffer[50];
+
+  ASSERT_NO_THROW(source->Read(4, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
+  ASSERT_EQ(4, source->Tell());
+
+  std::shared_ptr<Buffer> pq_buffer;
+
+  ASSERT_NO_THROW(pq_buffer = source->Read(7));
+
+  auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
+
+  ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
+}
+
+}  // namespace arrow
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc 
b/src/parquet/arrow/arrow-reader-writer-test.cc
new file mode 100644
index 0000000..e4e9efa
--- /dev/null
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+#include "parquet/arrow/test-util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+#include "arrow/test-util.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::ChunkedArray;
+using arrow::default_memory_pool;
+using arrow::PoolBuffer;
+using arrow::PrimitiveArray;
+using arrow::Status;
+using arrow::Table;
+
+using ParquetBuffer = parquet::Buffer;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
+
+template <typename TestType>
+struct test_traits {};
+
+template <>
+struct test_traits<::arrow::BooleanType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<::arrow::BooleanType>::value(1);
+
+template <>
+struct test_traits<::arrow::UInt8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<::arrow::UInt8Type>::value(64);
+
+template <>
+struct test_traits<::arrow::Int8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+  static int8_t const value;
+};
+
+const int8_t test_traits<::arrow::Int8Type>::value(-64);
+
+template <>
+struct test_traits<::arrow::UInt16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+  static uint16_t const value;
+};
+
+const uint16_t test_traits<::arrow::UInt16Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+  static int16_t const value;
+};
+
+const int16_t test_traits<::arrow::Int16Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::UInt32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+  static uint32_t const value;
+};
+
+const uint32_t test_traits<::arrow::UInt32Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int32_t const value;
+};
+
+const int32_t test_traits<::arrow::Int32Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::UInt64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+  static uint64_t const value;
+};
+
+const uint64_t test_traits<::arrow::UInt64Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::Int64Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::TimestampType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = 
LogicalType::TIMESTAMP_MILLIS;
+  static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
+
+template <>
+struct test_traits<::arrow::FloatType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static float const value;
+};
+
+const float test_traits<::arrow::FloatType>::value(2.1f);
+
+template <>
+struct test_traits<::arrow::DoubleType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static double const value;
+};
+
+const double test_traits<::arrow::DoubleType>::value(4.2);
+
+template <>
+struct test_traits<::arrow::StringType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+  static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+  static std::string const value;
+};
+
+const std::string test_traits<::arrow::StringType>::value("Test");
+
+template <typename T>
+using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
+
+template <typename T>
+using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+
+template <typename TestType>
+class TestParquetIO : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+    auto pnode = PrimitiveNode::Make("column1", repetition,
+        test_traits<TestType>::parquet_enum, 
test_traits<TestType>::logical_enum);
+    NodePtr node_ =
+        GroupNode::Make("schema", Repetition::REQUIRED, 
std::vector<NodePtr>({pnode}));
+    return std::static_pointer_cast<GroupNode>(node_);
+  }
+
+  std::unique_ptr<ParquetFileWriter> MakeWriter(
+      const std::shared_ptr<GroupNode>& schema) {
+    sink_ = std::make_shared<InMemoryOutputStream>();
+    return ParquetFileWriter::Open(sink_, schema);
+  }
+
+  std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+    std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+    return ParquetFileReader::Open(std::move(source));
+  }
+
+  void ReadSingleColumnFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* 
out) {
+    FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+    std::unique_ptr<FlatColumnReader> column_reader;
+    ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
+    ASSERT_NE(nullptr, column_reader.get());
+
+    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
+    std::shared_ptr<::arrow::Array> out;
+    ReadSingleColumnFile(ReaderFromSink(), &out);
+    ASSERT_TRUE(values->Equals(out));
+  }
+
+  void ReadTableFromFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* 
out) {
+    FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+    ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& 
values) {
+    std::shared_ptr<::arrow::Table> out;
+    ReadTableFromFile(ReaderFromSink(), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(values->length(), out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  }
+
+  template <typename ArrayType>
+  void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<ArrayType>& values) {
+    FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+    ASSERT_OK_NO_THROW(writer.Close());
+  }
+
+  std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// We habe separate tests for UInt32Type as this is currently the only type
+// where a roundtrip does not yield the identical Array structure.
+// There we write an UInt32 Array but receive an Int64 Array as result for
+// Parquet version 1.0.
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, 
::arrow::Int8Type,
+    ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, 
::arrow::UInt64Type,
+    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, 
::arrow::DoubleType,
+    ::arrow::StringType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  this->WriteFlatColumn(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), 
::arrow::default_memory_pool(),
+      this->sink_, values->length(), default_writer_properties()));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(100, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  this->WriteFlatColumn(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), 
::arrow::default_memory_pool(),
+      this->sink_, values->length(), default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  int64_t chunk_size = values->length() / 4;
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(LARGE_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), this->sink_, 512, 
default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+  int64_t chunk_size = SMALL_SIZE / 4;
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), 
::arrow::default_memory_pool(),
+      this->sink_, 512, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 2.0 roundtrip should yield an uint32_t column again
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_2_0)
+          ->build();
+  ASSERT_OK_NO_THROW(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, 
properties));
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 1.0 returns an int64_t column as there is no way to tell a 
Parquet 1.0
+  // reader that a column is unsigned.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_1_0)
+          ->build();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), ::arrow::default_memory_pool(), this->sink_, 512, 
properties));
+
+  std::shared_ptr<Array> expected_values;
+  std::shared_ptr<PoolBuffer> int64_data =
+      std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+  {
+    ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+    int64_t* int64_data_ptr = 
reinterpret_cast<int64_t*>(int64_data->mutable_data());
+    const uint32_t* uint32_data_ptr =
+        reinterpret_cast<const uint32_t*>(values->data()->data());
+    // std::copy might be faster but this is explicit on the casts)
+    for (int64_t i = 0; i < values->length(); i++) {
+      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+    }
+  }
+  ASSERT_OK(MakePrimitiveArray(std::make_shared<::arrow::Int64Type>(), 
values->length(),
+      int64_data, values->null_count(), values->null_bitmap(), 
&expected_values));
+  this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+template <typename T>
+using ParquetCDataType = typename ParquetDataType<T>::c_type;
+
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  void MakeTestFile(std::vector<T>& values, int num_chunks,
+      std::unique_ptr<ParquetFileReader>* file_reader) {
+    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+    size_t chunk_size = values.size() / num_chunks;
+    // Convert to Parquet's expected physical type
+    std::vector<uint8_t> values_buffer(
+        sizeof(ParquetCDataType<TestType>) * values.size());
+    auto values_parquet =
+        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+    std::copy(values.cbegin(), values.cend(), values_parquet);
+    for (int i = 0; i < num_chunks; i++) {
+      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto column_writer =
+          
static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+      ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    *file_reader = this->ReaderFromSink();
+  }
+
+  void CheckSingleColumnRequiredTableRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Table> out;
+    this->ReadTableFromFile(std::move(file_reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+  }
+
+  void CheckSingleColumnRequiredRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Array> out;
+    this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+    ExpectArray<TestType>(values.data(), out.get());
+  }
+};
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, 
::arrow::Int8Type,
+    ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, 
::arrow::Int32Type,
+    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
+    ::arrow::DoubleType> PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+  this->CheckSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+  this->CheckSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+  this->CheckSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+  this->CheckSingleColumnRequiredTableRead(4);
+}
+
+}  // namespace arrow
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc 
b/src/parquet/arrow/arrow-schema-test.cc
new file mode 100644
index 0000000..3dfaf14
--- /dev/null
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/schema.h"
+
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "arrow/types/datetime.h"
+#include "arrow/types/decimal.h"
+#include "arrow/util/status.h"
+
+using arrow::Field;
+
+using ParquetType = parquet::Type;
+using parquet::LogicalType;
+using parquet::Repetition;
+using parquet::schema::NodePtr;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+const auto BOOL = std::make_shared<::arrow::BooleanType>();
+const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
+const auto INT32 = std::make_shared<::arrow::Int32Type>();
+const auto INT64 = std::make_shared<::arrow::Int64Type>();
+const auto FLOAT = std::make_shared<::arrow::FloatType>();
+const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
+const auto UTF8 = std::make_shared<::arrow::StringType>();
+const auto TIMESTAMP_MS =
+    
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+// TODO: This requires parquet-cpp implementing the MICROS enum value
+// const auto TIMESTAMP_US = 
std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
+const auto BINARY =
+    std::make_shared<::arrow::ListType>(std::make_shared<Field>("", UINT8));
+const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
+
+class TestConvertParquetSchema : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& 
expected_schema) {
+    ASSERT_EQ(expected_schema->num_fields(), result_schema_->num_fields());
+    for (int i = 0; i < expected_schema->num_fields(); ++i) {
+      auto lhs = result_schema_->field(i);
+      auto rhs = expected_schema->field(i);
+      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+                                    << " != " << rhs->ToString();
+    }
+  }
+
+  ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes) {
+    NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+    descr_.Init(schema);
+    return FromParquetSchema(&descr_, &result_schema_);
+  }
+
+ protected:
+  SchemaDescriptor descr_;
+  std::shared_ptr<::arrow::Schema> result_schema_;
+};
+
+TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("boolean", Repetition::REQUIRED, 
ParquetType::BOOLEAN));
+  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
+  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
+  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+
+  parquet_fields.push_back(PrimitiveNode::Make("timestamp", 
Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, 
false));
+
+  // parquet_fields.push_back(PrimitiveNode::Make("timestamp", 
Repetition::REQUIRED,
+  //     ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+  // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, 
false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
+  arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("double", Repetition::OPTIONAL, 
ParquetType::DOUBLE));
+  arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("binary", Repetition::OPTIONAL, 
ParquetType::BYTE_ARRAY));
+  arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, 
LogicalType::UTF8));
+  arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+
+  parquet_fields.push_back(PrimitiveNode::Make("flba-binary", 
Repetition::OPTIONAL,
+      ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12));
+  arrow_fields.push_back(std::make_shared<Field>("flba-binary", BINARY));
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", 
Repetition::OPTIONAL,
+      ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("flba-decimal", DECIMAL_8_4));
+
+  parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", 
Repetition::OPTIONAL,
+      ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("binary-decimal", 
DECIMAL_8_4));
+
+  parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", 
Repetition::OPTIONAL,
+      ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("int32-decimal", 
DECIMAL_8_4));
+
+  parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", 
Repetition::OPTIONAL,
+      ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("int64-decimal", 
DECIMAL_8_4));
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+TEST_F(TestConvertParquetSchema, UnsupportedThings) {
+  std::vector<NodePtr> unsupported_nodes;
+
+  unsupported_nodes.push_back(
+      PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
+
+  unsupported_nodes.push_back(
+      GroupNode::Make("repeated-group", Repetition::REPEATED, {}));
+
+  unsupported_nodes.push_back(PrimitiveNode::Make(
+      "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
+
+  for (const NodePtr& node : unsupported_nodes) {
+    ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
+  }
+}
+
+class TestConvertArrowSchema : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  void CheckFlatSchema(const std::vector<NodePtr>& nodes) {
+    NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, 
nodes);
+    const GroupNode* expected_schema_node =
+        static_cast<const GroupNode*>(schema_node.get());
+    const GroupNode* result_schema_node = result_schema_->group_node();
+
+    ASSERT_EQ(expected_schema_node->field_count(), 
result_schema_node->field_count());
+
+    for (int i = 0; i < expected_schema_node->field_count(); i++) {
+      auto lhs = result_schema_node->field(i);
+      auto rhs = expected_schema_node->field(i);
+      EXPECT_TRUE(lhs->Equals(rhs.get()));
+    }
+  }
+
+  ::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& 
fields) {
+    arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
+    std::shared_ptr<::parquet::WriterProperties> properties =
+        ::parquet::default_writer_properties();
+    return ToParquetSchema(arrow_schema_.get(), *properties.get(), 
&result_schema_);
+  }
+
+ protected:
+  std::shared_ptr<::arrow::Schema> arrow_schema_;
+  std::shared_ptr<SchemaDescriptor> result_schema_;
+};
+
+TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("boolean", Repetition::REQUIRED, 
ParquetType::BOOLEAN));
+  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
+  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
+  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+
+  parquet_fields.push_back(PrimitiveNode::Make("timestamp", 
Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, 
false));
+
+  // parquet_fields.push_back(PrimitiveNode::Make("timestamp", 
Repetition::REQUIRED,
+  //     ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+  // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, 
false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
+  arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("double", Repetition::OPTIONAL, 
ParquetType::DOUBLE));
+  arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+
+  // TODO: String types need to be clarified a bit more in the Arrow spec
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, 
LogicalType::UTF8));
+  arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
+TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  // TODO: Test Decimal Arrow -> Parquet conversion
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
+TEST(TestNodeConversion, DateAndTime) {}
+
+}  // namespace arrow
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc
new file mode 100644
index 0000000..8e2645a
--- /dev/null
+++ b/src/parquet/arrow/io.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/io.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/util/status.h"
+
+using arrow::Status;
+using arrow::MemoryPool;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// ParquetAllocator
+
+ParquetAllocator::ParquetAllocator() : pool_(::arrow::default_memory_pool()) {}
+
+ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}
+
+ParquetAllocator::~ParquetAllocator() {}
+
+uint8_t* ParquetAllocator::Malloc(int64_t size) {
+  uint8_t* result;
+  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result));
+  return result;
+}
+
+void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
+  // Does not report Status
+  pool_->Free(buffer, size);
+}
+
+// ----------------------------------------------------------------------
+// ParquetReadSource
+
+ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator)
+    : file_(nullptr), allocator_(allocator) {}
+
+Status ParquetReadSource::Open(const std::shared_ptr<ArrowROFile>& file) {
+  int64_t file_size;
+  RETURN_NOT_OK(file->GetSize(&file_size));
+
+  file_ = file;
+  size_ = file_size;
+  return Status::OK();
+}
+
+void ParquetReadSource::Close() {
+  // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for
+  // these classes in a borked state. Probably better to explicitly close.
+
+  // PARQUET_THROW_NOT_OK(file_->Close());
+}
+
+int64_t ParquetReadSource::Tell() const {
+  int64_t position;
+  PARQUET_THROW_NOT_OK(file_->Tell(&position));
+  return position;
+}
+
+void ParquetReadSource::Seek(int64_t position) {
+  PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
+std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
+  // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
+  // that there should be more code sharing amongst file-like sources
+  auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
+  result->Resize(nbytes);
+
+  int64_t bytes_read = Read(nbytes, result->mutable_data());
+  if (bytes_read < nbytes) { result->Resize(bytes_read); }
+  return result;
+}
+
+}  // namespace arrow
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h
new file mode 100644
index 0000000..dc60635
--- /dev/null
+++ b/src/parquet/arrow/io.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
+
+#ifndef PARQUET_ARROW_IO_H
+#define PARQUET_ARROW_IO_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/memory-pool.h"
+
+namespace parquet {
+
+namespace arrow {
+
+// An implementation of the Parquet MemoryAllocator API that plugs into an
+// existing Arrow memory pool. This way we can direct all allocations to a
+// single place rather than tracking allocations in different locations (for
+// example: without utilizing parquet-cpp's default allocator)
+class PARQUET_EXPORT ParquetAllocator : public MemoryAllocator {
+ public:
+  // Uses the default memory pool
+  ParquetAllocator();
+
+  explicit ParquetAllocator(::arrow::MemoryPool* pool);
+  virtual ~ParquetAllocator();
+
+  uint8_t* Malloc(int64_t size) override;
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  void set_pool(::arrow::MemoryPool* pool) { pool_ = pool; }
+
+  ::arrow::MemoryPool* pool() const { return pool_; }
+
+ private:
+  ::arrow::MemoryPool* pool_;
+};
+
+class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
+ public:
+  explicit ParquetReadSource(ParquetAllocator* allocator);
+
+  // We need to ask for the file size on opening the file, and this can fail
+  ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& 
file);
+
+  void Close() override;
+  int64_t Tell() const override;
+  void Seek(int64_t pos) override;
+  int64_t Read(int64_t nbytes, uint8_t* out) override;
+  std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+ private:
+  // An Arrow readable file of some kind
+  std::shared_ptr<::arrow::io::RandomAccessFile> file_;
+
+  // The allocator is required for creating managed buffers
+  ParquetAllocator* allocator_;
+};
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_IO_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
new file mode 100644
index 0000000..056b5ab
--- /dev/null
+++ b/src/parquet/arrow/reader.cc
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/reader.h"
+
+#include <algorithm>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "parquet/arrow/io.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/column.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::Column;
+using arrow::Field;
+using arrow::MemoryPool;
+using arrow::PoolBuffer;
+using arrow::Status;
+using arrow::Table;
+
+// Help reduce verbosity
+using ParquetRAS = parquet::RandomAccessSource;
+using ParquetReader = parquet::ParquetFileReader;
+
+namespace parquet {
+namespace arrow {
+
+template <typename ArrowType>
+struct ArrowTypeTraits {
+  typedef ::arrow::NumericBuilder<ArrowType> builder_type;
+};
+
+template <>
+struct ArrowTypeTraits<BooleanType> {
+  typedef ::arrow::BooleanBuilder builder_type;
+};
+
+template <typename ArrowType>
+using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
+
+class FileReader::Impl {
+ public:
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+  virtual ~Impl() {}
+
+  bool CheckForFlatColumn(const ColumnDescriptor* descr);
+  Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+  Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+  Status ReadFlatTable(std::shared_ptr<Table>* out);
+
+ private:
+  MemoryPool* pool_;
+  std::unique_ptr<ParquetFileReader> reader_;
+};
+
+class FlatColumnReader::Impl {
+ public:
+  Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+      ParquetFileReader* reader, int column_index);
+  virtual ~Impl() {}
+
+  Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+  template <typename ArrowType, typename ParquetType>
+  Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNullableFlatBatch(const int16_t* def_levels,
+      typename ParquetType::c_type* values, int64_t values_read, int64_t 
levels_read,
+      BuilderType<ArrowType>* builder);
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t 
values_read,
+      BuilderType<ArrowType>* builder);
+
+ private:
+  void NextRowGroup();
+
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+            (sizeof(InType) == sizeof(OutType)));
+  };
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = 
nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+    return Status::OK();
+  }
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* 
= nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
+    OutType* mutable_out_ptr =
+        reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
+    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+    *out_ptr = mutable_out_ptr;
+    return Status::OK();
+  }
+
+  MemoryPool* pool_;
+  const ColumnDescriptor* descr_;
+  ParquetFileReader* reader_;
+  int column_index_;
+  int next_row_group_;
+  std::shared_ptr<ColumnReader> column_reader_;
+  std::shared_ptr<Field> field_;
+
+  PoolBuffer values_buffer_;
+  PoolBuffer def_levels_buffer_;
+  PoolBuffer values_builder_buffer_;
+  PoolBuffer valid_bytes_buffer_;
+};
+
+FileReader::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+    : pool_(pool), reader_(std::move(reader)) {}
+
+bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
+  if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 
1)) {
+    return false;
+  } else if ((descr->max_definition_level() == 1) &&
+             (descr->schema_node()->repetition() != Repetition::OPTIONAL)) {
+    return false;
+  }
+  return true;
+}
+
+Status FileReader::Impl::GetFlatColumn(int i, 
std::unique_ptr<FlatColumnReader>* out) {
+  const SchemaDescriptor* schema = reader_->metadata()->schema();
+
+  if (!CheckForFlatColumn(schema->Column(i))) {
+    return Status::Invalid("The requested column is not flat");
+  }
+  std::unique_ptr<FlatColumnReader::Impl> impl(
+      new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
+  *out = std::unique_ptr<FlatColumnReader>(new 
FlatColumnReader(std::move(impl)));
+  return Status::OK();
+}
+
+Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+  std::unique_ptr<FlatColumnReader> flat_column_reader;
+  RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader));
+  return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out);
+}
+
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+  auto descr = reader_->metadata()->schema();
+
+  const std::string& name = descr->name();
+  std::shared_ptr<::arrow::Schema> schema;
+  RETURN_NOT_OK(FromParquetSchema(descr, &schema));
+
+  int num_columns = reader_->metadata()->num_columns();
+
+  std::vector<std::shared_ptr<Column>> columns(num_columns);
+  for (int i = 0; i < num_columns; i++) {
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(ReadFlatColumn(i, &array));
+    columns[i] = std::make_shared<Column>(schema->field(i), array);
+  }
+
+  *table = std::make_shared<Table>(name, schema, columns);
+  return Status::OK();
+}
+
+FileReader::FileReader(
+    MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+    : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() {}
+
+// Static ctor
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<ParquetReadSource> source(new ParquetReadSource(allocator));
+  RETURN_NOT_OK(source->Open(file));
+
+  // TODO(wesm): reader properties
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source)));
+
+  // Use the same memory pool as the ParquetAllocator
+  reader->reset(new FileReader(allocator->pool(), std::move(pq_reader)));
+  return Status::OK();
+}
+
+Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* 
out) {
+  return impl_->GetFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+  return impl_->ReadFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+  return impl_->ReadFlatTable(out);
+}
+
+FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+    ParquetFileReader* reader, int column_index)
+    : pool_(pool),
+      descr_(descr),
+      reader_(reader),
+      column_index_(column_index),
+      next_row_group_(0),
+      values_buffer_(pool),
+      def_levels_buffer_(pool) {
+  NodeToField(descr_->schema_node(), &field_);
+  NextRowGroup();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNonNullableBatch(typename 
ParquetType::c_type* values,
+    int64_t values_read, BuilderType<ArrowType>* builder) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK(builder);
+  const ArrowCType* values_ptr = nullptr;
+  RETURN_NOT_OK(
+      (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, 
&values_ptr)));
+  RETURN_NOT_OK(builder->Append(values_ptr, values_read));
+  return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+    typename ParquetType::c_type* values, int64_t values_read, int64_t 
levels_read,
+    BuilderType<ArrowType>* builder) {
+  using ArrowCType = typename ArrowType::c_type;
+
+  DCHECK(builder);
+  RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * 
sizeof(ArrowCType)));
+  RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+  auto values_ptr = 
reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
+  uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+  int values_idx = 0;
+  for (int64_t i = 0; i < levels_read; i++) {
+    if (def_levels[i] < descr_->max_definition_level()) {
+      valid_bytes[i] = 0;
+    } else {
+      valid_bytes[i] = 1;
+      values_ptr[i] = values[values_idx++];
+    }
+  }
+  RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+  return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::TypedReadBatch(
+    int batch_size, std::shared_ptr<Array>* out) {
+  using ParquetCType = typename ParquetType::c_type;
+
+  int values_to_read = batch_size;
+  BuilderType<ArrowType> builder(pool_, field_->type);
+  while ((values_to_read > 0) && column_reader_) {
+    values_buffer_.Resize(values_to_read * sizeof(ParquetCType));
+    if (descr_->max_definition_level() > 0) {
+      def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+    }
+    auto reader = 
dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = 
reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, 
&values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      RETURN_NOT_OK(
+          (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, 
&builder)));
+    } else {
+      // As per the defintion and checks for flat columns:
+      // descr_->max_definition_level() == 1
+      RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+          def_levels, values, values_read, levels_read, &builder)));
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+  *out = builder.Finish();
+  return Status::OK();
+}
+
+template <>
+Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, 
ByteArrayType>(
+    int batch_size, std::shared_ptr<Array>* out) {
+  int values_to_read = batch_size;
+  ::arrow::StringBuilder builder(pool_, field_->type);
+  while ((values_to_read > 0) && column_reader_) {
+    values_buffer_.Resize(values_to_read * sizeof(ByteArray));
+    if (descr_->max_definition_level() > 0) {
+      def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+    }
+    auto reader =
+        dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, 
&values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      for (int64_t i = 0; i < levels_read; i++) {
+        RETURN_NOT_OK(
+            builder.Append(reinterpret_cast<const char*>(values[i].ptr), 
values[i].len));
+      }
+    } else {
+      // descr_->max_definition_level() == 1
+      int values_idx = 0;
+      for (int64_t i = 0; i < levels_read; i++) {
+        if (def_levels[i] < descr_->max_definition_level()) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(
+              builder.Append(reinterpret_cast<const 
char*>(values[values_idx].ptr),
+                  values[values_idx].len));
+          values_idx++;
+        }
+      }
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+  *out = builder.Finish();
+  return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)              \
+  case ::arrow::Type::ENUM:                                         \
+    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
+    break;
+
+Status FlatColumnReader::Impl::NextBatch(int batch_size, 
std::shared_ptr<Array>* out) {
+  if (!column_reader_) {
+    // Exhausted all row groups.
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  switch (field_->type->type) {
+    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+    TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
+    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    default:
+      return Status::NotImplemented(field_->type->ToString());
+  }
+}
+
+void FlatColumnReader::Impl::NextRowGroup() {
+  if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+    column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
+    next_row_group_++;
+  } else {
+    column_reader_ = nullptr;
+  }
+}
+
+FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : 
impl_(std::move(impl)) {}
+
+FlatColumnReader::~FlatColumnReader() {}
+
+Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* 
out) {
+  return impl_->NextBatch(batch_size, out);
+}
+
+}  // namespace arrow
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
new file mode 100644
index 0000000..e725728
--- /dev/null
+++ b/src/parquet/arrow/reader.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_READER_H
+#define PARQUET_ARROW_READER_H
+
+#include <memory>
+
+#include "parquet/api/reader.h"
+#include "parquet/api/schema.h"
+#include "parquet/arrow/io.h"
+
+#include "arrow/io/interfaces.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class RowBatch;
+class Status;
+class Table;
+}
+
+namespace parquet {
+
+namespace arrow {
+
+class FlatColumnReader;
+
+// Arrow read adapter class for deserializing Parquet files as Arrow row
+// batches.
+//
+// TODO(wesm): nested data does not always make sense with this user
+// interface unless you are only reading a single leaf node from a branch of
+// a table. For example:
+//
+// repeated group data {
+//   optional group record {
+//     optional int32 val1;
+//     optional byte_array val2;
+//     optional bool val3;
+//   }
+//   optional int32 val4;
+// }
+//
+// In the Parquet file, there are 3 leaf nodes:
+//
+// * data.record.val1
+// * data.record.val2
+// * data.record.val3
+// * data.val4
+//
+// When materializing this data in an Arrow array, we would have:
+//
+// data: list<struct<
+//   record: struct<
+//    val1: int32,
+//    val2: string (= list<uint8>),
+//    val3: bool,
+//   >,
+//   val4: int32
+// >>
+//
+// However, in the Parquet format, each leaf node has its own repetition and
+// definition levels describing the structure of the intermediate nodes in
+// this array structure. Thus, we will need to scan the leaf data for a group
+// of leaf nodes part of the same type tree to create a single result Arrow
+// nested array structure.
+//
+// This is additionally complicated "chunky" repeated fields or very large byte
+// arrays
+class PARQUET_EXPORT FileReader {
+ public:
+  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> 
reader);
+
+  // Since the distribution of columns amongst a Parquet file's row groups may
+  // be uneven (the number of values in each column chunk can be different), we
+  // provide a column-oriented read interface. The ColumnReader hides the
+  // details of paging through the file's row groups and yielding
+  // fully-materialized arrow::Array instances
+  //
+  // Returns error status if the column of interest is not flat.
+  ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+  // Read column as a whole into an Array.
+  ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+  // Read a table of flat columns into a Table.
+  ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
+
+  virtual ~FileReader();
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+// At this point, the column reader is a stream iterator. It only knows how to
+// read the next batch of values for a particular column from the file until it
+// runs out.
+//
+// We also do not expose any internal Parquet details, such as row groups. This
+// might change in the future.
+class PARQUET_EXPORT FlatColumnReader {
+ public:
+  virtual ~FlatColumnReader();
+
+  // Scan the next array of the indicated size. The actual size of the
+  // returned array may be less than the passed size depending how much data is
+  // available in the file.
+  //
+  // When all the data in the file has been exhausted, the result is set to
+  // nullptr.
+  //
+  // Returns Status::OK on a successful read, including if you have exhausted
+  // the data available in the file.
+  ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* 
out);
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+  explicit FlatColumnReader(std::unique_ptr<Impl> impl);
+
+  friend class FileReader;
+};
+
+// Helper function to create a file reader from an implementation of an Arrow
+// readable file
+PARQUET_EXPORT
+::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& 
file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader);
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_READER_H

Reply via email to