ARROW-296: [Python / C++] Remove arrow::parquet, make pyarrow link against parquet_arrow
This patch depends on PARQUET-728 (to run the full test suite, including pyarrow Parquet tests) Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #145 from wesm/ARROW-296 and squashes the following commits: d67b4f9 [Wes McKinney] Refactor to link against parquet_arrow, fix up cmake files Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/32fd692f Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/32fd692f Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/32fd692f Branch: refs/heads/master Commit: 32fd692f3aced29cc65a786d5ec63f8cd484853c Parents: 7e39747 Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Sun Sep 25 19:28:26 2016 -0400 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Sun Sep 25 19:28:26 2016 -0400 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 18 - cpp/cmake_modules/FindParquet.cmake | 44 +- cpp/doc/Parquet.md | 15 +- cpp/src/arrow/parquet/CMakeLists.txt | 67 --- cpp/src/arrow/parquet/io.cc | 105 ---- cpp/src/arrow/parquet/io.h | 84 ---- cpp/src/arrow/parquet/parquet-io-test.cc | 135 ----- .../arrow/parquet/parquet-reader-writer-test.cc | 499 ------------------- cpp/src/arrow/parquet/parquet-schema-test.cc | 261 ---------- cpp/src/arrow/parquet/reader.cc | 401 --------------- cpp/src/arrow/parquet/reader.h | 146 ------ cpp/src/arrow/parquet/schema.cc | 344 ------------- cpp/src/arrow/parquet/schema.h | 53 -- cpp/src/arrow/parquet/test-util.h | 193 ------- cpp/src/arrow/parquet/utils.h | 52 -- cpp/src/arrow/parquet/writer.cc | 365 -------------- cpp/src/arrow/parquet/writer.h | 76 --- cpp/src/arrow/types/string.cc | 2 +- python/CMakeLists.txt | 14 +- python/cmake_modules/FindArrow.cmake | 22 - python/pyarrow/includes/parquet.pxd | 10 +- 21 files changed, 55 insertions(+), 2851 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index be95dab..f3f4a7d 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -52,10 +52,6 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") "Build the libarrow shared libraries" ON) - option(ARROW_PARQUET - "Build the Parquet adapter and link to libparquet" - OFF) - option(ARROW_TEST_MEMCHECK "Run the test suite using valgrind --tool=memcheck" OFF) @@ -703,20 +699,6 @@ add_subdirectory(src/arrow/util) add_subdirectory(src/arrow/types) #---------------------------------------------------------------------- -# Parquet adapter library - -if(ARROW_PARQUET) - find_package(Parquet REQUIRED) - include_directories(SYSTEM ${PARQUET_INCLUDE_DIR}) - ADD_THIRDPARTY_LIB(parquet - STATIC_LIB ${PARQUET_STATIC_LIB} - SHARED_LIB ${PARQUET_SHARED_LIB}) - - add_subdirectory(src/arrow/parquet) - list(APPEND LINK_LIBS arrow_parquet parquet) -endif() - -#---------------------------------------------------------------------- # IPC library ## Flatbuffers http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/cmake_modules/FindParquet.cmake ---------------------------------------------------------------------- diff --git a/cpp/cmake_modules/FindParquet.cmake b/cpp/cmake_modules/FindParquet.cmake index 36f4828..7445e09 100644 --- a/cpp/cmake_modules/FindParquet.cmake +++ b/cpp/cmake_modules/FindParquet.cmake @@ -29,15 +29,20 @@ endif() # Try the parameterized roots, if they exist if ( _parquet_roots ) - find_path( PARQUET_INCLUDE_DIR NAMES parquet/api/reader.h - PATHS ${_parquet_roots} NO_DEFAULT_PATH - PATH_SUFFIXES "include" ) - find_library( PARQUET_LIBRARIES NAMES parquet - PATHS ${_parquet_roots} NO_DEFAULT_PATH - PATH_SUFFIXES "lib" ) + find_path( PARQUET_INCLUDE_DIR NAMES parquet/api/reader.h + PATHS ${_parquet_roots} NO_DEFAULT_PATH + PATH_SUFFIXES "include" ) + find_library( PARQUET_LIBRARIES NAMES parquet + PATHS ${_parquet_roots} NO_DEFAULT_PATH + PATH_SUFFIXES "lib" ) + + find_library(PARQUET_ARROW_LIBRARIES NAMES parquet_arrow + PATHS ${_parquet_roots} NO_DEFAULT_PATH + PATH_SUFFIXES "lib") else () - find_path( PARQUET_INCLUDE_DIR NAMES parquet/api/reader.h ) - find_library( PARQUET_LIBRARIES NAMES parquet ) + find_path(PARQUET_INCLUDE_DIR NAMES parquet/api/reader.h ) + find_library(PARQUET_LIBRARIES NAMES parquet) + find_library(PARQUET_ARROW_LIBRARIES NAMES parquet_arrow) endif () @@ -51,6 +56,18 @@ else () set(PARQUET_FOUND FALSE) endif () +if (PARQUET_INCLUDE_DIR AND PARQUET_ARROW_LIBRARIES) + set(PARQUET_ARROW_FOUND TRUE) + get_filename_component(PARQUET_ARROW_LIBS ${PARQUET_ARROW_LIBRARIES} PATH) + set(PARQUET_ARROW_LIB_NAME libparquet_arrow) + set(PARQUET_ARROW_STATIC_LIB + ${PARQUET_ARROW_LIBS}/${PARQUET_ARROW_LIB_NAME}.a) + set(PARQUET_ARROW_SHARED_LIB + ${PARQUET_ARROW_LIBS}/${PARQUET_ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) +else () + set(PARQUET_ARROW_FOUND FALSE) +endif () + if (PARQUET_FOUND) if (NOT Parquet_FIND_QUIETLY) message(STATUS "Found the Parquet library: ${PARQUET_LIBRARIES}") @@ -71,6 +88,12 @@ else () endif () endif () +if (PARQUET_ARROW_FOUND) + if (NOT Parquet_FIND_QUIETLY) + message(STATUS "Found the Parquet Arrow library: ${PARQUET_ARROW_LIBS}") + endif() +endif() + mark_as_advanced( PARQUET_FOUND PARQUET_INCLUDE_DIR @@ -78,4 +101,9 @@ mark_as_advanced( PARQUET_LIBRARIES PARQUET_STATIC_LIB PARQUET_SHARED_LIB + + PARQUET_ARROW_FOUND + PARQUET_ARROW_LIBS + PARQUET_ARROW_STATIC_LIB + PARQUET_ARROW_SHARED_LIB ) http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/doc/Parquet.md ---------------------------------------------------------------------- diff --git a/cpp/doc/Parquet.md b/cpp/doc/Parquet.md index 370ac83..96471d9 100644 --- a/cpp/doc/Parquet.md +++ b/cpp/doc/Parquet.md @@ -1,24 +1,19 @@ ## Building Arrow-Parquet integration -To build the Arrow C++'s Parquet adapter library, you must first build [parquet-cpp][1]: +To use Arrow C++ with Parquet, you must first build the Arrow C++ libraries and +install them someplace. Then, you can build [parquet-cpp][1] with the Arrow +adapter library: ```bash # Set this to your preferred install location -export PARQUET_HOME=$HOME/local +export ARROW_HOME=$HOME/local git clone https://github.com/apache/parquet-cpp.git cd parquet-cpp source setup_build_env.sh -cmake -DCMAKE_INSTALL_PREFIX=$PARQUET_HOME +cmake -DCMAKE_INSTALL_PREFIX=$PARQUET_HOME -DPARQUET_ARROW=on make -j4 make install ``` -Make sure that `$PARQUET_HOME` is set to the installation location. Now, build -Arrow with the Parquet adapter enabled: - -```bash -cmake -DARROW_PARQUET=ON -``` - [1]: https://github.com/apache/parquet-cpp \ No newline at end of file http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt deleted file mode 100644 index c400e14..0000000 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ /dev/null @@ -1,67 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# ---------------------------------------------------------------------- -# arrow_parquet : Arrow <-> Parquet adapter - -set(PARQUET_SRCS - io.cc - reader.cc - schema.cc - writer.cc -) - -set(PARQUET_LIBS - arrow_shared - arrow_io - parquet_shared -) - -add_library(arrow_parquet SHARED - ${PARQUET_SRCS} -) -target_link_libraries(arrow_parquet ${PARQUET_LIBS}) -SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX) - -if (APPLE) - set_target_properties(arrow_parquet - PROPERTIES - BUILD_WITH_INSTALL_RPATH ON - INSTALL_NAME_DIR "@rpath") -endif() - -ADD_ARROW_TEST(parquet-schema-test) -ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet) - -ADD_ARROW_TEST(parquet-io-test) -ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet) - -ADD_ARROW_TEST(parquet-reader-writer-test) -ARROW_TEST_LINK_LIBRARIES(parquet-reader-writer-test arrow_parquet) - -# Headers: top level -install(FILES - io.h - reader.h - schema.h - utils.h - writer.h - DESTINATION include/arrow/parquet) - -install(TARGETS arrow_parquet - LIBRARY DESTINATION lib - ARCHIVE DESTINATION lib) http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/io.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc deleted file mode 100644 index a50d753..0000000 --- a/cpp/src/arrow/parquet/io.cc +++ /dev/null @@ -1,105 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/parquet/io.h" - -#include <cstdint> -#include <memory> - -#include "parquet/api/io.h" - -#include "arrow/parquet/utils.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" - -// To assist with readability -using ArrowROFile = arrow::io::ReadableFileInterface; - -namespace arrow { -namespace parquet { - -// ---------------------------------------------------------------------- -// ParquetAllocator - -ParquetAllocator::ParquetAllocator() : pool_(default_memory_pool()) {} - -ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {} - -ParquetAllocator::~ParquetAllocator() {} - -uint8_t* ParquetAllocator::Malloc(int64_t size) { - uint8_t* result; - PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result)); - return result; -} - -void ParquetAllocator::Free(uint8_t* buffer, int64_t size) { - // Does not report Status - pool_->Free(buffer, size); -} - -// ---------------------------------------------------------------------- -// ParquetReadSource - -ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator) - : file_(nullptr), allocator_(allocator) {} - -Status ParquetReadSource::Open(const std::shared_ptr<io::ReadableFileInterface>& file) { - int64_t file_size; - RETURN_NOT_OK(file->GetSize(&file_size)); - - file_ = file; - size_ = file_size; - return Status::OK(); -} - -void ParquetReadSource::Close() { - // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for - // these classes in a borked state. Probably better to explicitly close. - - // PARQUET_THROW_NOT_OK(file_->Close()); -} - -int64_t ParquetReadSource::Tell() const { - int64_t position; - PARQUET_THROW_NOT_OK(file_->Tell(&position)); - return position; -} - -void ParquetReadSource::Seek(int64_t position) { - PARQUET_THROW_NOT_OK(file_->Seek(position)); -} - -int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) { - int64_t bytes_read; - PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out)); - return bytes_read; -} - -std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) { - // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests - // that there should be more code sharing amongst file-like sources - auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_); - result->Resize(nbytes); - - int64_t bytes_read = Read(nbytes, result->mutable_data()); - if (bytes_read < nbytes) { result->Resize(bytes_read); } - return result; -} - -} // namespace parquet -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/io.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h deleted file mode 100644 index 1734863..0000000 --- a/cpp/src/arrow/parquet/io.h +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces - -#ifndef ARROW_PARQUET_IO_H -#define ARROW_PARQUET_IO_H - -#include <cstdint> -#include <memory> - -#include "parquet/api/io.h" - -#include "arrow/io/interfaces.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class MemoryPool; - -namespace parquet { - -// An implementation of the Parquet MemoryAllocator API that plugs into an -// existing Arrow memory pool. This way we can direct all allocations to a -// single place rather than tracking allocations in different locations (for -// example: without utilizing parquet-cpp's default allocator) -class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { - public: - // Uses the default memory pool - ParquetAllocator(); - - explicit ParquetAllocator(MemoryPool* pool); - virtual ~ParquetAllocator(); - - uint8_t* Malloc(int64_t size) override; - void Free(uint8_t* buffer, int64_t size) override; - - void set_pool(MemoryPool* pool) { pool_ = pool; } - - MemoryPool* pool() const { return pool_; } - - private: - MemoryPool* pool_; -}; - -class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { - public: - explicit ParquetReadSource(ParquetAllocator* allocator); - - // We need to ask for the file size on opening the file, and this can fail - Status Open(const std::shared_ptr<io::ReadableFileInterface>& file); - - void Close() override; - int64_t Tell() const override; - void Seek(int64_t pos) override; - int64_t Read(int64_t nbytes, uint8_t* out) override; - std::shared_ptr<::parquet::Buffer> Read(int64_t nbytes) override; - - private: - // An Arrow readable file of some kind - std::shared_ptr<io::ReadableFileInterface> file_; - - // The allocator is required for creating managed buffers - ParquetAllocator* allocator_; -}; - -} // namespace parquet -} // namespace arrow - -#endif // ARROW_PARQUET_IO_H http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/parquet-io-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc deleted file mode 100644 index 208b3e8..0000000 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ /dev/null @@ -1,135 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstdint> -#include <cstdlib> -#include <memory> -#include <string> - -#include "gtest/gtest.h" - -#include "arrow/io/memory.h" -#include "arrow/parquet/io.h" -#include "arrow/test-util.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" - -#include "parquet/api/io.h" - -namespace arrow { -namespace parquet { - -// Allocator tests - -TEST(TestParquetAllocator, DefaultCtor) { - ParquetAllocator allocator; - - const int buffer_size = 10; - - uint8_t* buffer = nullptr; - ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size);); - - // valgrind will complain if we write into nullptr - memset(buffer, 0, buffer_size); - - allocator.Free(buffer, buffer_size); -} - -// Pass through to the default memory pool -class TrackingPool : public MemoryPool { - public: - TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {} - - Status Allocate(int64_t size, uint8_t** out) override { - RETURN_NOT_OK(pool_->Allocate(size, out)); - bytes_allocated_ += size; - return Status::OK(); - } - - void Free(uint8_t* buffer, int64_t size) override { - pool_->Free(buffer, size); - bytes_allocated_ -= size; - } - - int64_t bytes_allocated() const override { return bytes_allocated_; } - - private: - MemoryPool* pool_; - int64_t bytes_allocated_; -}; - -TEST(TestParquetAllocator, CustomPool) { - TrackingPool pool; - - ParquetAllocator allocator(&pool); - - ASSERT_EQ(&pool, allocator.pool()); - - const int buffer_size = 10; - - uint8_t* buffer = nullptr; - ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size);); - - ASSERT_EQ(buffer_size, pool.bytes_allocated()); - - // valgrind will complain if we write into nullptr - memset(buffer, 0, buffer_size); - - allocator.Free(buffer, buffer_size); - - ASSERT_EQ(0, pool.bytes_allocated()); -} - -// ---------------------------------------------------------------------- -// Read source tests - -TEST(TestParquetReadSource, Basics) { - std::string data = "this is the data"; - auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str()); - - ParquetAllocator allocator(default_memory_pool()); - - auto file = std::make_shared<io::BufferReader>(data_buffer, data.size()); - auto source = std::make_shared<ParquetReadSource>(&allocator); - - ASSERT_OK(source->Open(file)); - - ASSERT_EQ(0, source->Tell()); - ASSERT_NO_THROW(source->Seek(5)); - ASSERT_EQ(5, source->Tell()); - ASSERT_NO_THROW(source->Seek(0)); - - // Seek out of bounds - ASSERT_THROW(source->Seek(100), ::parquet::ParquetException); - - uint8_t buffer[50]; - - ASSERT_NO_THROW(source->Read(4, buffer)); - ASSERT_EQ(0, std::memcmp(buffer, "this", 4)); - ASSERT_EQ(4, source->Tell()); - - std::shared_ptr<::parquet::Buffer> pq_buffer; - - ASSERT_NO_THROW(pq_buffer = source->Read(7)); - - auto expected_buffer = std::make_shared<::parquet::Buffer>(data_buffer + 4, 7); - - ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); -} - -} // namespace parquet -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/parquet-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/parquet-reader-writer-test.cc b/cpp/src/arrow/parquet/parquet-reader-writer-test.cc deleted file mode 100644 index d7b39dd..0000000 --- a/cpp/src/arrow/parquet/parquet-reader-writer-test.cc +++ /dev/null @@ -1,499 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "gtest/gtest.h" - -#include "arrow/test-util.h" -#include "arrow/parquet/test-util.h" -#include "arrow/parquet/reader.h" -#include "arrow/parquet/writer.h" -#include "arrow/types/construct.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" - -#include "parquet/api/reader.h" -#include "parquet/api/writer.h" - -using ParquetBuffer = parquet::Buffer; -using parquet::BufferReader; -using parquet::default_writer_properties; -using parquet::InMemoryOutputStream; -using parquet::LogicalType; -using parquet::ParquetFileReader; -using parquet::ParquetFileWriter; -using parquet::RandomAccessSource; -using parquet::Repetition; -using parquet::SchemaDescriptor; -using parquet::ParquetVersion; -using ParquetType = parquet::Type; -using parquet::schema::GroupNode; -using parquet::schema::NodePtr; -using parquet::schema::PrimitiveNode; - -namespace arrow { - -namespace parquet { - -const int SMALL_SIZE = 100; -const int LARGE_SIZE = 10000; - -template <typename TestType> -struct test_traits {}; - -template <> -struct test_traits<BooleanType> { - static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static uint8_t const value; -}; - -const uint8_t test_traits<BooleanType>::value(1); - -template <> -struct test_traits<UInt8Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_8; - static uint8_t const value; -}; - -const uint8_t test_traits<UInt8Type>::value(64); - -template <> -struct test_traits<Int8Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::INT_8; - static int8_t const value; -}; - -const int8_t test_traits<Int8Type>::value(-64); - -template <> -struct test_traits<UInt16Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_16; - static uint16_t const value; -}; - -const uint16_t test_traits<UInt16Type>::value(1024); - -template <> -struct test_traits<Int16Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::INT_16; - static int16_t const value; -}; - -const int16_t test_traits<Int16Type>::value(-1024); - -template <> -struct test_traits<UInt32Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_32; - static uint32_t const value; -}; - -const uint32_t test_traits<UInt32Type>::value(1024); - -template <> -struct test_traits<Int32Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static int32_t const value; -}; - -const int32_t test_traits<Int32Type>::value(-1024); - -template <> -struct test_traits<UInt64Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT64; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_64; - static uint64_t const value; -}; - -const uint64_t test_traits<UInt64Type>::value(1024); - -template <> -struct test_traits<Int64Type> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT64; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static int64_t const value; -}; - -const int64_t test_traits<Int64Type>::value(-1024); - -template <> -struct test_traits<TimestampType> { - static constexpr ParquetType::type parquet_enum = ParquetType::INT64; - static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS; - static int64_t const value; -}; - -const int64_t test_traits<TimestampType>::value(14695634030000); - -template <> -struct test_traits<FloatType> { - static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static float const value; -}; - -const float test_traits<FloatType>::value(2.1f); - -template <> -struct test_traits<DoubleType> { - static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; - static double const value; -}; - -const double test_traits<DoubleType>::value(4.2); - -template <> -struct test_traits<StringType> { - static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; - static constexpr LogicalType::type logical_enum = LogicalType::UTF8; - static std::string const value; -}; - -const std::string test_traits<StringType>::value("Test"); - -template <typename T> -using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>; - -template <typename T> -using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>; - -template <typename TestType> -class TestParquetIO : public ::testing::Test { - public: - virtual void SetUp() {} - - std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) { - auto pnode = PrimitiveNode::Make("column1", repetition, - test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum); - NodePtr node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); - return std::static_pointer_cast<GroupNode>(node_); - } - - std::unique_ptr<ParquetFileWriter> MakeWriter( - const std::shared_ptr<GroupNode>& schema) { - sink_ = std::make_shared<InMemoryOutputStream>(); - return ParquetFileWriter::Open(sink_, schema); - } - - std::unique_ptr<ParquetFileReader> ReaderFromSink() { - std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer(); - std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer)); - return ParquetFileReader::Open(std::move(source)); - } - - void ReadSingleColumnFile( - std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) { - arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader; - ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader)); - ASSERT_NE(nullptr, column_reader.get()); - - ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); - ASSERT_NE(nullptr, out->get()); - } - - void ReadAndCheckSingleColumnFile(Array* values) { - std::shared_ptr<Array> out; - ReadSingleColumnFile(ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); - } - - void ReadTableFromFile( - std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) { - arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - ASSERT_OK_NO_THROW(reader.ReadFlatTable(out)); - ASSERT_NE(nullptr, out->get()); - } - - void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) { - std::shared_ptr<Table> out; - ReadTableFromFile(ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(values->length(), out->num_rows()); - - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); - } - - template <typename ArrayType> - void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema, - const std::shared_ptr<ArrayType>& values) { - FileWriter writer(default_memory_pool(), MakeWriter(schema)); - ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); - ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get())); - ASSERT_OK_NO_THROW(writer.Close()); - } - - std::shared_ptr<InMemoryOutputStream> sink_; -}; - -// We habe separate tests for UInt32Type as this is currently the only type -// where a roundtrip does not yield the identical Array structure. -// There we write an UInt32 Array but receive an Int64 Array as result for -// Parquet version 1.0. - -typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type, - Int32Type, UInt64Type, Int64Type, TimestampType, FloatType, DoubleType, - StringType> TestTypes; - -TYPED_TEST_CASE(TestParquetIO, TestTypes); - -TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { - auto values = NonNullArray<TypeParam>(SMALL_SIZE); - - std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); - this->WriteFlatColumn(schema, values); - - this->ReadAndCheckSingleColumnFile(values.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { - auto values = NonNullArray<TypeParam>(SMALL_SIZE); - std::shared_ptr<Table> table = MakeSimpleTable(values, false); - this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); - - std::shared_ptr<Table> out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(100, out->num_rows()); - - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); -} - -TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { - // This also tests max_definition_level = 1 - auto values = NullableArray<TypeParam>(SMALL_SIZE, 10); - - std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); - this->WriteFlatColumn(schema, values); - - this->ReadAndCheckSingleColumnFile(values.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { - // This also tests max_definition_level = 1 - std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10); - std::shared_ptr<Table> table = MakeSimpleTable(values, true); - this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); - - this->ReadAndCheckSingleColumnTable(values); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { - auto values = NonNullArray<TypeParam>(SMALL_SIZE); - int64_t chunk_size = values->length() / 4; - - std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - for (int i = 0; i < 4; i++) { - ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW( - writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); - } - ASSERT_OK_NO_THROW(writer.Close()); - - this->ReadAndCheckSingleColumnFile(values.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { - auto values = NonNullArray<TypeParam>(LARGE_SIZE); - std::shared_ptr<Table> table = MakeSimpleTable(values, false); - this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - - this->ReadAndCheckSingleColumnTable(values); -} - -TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { - int64_t chunk_size = SMALL_SIZE / 4; - auto values = NullableArray<TypeParam>(SMALL_SIZE, 10); - - std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - for (int i = 0; i < 4; i++) { - ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW( - writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); - } - ASSERT_OK_NO_THROW(writer.Close()); - - this->ReadAndCheckSingleColumnFile(values.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { - // This also tests max_definition_level = 1 - auto values = NullableArray<TypeParam>(LARGE_SIZE, 100); - std::shared_ptr<Table> table = MakeSimpleTable(values, true); - this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - - this->ReadAndCheckSingleColumnTable(values); -} - -using TestUInt32ParquetIO = TestParquetIO<UInt32Type>; - -TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { - // This also tests max_definition_level = 1 - std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100); - std::shared_ptr<Table> table = MakeSimpleTable(values, true); - - // Parquet 2.0 roundtrip should yield an uint32_t column again - this->sink_ = std::make_shared<InMemoryOutputStream>(); - std::shared_ptr<::parquet::WriterProperties> properties = - ::parquet::WriterProperties::Builder() - .version(ParquetVersion::PARQUET_2_0) - ->build(); - ASSERT_OK_NO_THROW( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); - this->ReadAndCheckSingleColumnTable(values); -} - -TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { - // This also tests max_definition_level = 1 - std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100); - std::shared_ptr<Table> table = MakeSimpleTable(values, true); - - // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0 - // reader that a column is unsigned. - this->sink_ = std::make_shared<InMemoryOutputStream>(); - std::shared_ptr<::parquet::WriterProperties> properties = - ::parquet::WriterProperties::Builder() - .version(ParquetVersion::PARQUET_1_0) - ->build(); - ASSERT_OK_NO_THROW( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); - - std::shared_ptr<Array> expected_values; - std::shared_ptr<PoolBuffer> int64_data = - std::make_shared<PoolBuffer>(default_memory_pool()); - { - ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); - int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data()); - const uint32_t* uint32_data_ptr = - reinterpret_cast<const uint32_t*>(values->data()->data()); - // std::copy might be faster but this is explicit on the casts) - for (int64_t i = 0; i < values->length(); i++) { - int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]); - } - } - ASSERT_OK(MakePrimitiveArray(std::make_shared<Int64Type>(), values->length(), - int64_data, values->null_count(), values->null_bitmap(), &expected_values)); - this->ReadAndCheckSingleColumnTable(expected_values); -} - -template <typename T> -using ParquetCDataType = typename ParquetDataType<T>::c_type; - -template <typename TestType> -class TestPrimitiveParquetIO : public TestParquetIO<TestType> { - public: - typedef typename TestType::c_type T; - - void MakeTestFile(std::vector<T>& values, int num_chunks, - std::unique_ptr<ParquetFileReader>* file_reader) { - std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); - std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema); - size_t chunk_size = values.size() / num_chunks; - // Convert to Parquet's expected physical type - std::vector<uint8_t> values_buffer( - sizeof(ParquetCDataType<TestType>) * values.size()); - auto values_parquet = - reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data()); - std::copy(values.cbegin(), values.cend(), values_parquet); - for (int i = 0; i < num_chunks; i++) { - auto row_group_writer = file_writer->AppendRowGroup(chunk_size); - auto column_writer = - static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn()); - ParquetCDataType<TestType>* data = values_parquet + i * chunk_size; - column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); - column_writer->Close(); - row_group_writer->Close(); - } - file_writer->Close(); - *file_reader = this->ReaderFromSink(); - } - - void CheckSingleColumnRequiredTableRead(int num_chunks) { - std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value); - std::unique_ptr<ParquetFileReader> file_reader; - ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader)); - - std::shared_ptr<Table> out; - this->ReadTableFromFile(std::move(file_reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); - - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get()); - } - - void CheckSingleColumnRequiredRead(int num_chunks) { - std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value); - std::unique_ptr<ParquetFileReader> file_reader; - ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader)); - - std::shared_ptr<Array> out; - this->ReadSingleColumnFile(std::move(file_reader), &out); - - ExpectArray<TestType>(values.data(), out.get()); - } -}; - -typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type, - UInt32Type, Int32Type, UInt64Type, Int64Type, FloatType, - DoubleType> PrimitiveTestTypes; - -TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); - -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) { - this->CheckSingleColumnRequiredRead(1); -} - -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) { - this->CheckSingleColumnRequiredTableRead(1); -} - -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) { - this->CheckSingleColumnRequiredRead(4); -} - -TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { - this->CheckSingleColumnRequiredTableRead(4); -} - -} // namespace parquet - -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/parquet-schema-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc deleted file mode 100644 index 63ad8fb..0000000 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ /dev/null @@ -1,261 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <memory> -#include <vector> - -#include "gtest/gtest.h" - -#include "arrow/test-util.h" -#include "arrow/type.h" -#include "arrow/types/datetime.h" -#include "arrow/types/decimal.h" -#include "arrow/util/status.h" - -#include "arrow/parquet/schema.h" - -using ParquetType = parquet::Type; -using parquet::LogicalType; -using parquet::Repetition; -using parquet::schema::NodePtr; -using parquet::schema::GroupNode; -using parquet::schema::PrimitiveNode; - -namespace arrow { - -namespace parquet { - -const auto BOOL = std::make_shared<BooleanType>(); -const auto UINT8 = std::make_shared<UInt8Type>(); -const auto INT32 = std::make_shared<Int32Type>(); -const auto INT64 = std::make_shared<Int64Type>(); -const auto FLOAT = std::make_shared<FloatType>(); -const auto DOUBLE = std::make_shared<DoubleType>(); -const auto UTF8 = std::make_shared<StringType>(); -const auto TIMESTAMP_MS = std::make_shared<TimestampType>(TimestampType::Unit::MILLI); -// TODO: This requires parquet-cpp implementing the MICROS enum value -// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO); -const auto BINARY = std::make_shared<ListType>(std::make_shared<Field>("", UINT8)); -const auto DECIMAL_8_4 = std::make_shared<DecimalType>(8, 4); - -class TestConvertParquetSchema : public ::testing::Test { - public: - virtual void SetUp() {} - - void CheckFlatSchema(const std::shared_ptr<Schema>& expected_schema) { - ASSERT_EQ(expected_schema->num_fields(), result_schema_->num_fields()); - for (int i = 0; i < expected_schema->num_fields(); ++i) { - auto lhs = result_schema_->field(i); - auto rhs = expected_schema->field(i); - EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString() - << " != " << rhs->ToString(); - } - } - - Status ConvertSchema(const std::vector<NodePtr>& nodes) { - NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); - descr_.Init(schema); - return FromParquetSchema(&descr_, &result_schema_); - } - - protected: - ::parquet::SchemaDescriptor descr_; - std::shared_ptr<Schema> result_schema_; -}; - -TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { - std::vector<NodePtr> parquet_fields; - std::vector<std::shared_ptr<Field>> arrow_fields; - - parquet_fields.push_back( - PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); - arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false)); - - parquet_fields.push_back( - PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); - arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false)); - - parquet_fields.push_back( - PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); - arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false)); - - parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, - ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); - arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false)); - - // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, - // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); - // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false)); - - parquet_fields.push_back( - PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); - arrow_fields.push_back(std::make_shared<Field>("float", FLOAT)); - - parquet_fields.push_back( - PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); - arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE)); - - parquet_fields.push_back( - PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY)); - arrow_fields.push_back(std::make_shared<Field>("binary", BINARY)); - - parquet_fields.push_back(PrimitiveNode::Make( - "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); - arrow_fields.push_back(std::make_shared<Field>("string", UTF8)); - - parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL, - ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12)); - arrow_fields.push_back(std::make_shared<Field>("flba-binary", BINARY)); - - auto arrow_schema = std::make_shared<Schema>(arrow_fields); - ASSERT_OK(ConvertSchema(parquet_fields)); - - CheckFlatSchema(arrow_schema); -} - -TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) { - std::vector<NodePtr> parquet_fields; - std::vector<std::shared_ptr<Field>> arrow_fields; - - parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL, - ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4)); - arrow_fields.push_back(std::make_shared<Field>("flba-decimal", DECIMAL_8_4)); - - parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL, - ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared<Field>("binary-decimal", DECIMAL_8_4)); - - parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL, - ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared<Field>("int32-decimal", DECIMAL_8_4)); - - parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL, - ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared<Field>("int64-decimal", DECIMAL_8_4)); - - auto arrow_schema = std::make_shared<Schema>(arrow_fields); - ASSERT_OK(ConvertSchema(parquet_fields)); - - CheckFlatSchema(arrow_schema); -} - -TEST_F(TestConvertParquetSchema, UnsupportedThings) { - std::vector<NodePtr> unsupported_nodes; - - unsupported_nodes.push_back( - PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)); - - unsupported_nodes.push_back( - GroupNode::Make("repeated-group", Repetition::REPEATED, {})); - - unsupported_nodes.push_back(PrimitiveNode::Make( - "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE)); - - for (const NodePtr& node : unsupported_nodes) { - ASSERT_RAISES(NotImplemented, ConvertSchema({node})); - } -} - -class TestConvertArrowSchema : public ::testing::Test { - public: - virtual void SetUp() {} - - void CheckFlatSchema(const std::vector<NodePtr>& nodes) { - NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes); - const GroupNode* expected_schema_node = - static_cast<const GroupNode*>(schema_node.get()); - const GroupNode* result_schema_node = result_schema_->group_node(); - - ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count()); - - for (int i = 0; i < expected_schema_node->field_count(); i++) { - auto lhs = result_schema_node->field(i); - auto rhs = expected_schema_node->field(i); - EXPECT_TRUE(lhs->Equals(rhs.get())); - } - } - - Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) { - arrow_schema_ = std::make_shared<Schema>(fields); - std::shared_ptr<::parquet::WriterProperties> properties = - ::parquet::default_writer_properties(); - return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_); - } - - protected: - std::shared_ptr<Schema> arrow_schema_; - std::shared_ptr<::parquet::SchemaDescriptor> result_schema_; -}; - -TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { - std::vector<NodePtr> parquet_fields; - std::vector<std::shared_ptr<Field>> arrow_fields; - - parquet_fields.push_back( - PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); - arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false)); - - parquet_fields.push_back( - PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); - arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false)); - - parquet_fields.push_back( - PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); - arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false)); - - parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, - ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); - arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false)); - - // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, - // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); - // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false)); - - parquet_fields.push_back( - PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); - arrow_fields.push_back(std::make_shared<Field>("float", FLOAT)); - - parquet_fields.push_back( - PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); - arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE)); - - // TODO: String types need to be clarified a bit more in the Arrow spec - parquet_fields.push_back(PrimitiveNode::Make( - "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); - arrow_fields.push_back(std::make_shared<Field>("string", UTF8)); - - ASSERT_OK(ConvertSchema(arrow_fields)); - - CheckFlatSchema(parquet_fields); -} - -TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) { - std::vector<NodePtr> parquet_fields; - std::vector<std::shared_ptr<Field>> arrow_fields; - - // TODO: Test Decimal Arrow -> Parquet conversion - - ASSERT_OK(ConvertSchema(arrow_fields)); - - CheckFlatSchema(parquet_fields); -} - -TEST(TestNodeConversion, DateAndTime) {} - -} // namespace parquet - -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc deleted file mode 100644 index 0c2fc6e..0000000 --- a/cpp/src/arrow/parquet/reader.cc +++ /dev/null @@ -1,401 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/parquet/reader.h" - -#include <algorithm> -#include <queue> -#include <string> -#include <vector> - -#include "arrow/column.h" -#include "arrow/parquet/io.h" -#include "arrow/parquet/schema.h" -#include "arrow/parquet/utils.h" -#include "arrow/schema.h" -#include "arrow/table.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/util/status.h" - -using parquet::ColumnReader; -using parquet::Repetition; -using parquet::TypedColumnReader; - -// Help reduce verbosity -using ParquetRAS = parquet::RandomAccessSource; -using ParquetReader = parquet::ParquetFileReader; - -namespace arrow { -namespace parquet { - -template <typename ArrowType> -struct ArrowTypeTraits { - typedef NumericBuilder<ArrowType> builder_type; -}; - -template <> -struct ArrowTypeTraits<BooleanType> { - typedef BooleanBuilder builder_type; -}; - -template <typename ArrowType> -using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type; - -class FileReader::Impl { - public: - Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); - virtual ~Impl() {} - - bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr); - Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); - Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); - Status ReadFlatTable(std::shared_ptr<Table>* out); - - private: - MemoryPool* pool_; - std::unique_ptr<::parquet::ParquetFileReader> reader_; -}; - -class FlatColumnReader::Impl { - public: - Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, - ::parquet::ParquetFileReader* reader, int column_index); - virtual ~Impl() {} - - Status NextBatch(int batch_size, std::shared_ptr<Array>* out); - template <typename ArrowType, typename ParquetType> - Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out); - - template <typename ArrowType, typename ParquetType> - Status ReadNullableFlatBatch(const int16_t* def_levels, - typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, - BuilderType<ArrowType>* builder); - template <typename ArrowType, typename ParquetType> - Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read, - BuilderType<ArrowType>* builder); - - private: - void NextRowGroup(); - - template <typename InType, typename OutType> - struct can_copy_ptr { - static constexpr bool value = - std::is_same<InType, OutType>::value || - (std::is_integral<InType>{} && std::is_integral<OutType>{} && - (sizeof(InType) == sizeof(OutType))); - }; - - template <typename InType, typename OutType, - typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr> - Status ConvertPhysicalType( - const InType* in_ptr, int64_t length, const OutType** out_ptr) { - *out_ptr = reinterpret_cast<const OutType*>(in_ptr); - return Status::OK(); - } - - template <typename InType, typename OutType, - typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr> - Status ConvertPhysicalType( - const InType* in_ptr, int64_t length, const OutType** out_ptr) { - RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType))); - OutType* mutable_out_ptr = - reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data()); - std::copy(in_ptr, in_ptr + length, mutable_out_ptr); - *out_ptr = mutable_out_ptr; - return Status::OK(); - } - - MemoryPool* pool_; - const ::parquet::ColumnDescriptor* descr_; - ::parquet::ParquetFileReader* reader_; - int column_index_; - int next_row_group_; - std::shared_ptr<ColumnReader> column_reader_; - std::shared_ptr<Field> field_; - - PoolBuffer values_buffer_; - PoolBuffer def_levels_buffer_; - PoolBuffer values_builder_buffer_; - PoolBuffer valid_bytes_buffer_; -}; - -FileReader::Impl::Impl( - MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) - : pool_(pool), reader_(std::move(reader)) {} - -bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr) { - if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) { - return false; - } else if ((descr->max_definition_level() == 1) && - (descr->schema_node()->repetition() != Repetition::OPTIONAL)) { - return false; - } - return true; -} - -Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) { - const ::parquet::SchemaDescriptor* schema = reader_->metadata()->schema(); - - if (!CheckForFlatColumn(schema->Column(i))) { - return Status::Invalid("The requested column is not flat"); - } - std::unique_ptr<FlatColumnReader::Impl> impl( - new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i)); - *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl))); - return Status::OK(); -} - -Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { - std::unique_ptr<FlatColumnReader> flat_column_reader; - RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader)); - return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out); -} - -Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) { - auto descr = reader_->metadata()->schema(); - - const std::string& name = descr->name(); - std::shared_ptr<Schema> schema; - RETURN_NOT_OK(FromParquetSchema(descr, &schema)); - - int num_columns = reader_->metadata()->num_columns(); - - std::vector<std::shared_ptr<Column>> columns(num_columns); - for (int i = 0; i < num_columns; i++) { - std::shared_ptr<Array> array; - RETURN_NOT_OK(ReadFlatColumn(i, &array)); - columns[i] = std::make_shared<Column>(schema->field(i), array); - } - - *table = std::make_shared<Table>(name, schema, columns); - return Status::OK(); -} - -FileReader::FileReader( - MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) - : impl_(new FileReader::Impl(pool, std::move(reader))) {} - -FileReader::~FileReader() {} - -// Static ctor -Status OpenFile(const std::shared_ptr<io::ReadableFileInterface>& file, - ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) { - std::unique_ptr<ParquetReadSource> source(new ParquetReadSource(allocator)); - RETURN_NOT_OK(source->Open(file)); - - // TODO(wesm): reader properties - std::unique_ptr<ParquetReader> pq_reader; - PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source))); - - // Use the same memory pool as the ParquetAllocator - reader->reset(new FileReader(allocator->pool(), std::move(pq_reader))); - return Status::OK(); -} - -Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) { - return impl_->GetFlatColumn(i, out); -} - -Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { - return impl_->ReadFlatColumn(i, out); -} - -Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) { - return impl_->ReadFlatTable(out); -} - -FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, - ::parquet::ParquetFileReader* reader, int column_index) - : pool_(pool), - descr_(descr), - reader_(reader), - column_index_(column_index), - next_row_group_(0), - values_buffer_(pool), - def_levels_buffer_(pool) { - NodeToField(descr_->schema_node(), &field_); - NextRowGroup(); -} - -template <typename ArrowType, typename ParquetType> -Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values, - int64_t values_read, BuilderType<ArrowType>* builder) { - using ArrowCType = typename ArrowType::c_type; - using ParquetCType = typename ParquetType::c_type; - - DCHECK(builder); - const ArrowCType* values_ptr = nullptr; - RETURN_NOT_OK( - (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr))); - RETURN_NOT_OK(builder->Append(values_ptr, values_read)); - return Status::OK(); -} - -template <typename ArrowType, typename ParquetType> -Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels, - typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, - BuilderType<ArrowType>* builder) { - using ArrowCType = typename ArrowType::c_type; - - DCHECK(builder); - RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType))); - RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); - auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data()); - uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data(); - int values_idx = 0; - for (int64_t i = 0; i < levels_read; i++) { - if (def_levels[i] < descr_->max_definition_level()) { - valid_bytes[i] = 0; - } else { - valid_bytes[i] = 1; - values_ptr[i] = values[values_idx++]; - } - } - RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes)); - return Status::OK(); -} - -template <typename ArrowType, typename ParquetType> -Status FlatColumnReader::Impl::TypedReadBatch( - int batch_size, std::shared_ptr<Array>* out) { - using ParquetCType = typename ParquetType::c_type; - - int values_to_read = batch_size; - BuilderType<ArrowType> builder(pool_, field_->type); - while ((values_to_read > 0) && column_reader_) { - values_buffer_.Resize(values_to_read * sizeof(ParquetCType)); - if (descr_->max_definition_level() > 0) { - def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); - } - auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get()); - int64_t values_read; - int64_t levels_read; - int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); - auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data()); - PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( - values_to_read, def_levels, nullptr, values, &values_read)); - values_to_read -= levels_read; - if (descr_->max_definition_level() == 0) { - RETURN_NOT_OK( - (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder))); - } else { - // As per the defintion and checks for flat columns: - // descr_->max_definition_level() == 1 - RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>( - def_levels, values, values_read, levels_read, &builder))); - } - if (!column_reader_->HasNext()) { NextRowGroup(); } - } - *out = builder.Finish(); - return Status::OK(); -} - -template <> -Status FlatColumnReader::Impl::TypedReadBatch<StringType, ::parquet::ByteArrayType>( - int batch_size, std::shared_ptr<Array>* out) { - int values_to_read = batch_size; - StringBuilder builder(pool_, field_->type); - while ((values_to_read > 0) && column_reader_) { - values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray)); - if (descr_->max_definition_level() > 0) { - def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); - } - auto reader = - dynamic_cast<TypedColumnReader<::parquet::ByteArrayType>*>(column_reader_.get()); - int64_t values_read; - int64_t levels_read; - int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); - auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data()); - PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( - values_to_read, def_levels, nullptr, values, &values_read)); - values_to_read -= levels_read; - if (descr_->max_definition_level() == 0) { - for (int64_t i = 0; i < levels_read; i++) { - RETURN_NOT_OK( - builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len)); - } - } else { - // descr_->max_definition_level() == 1 - int values_idx = 0; - for (int64_t i = 0; i < levels_read; i++) { - if (def_levels[i] < descr_->max_definition_level()) { - RETURN_NOT_OK(builder.AppendNull()); - } else { - RETURN_NOT_OK( - builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr), - values[values_idx].len)); - values_idx++; - } - } - } - if (!column_reader_->HasNext()) { NextRowGroup(); } - } - *out = builder.Finish(); - return Status::OK(); -} - -#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ - case Type::ENUM: \ - return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \ - break; - -Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) { - if (!column_reader_) { - // Exhausted all row groups. - *out = nullptr; - return Status::OK(); - } - - switch (field_->type->type) { - TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType) - TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(UINT32, UInt32Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type) - TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) - TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) - TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) - TYPED_BATCH_CASE(STRING, StringType, ::parquet::ByteArrayType) - TYPED_BATCH_CASE(TIMESTAMP, TimestampType, ::parquet::Int64Type) - default: - return Status::NotImplemented(field_->type->ToString()); - } -} - -void FlatColumnReader::Impl::NextRowGroup() { - if (next_row_group_ < reader_->metadata()->num_row_groups()) { - column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_); - next_row_group_++; - } else { - column_reader_ = nullptr; - } -} - -FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {} - -FlatColumnReader::~FlatColumnReader() {} - -Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) { - return impl_->NextBatch(batch_size, out); -} - -} // namespace parquet -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h deleted file mode 100644 index 2689beb..0000000 --- a/cpp/src/arrow/parquet/reader.h +++ /dev/null @@ -1,146 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef ARROW_PARQUET_READER_H -#define ARROW_PARQUET_READER_H - -#include <memory> - -#include "parquet/api/reader.h" -#include "parquet/api/schema.h" - -#include "arrow/io/interfaces.h" -#include "arrow/parquet/io.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class Array; -class MemoryPool; -class RecordBatch; -class Status; -class Table; - -namespace parquet { - -class FlatColumnReader; - -// Arrow read adapter class for deserializing Parquet files as Arrow row -// batches. -// -// TODO(wesm): nested data does not always make sense with this user -// interface unless you are only reading a single leaf node from a branch of -// a table. For example: -// -// repeated group data { -// optional group record { -// optional int32 val1; -// optional byte_array val2; -// optional bool val3; -// } -// optional int32 val4; -// } -// -// In the Parquet file, there are 3 leaf nodes: -// -// * data.record.val1 -// * data.record.val2 -// * data.record.val3 -// * data.val4 -// -// When materializing this data in an Arrow array, we would have: -// -// data: list<struct< -// record: struct< -// val1: int32, -// val2: string (= list<uint8>), -// val3: bool, -// >, -// val4: int32 -// >> -// -// However, in the Parquet format, each leaf node has its own repetition and -// definition levels describing the structure of the intermediate nodes in -// this array structure. Thus, we will need to scan the leaf data for a group -// of leaf nodes part of the same type tree to create a single result Arrow -// nested array structure. -// -// This is additionally complicated "chunky" repeated fields or very large byte -// arrays -class ARROW_EXPORT FileReader { - public: - FileReader(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); - - // Since the distribution of columns amongst a Parquet file's row groups may - // be uneven (the number of values in each column chunk can be different), we - // provide a column-oriented read interface. The ColumnReader hides the - // details of paging through the file's row groups and yielding - // fully-materialized arrow::Array instances - // - // Returns error status if the column of interest is not flat. - Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); - // Read column as a whole into an Array. - Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); - // Read a table of flat columns into a Table. - Status ReadFlatTable(std::shared_ptr<Table>* out); - - virtual ~FileReader(); - - private: - class ARROW_NO_EXPORT Impl; - std::unique_ptr<Impl> impl_; -}; - -// At this point, the column reader is a stream iterator. It only knows how to -// read the next batch of values for a particular column from the file until it -// runs out. -// -// We also do not expose any internal Parquet details, such as row groups. This -// might change in the future. -class ARROW_EXPORT FlatColumnReader { - public: - virtual ~FlatColumnReader(); - - // Scan the next array of the indicated size. The actual size of the - // returned array may be less than the passed size depending how much data is - // available in the file. - // - // When all the data in the file has been exhausted, the result is set to - // nullptr. - // - // Returns Status::OK on a successful read, including if you have exhausted - // the data available in the file. - Status NextBatch(int batch_size, std::shared_ptr<Array>* out); - - private: - class ARROW_NO_EXPORT Impl; - std::unique_ptr<Impl> impl_; - explicit FlatColumnReader(std::unique_ptr<Impl> impl); - - friend class FileReader; -}; - -// Helper function to create a file reader from an implementation of an Arrow -// readable file -ARROW_EXPORT -Status OpenFile(const std::shared_ptr<io::ReadableFileInterface>& file, - ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader); - -} // namespace parquet -} // namespace arrow - -#endif // ARROW_PARQUET_READER_H http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/schema.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc deleted file mode 100644 index ff32e51..0000000 --- a/cpp/src/arrow/parquet/schema.cc +++ /dev/null @@ -1,344 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/parquet/schema.h" - -#include <string> -#include <vector> - -#include "parquet/api/schema.h" - -#include "arrow/parquet/utils.h" -#include "arrow/types/decimal.h" -#include "arrow/types/string.h" -#include "arrow/util/status.h" - -using parquet::Repetition; -using parquet::schema::Node; -using parquet::schema::NodePtr; -using parquet::schema::GroupNode; -using parquet::schema::PrimitiveNode; - -using ParquetType = parquet::Type; -using parquet::LogicalType; - -namespace arrow { - -namespace parquet { - -const auto BOOL = std::make_shared<BooleanType>(); -const auto UINT8 = std::make_shared<UInt8Type>(); -const auto INT8 = std::make_shared<Int8Type>(); -const auto UINT16 = std::make_shared<UInt16Type>(); -const auto INT16 = std::make_shared<Int16Type>(); -const auto UINT32 = std::make_shared<UInt32Type>(); -const auto INT32 = std::make_shared<Int32Type>(); -const auto UINT64 = std::make_shared<UInt64Type>(); -const auto INT64 = std::make_shared<Int64Type>(); -const auto FLOAT = std::make_shared<FloatType>(); -const auto DOUBLE = std::make_shared<DoubleType>(); -const auto UTF8 = std::make_shared<StringType>(); -const auto TIMESTAMP_MS = std::make_shared<TimestampType>(TimestampType::Unit::MILLI); -const auto BINARY = std::make_shared<ListType>(std::make_shared<Field>("", UINT8)); - -TypePtr MakeDecimalType(const PrimitiveNode* node) { - int precision = node->decimal_metadata().precision; - int scale = node->decimal_metadata().scale; - return std::make_shared<DecimalType>(precision, scale); -} - -static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) { - switch (node->logical_type()) { - case LogicalType::UTF8: - *out = UTF8; - break; - case LogicalType::DECIMAL: - *out = MakeDecimalType(node); - break; - default: - // BINARY - *out = BINARY; - break; - } - return Status::OK(); -} - -static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) { - switch (node->logical_type()) { - case LogicalType::NONE: - *out = BINARY; - break; - case LogicalType::DECIMAL: - *out = MakeDecimalType(node); - break; - default: - return Status::NotImplemented("unhandled type"); - break; - } - - return Status::OK(); -} - -static Status FromInt32(const PrimitiveNode* node, TypePtr* out) { - switch (node->logical_type()) { - case LogicalType::NONE: - *out = INT32; - break; - case LogicalType::UINT_8: - *out = UINT8; - break; - case LogicalType::INT_8: - *out = INT8; - break; - case LogicalType::UINT_16: - *out = UINT16; - break; - case LogicalType::INT_16: - *out = INT16; - break; - case LogicalType::UINT_32: - *out = UINT32; - break; - case LogicalType::DECIMAL: - *out = MakeDecimalType(node); - break; - default: - return Status::NotImplemented("Unhandled logical type for int32"); - break; - } - return Status::OK(); -} - -static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { - switch (node->logical_type()) { - case LogicalType::NONE: - *out = INT64; - break; - case LogicalType::UINT_64: - *out = UINT64; - break; - case LogicalType::DECIMAL: - *out = MakeDecimalType(node); - break; - case LogicalType::TIMESTAMP_MILLIS: - *out = TIMESTAMP_MS; - break; - default: - return Status::NotImplemented("Unhandled logical type for int64"); - break; - } - return Status::OK(); -} - -// TODO: Logical Type Handling -Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) { - std::shared_ptr<DataType> type; - - if (node->is_repeated()) { - return Status::NotImplemented("No support yet for repeated node types"); - } - - if (node->is_group()) { - const GroupNode* group = static_cast<const GroupNode*>(node.get()); - std::vector<std::shared_ptr<Field>> fields(group->field_count()); - for (int i = 0; i < group->field_count(); i++) { - RETURN_NOT_OK(NodeToField(group->field(i), &fields[i])); - } - type = std::make_shared<StructType>(fields); - } else { - // Primitive (leaf) node - const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get()); - - switch (primitive->physical_type()) { - case ParquetType::BOOLEAN: - type = BOOL; - break; - case ParquetType::INT32: - RETURN_NOT_OK(FromInt32(primitive, &type)); - break; - case ParquetType::INT64: - RETURN_NOT_OK(FromInt64(primitive, &type)); - break; - case ParquetType::INT96: - // TODO: Do we have that type in Arrow? - // type = TypePtr(new Int96Type()); - return Status::NotImplemented("int96"); - case ParquetType::FLOAT: - type = FLOAT; - break; - case ParquetType::DOUBLE: - type = DOUBLE; - break; - case ParquetType::BYTE_ARRAY: - // TODO: Do we have that type in Arrow? - RETURN_NOT_OK(FromByteArray(primitive, &type)); - break; - case ParquetType::FIXED_LEN_BYTE_ARRAY: - RETURN_NOT_OK(FromFLBA(primitive, &type)); - break; - } - } - - *out = std::make_shared<Field>(node->name(), type, !node->is_required()); - return Status::OK(); -} - -Status FromParquetSchema( - const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out) { - // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes - // from the root Parquet node - const GroupNode* schema_node = - static_cast<const GroupNode*>(parquet_schema->group_node()); - - std::vector<std::shared_ptr<Field>> fields(schema_node->field_count()); - for (int i = 0; i < schema_node->field_count(); i++) { - RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i])); - } - - *out = std::make_shared<Schema>(fields); - return Status::OK(); -} - -Status StructToNode(const std::shared_ptr<StructType>& type, const std::string& name, - bool nullable, const ::parquet::WriterProperties& properties, NodePtr* out) { - Repetition::type repetition = Repetition::REQUIRED; - if (nullable) { repetition = Repetition::OPTIONAL; } - - std::vector<NodePtr> children(type->num_children()); - for (int i = 0; i < type->num_children(); i++) { - RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i])); - } - - *out = GroupNode::Make(name, repetition, children); - return Status::OK(); -} - -Status FieldToNode(const std::shared_ptr<Field>& field, - const ::parquet::WriterProperties& properties, NodePtr* out) { - LogicalType::type logical_type = LogicalType::NONE; - ParquetType::type type; - Repetition::type repetition = Repetition::REQUIRED; - if (field->nullable) { repetition = Repetition::OPTIONAL; } - int length = -1; - - switch (field->type->type) { - // TODO: - // case Type::NA: - // break; - case Type::BOOL: - type = ParquetType::BOOLEAN; - break; - case Type::UINT8: - type = ParquetType::INT32; - logical_type = LogicalType::UINT_8; - break; - case Type::INT8: - type = ParquetType::INT32; - logical_type = LogicalType::INT_8; - break; - case Type::UINT16: - type = ParquetType::INT32; - logical_type = LogicalType::UINT_16; - break; - case Type::INT16: - type = ParquetType::INT32; - logical_type = LogicalType::INT_16; - break; - case Type::UINT32: - if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) { - type = ParquetType::INT64; - } else { - type = ParquetType::INT32; - logical_type = LogicalType::UINT_32; - } - break; - case Type::INT32: - type = ParquetType::INT32; - break; - case Type::UINT64: - type = ParquetType::INT64; - logical_type = LogicalType::UINT_64; - break; - case Type::INT64: - type = ParquetType::INT64; - break; - case Type::FLOAT: - type = ParquetType::FLOAT; - break; - case Type::DOUBLE: - type = ParquetType::DOUBLE; - break; - case Type::STRING: - type = ParquetType::BYTE_ARRAY; - logical_type = LogicalType::UTF8; - break; - case Type::BINARY: - type = ParquetType::BYTE_ARRAY; - break; - case Type::DATE: - type = ParquetType::INT32; - logical_type = LogicalType::DATE; - break; - case Type::TIMESTAMP: { - auto timestamp_type = static_cast<TimestampType*>(field->type.get()); - if (timestamp_type->unit != TimestampType::Unit::MILLI) { - return Status::NotImplemented( - "Other timestamp units than millisecond are not yet support with parquet."); - } - type = ParquetType::INT64; - logical_type = LogicalType::TIMESTAMP_MILLIS; - } break; - case Type::TIMESTAMP_DOUBLE: - type = ParquetType::INT64; - // This is specified as seconds since the UNIX epoch - // TODO: Converted type in Parquet? - // logical_type = LogicalType::TIMESTAMP_MILLIS; - break; - case Type::TIME: - type = ParquetType::INT64; - logical_type = LogicalType::TIME_MILLIS; - break; - case Type::STRUCT: { - auto struct_type = std::static_pointer_cast<StructType>(field->type); - return StructToNode(struct_type, field->name, field->nullable, properties, out); - } break; - default: - // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR - return Status::NotImplemented("unhandled type"); - } - *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length); - return Status::OK(); -} - -Status ToParquetSchema(const Schema* arrow_schema, - const ::parquet::WriterProperties& properties, - std::shared_ptr<::parquet::SchemaDescriptor>* out) { - std::vector<NodePtr> nodes(arrow_schema->num_fields()); - for (int i = 0; i < arrow_schema->num_fields(); i++) { - RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i])); - } - - NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); - *out = std::make_shared<::parquet::SchemaDescriptor>(); - PARQUET_CATCH_NOT_OK((*out)->Init(schema)); - - return Status::OK(); -} - -} // namespace parquet - -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/32fd692f/cpp/src/arrow/parquet/schema.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h deleted file mode 100644 index 88b5977..0000000 --- a/cpp/src/arrow/parquet/schema.h +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef ARROW_PARQUET_SCHEMA_H -#define ARROW_PARQUET_SCHEMA_H - -#include <memory> - -#include "parquet/api/schema.h" -#include "parquet/api/writer.h" - -#include "arrow/schema.h" -#include "arrow/type.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class Status; - -namespace parquet { - -Status ARROW_EXPORT NodeToField( - const ::parquet::schema::NodePtr& node, std::shared_ptr<Field>* out); - -Status ARROW_EXPORT FromParquetSchema( - const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out); - -Status ARROW_EXPORT FieldToNode(const std::shared_ptr<Field>& field, - const ::parquet::WriterProperties& properties, ::parquet::schema::NodePtr* out); - -Status ARROW_EXPORT ToParquetSchema(const Schema* arrow_schema, - const ::parquet::WriterProperties& properties, - std::shared_ptr<::parquet::SchemaDescriptor>* out); - -} // namespace parquet - -} // namespace arrow - -#endif // ARROW_PARQUET_SCHEMA_H