Repository: parquet-cpp Updated Branches: refs/heads/master 47a94590d -> 8bff44273
PARQUET-782: Support writing to Arrow sinks Author: Uwe L. Korn <[email protected]> Closes #196 from xhochy/PARQUET-782 and squashes the following commits: b89738a [Uwe L. Korn] Update arrow hash 041f66d [Uwe L. Korn] PARQUET-782: Support writing to Arrow sinks Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/8bff4427 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/8bff4427 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/8bff4427 Branch: refs/heads/master Commit: 8bff44273d2ef3663c9e52cbc958d7b72442ba8f Parents: 47a9459 Author: Uwe L. Korn <[email protected]> Authored: Sun Nov 27 17:24:27 2016 -0500 Committer: Wes McKinney <[email protected]> Committed: Sun Nov 27 17:24:27 2016 -0500 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 40 +++++++++++- src/parquet/arrow/io.cc | 20 ++++++ src/parquet/arrow/io.h | 19 ++++++ src/parquet/arrow/reader.cc | 73 ++++++++++++++++++++++ src/parquet/arrow/test-util.h | 20 +++--- src/parquet/arrow/writer.cc | 8 +++ src/parquet/arrow/writer.h | 7 +++ thirdparty/versions.sh | 2 +- 8 files changed, 177 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 5ec70f3..7bcb590 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -24,6 +24,7 @@ #include "parquet/arrow/test-util.h" #include "parquet/arrow/writer.h" +#include "arrow/io/memory.h" #include "arrow/test-util.h" #include "arrow/types/construct.h" #include "arrow/types/primitive.h" @@ -342,6 +343,29 @@ 
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { this->ReadAndCheckSingleColumnTable(values); } +TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { + std::shared_ptr<Array> values; + ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values)); + std::shared_ptr<Table> table = MakeSimpleTable(values, false); + this->sink_ = std::make_shared<InMemoryOutputStream>(); + auto buffer = std::make_shared<::arrow::PoolBuffer>(); + auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), arrow_sink_, 512, default_writer_properties())); + + std::shared_ptr<ParquetBuffer> pbuffer = + std::make_shared<ParquetBuffer>(buffer->data(), buffer->size()); + std::unique_ptr<RandomAccessSource> source(new BufferReader(pbuffer)); + std::shared_ptr<::arrow::Table> out; + this->ReadTableFromFile(ParquetFileReader::Open(std::move(source)), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(values->length(), out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} + TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { int64_t chunk_size = SMALL_SIZE / 4; std::shared_ptr<Array> values; @@ -456,10 +480,20 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) { template <typename T> using ParquetCDataType = typename ParquetDataType<T>::c_type; +template <typename T> +struct c_type_trait { + using ArrowCType = typename T::c_type; +}; + +template <> +struct c_type_trait<::arrow::BooleanType> { + using ArrowCType = uint8_t; +}; + template <typename TestType> class TestPrimitiveParquetIO : public TestParquetIO<TestType> { public: - typedef typename TestType::c_type T; + typedef typename c_type_trait<TestType>::ArrowCType T; void MakeTestFile(std::vector<T>& values, int num_chunks, 
std::unique_ptr<ParquetFileReader>* file_reader) { @@ -497,7 +531,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> { std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get()); + ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get()); } void CheckSingleColumnRequiredRead(int num_chunks) { @@ -508,7 +542,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> { std::shared_ptr<Array> out; this->ReadSingleColumnFile(std::move(file_reader), &out); - ExpectArray<TestType>(values.data(), out.get()); + ExpectArrayT<TestType>(values.data(), out.get()); } }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/io.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc index 8f3aa3e..74464f2 100644 --- a/src/parquet/arrow/io.cc +++ b/src/parquet/arrow/io.cc @@ -103,5 +103,25 @@ std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) { return result; } +ParquetWriteSink::ParquetWriteSink( + const std::shared_ptr<::arrow::io::OutputStream>& stream) + : stream_(stream) {} + +ParquetWriteSink::~ParquetWriteSink() {} + +void ParquetWriteSink::Close() { + PARQUET_THROW_NOT_OK(stream_->Close()); +} + +int64_t ParquetWriteSink::Tell() { + int64_t position; + PARQUET_THROW_NOT_OK(stream_->Tell(&position)); + return position; +} + +void ParquetWriteSink::Write(const uint8_t* data, int64_t length) { + PARQUET_THROW_NOT_OK(stream_->Write(data, length)); +} + } // namespace arrow } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/io.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h index 119f8de..a068a4e 100644 --- a/src/parquet/arrow/io.h +++ b/src/parquet/arrow/io.h 
@@ -76,6 +76,25 @@ class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource { ParquetAllocator* allocator_; }; +class PARQUET_EXPORT ParquetWriteSink : public OutputStream { + public: + explicit ParquetWriteSink(const std::shared_ptr<::arrow::io::OutputStream>& stream); + + virtual ~ParquetWriteSink(); + + // Close the output stream + void Close() override; + + // Return the current position in the output stream relative to the start + int64_t Tell() override; + + // Copy bytes into the output stream + void Write(const uint8_t* data, int64_t length) override; + + private: + std::shared_ptr<::arrow::io::OutputStream> stream_; +}; + } // namespace arrow } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index bc9ec8f..2d2b5cd 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -376,6 +376,79 @@ Status FlatColumnReader::Impl::TypedReadBatch( } template <> +Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>( + int batch_size, std::shared_ptr<Array>* out) { + int values_to_read = batch_size; + RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size)); + valid_bits_idx_ = 0; + if (descr_->max_definition_level() > 0) { + valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_); + int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8; + valid_bits_buffer_->Resize(valid_bits_size); + valid_bits_ptr_ = valid_bits_buffer_->mutable_data(); + memset(valid_bits_ptr_, 0, valid_bits_size); + null_count_ = 0; + } + + while ((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(bool)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = 
dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get()); + int64_t values_read; + int64_t levels_read; + int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data()); + PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( + values_to_read, def_levels, nullptr, values, &values_read)); + values_to_read -= levels_read; + if (descr_->max_definition_level() == 0) { + ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(values, values_read); + } else { + // As per the definition and checks for flat columns: + // descr_->max_definition_level() == 1 + ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>( + def_levels, values, values_read, levels_read); + } + if (!column_reader_->HasNext()) { NextRowGroup(); } + } + + if (descr_->max_definition_level() > 0) { + // TODO: Shrink arrays in the case they are too large + if (valid_bits_idx_ < batch_size * 0.8) { + // Shrink arrays as they are larger than the output. + // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays + // without the need for a copy. Given a decent underlying allocator this + // should still free some underlying pages to the OS. 
+ + auto data_buffer = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool))); + memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size()); + data_buffer_ = data_buffer; + + auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK( + valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8)); + memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(), + valid_bits_buffer->size()); + valid_bits_buffer_ = valid_bits_buffer; + } + *out = std::make_shared<::arrow::BooleanArray>( + field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_); + // Release the ownership + data_buffer_.reset(); + valid_bits_buffer_.reset(); + return Status::OK(); + } else { + *out = std::make_shared<::arrow::BooleanArray>( + field_->type, valid_bits_idx_, data_buffer_); + data_buffer_.reset(); + return Status::OK(); + } +} + +template <> Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>( int batch_size, std::shared_ptr<Array>* out) { int values_to_read = batch_size; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 92798ff..dedd398 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -37,6 +37,9 @@ using is_arrow_int = std::is_integral<typename ArrowType::c_type>; template <typename ArrowType> using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>; +template <typename ArrowType> +using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>; + template <class ArrowType> typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray( size_t size, std::shared_ptr<Array>* out) { @@ -70,8 +73,9 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, 
Status>::type NonNull return builder.Finish(out); } -template <> -Status NonNullArray<::arrow::BooleanType>(size_t size, std::shared_ptr<Array>* out) { +template <class ArrowType> +typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray( + size_t size, std::shared_ptr<Array>* out) { std::vector<uint8_t> values; ::arrow::test::randint<uint8_t>(size, 0, 1, &values); ::arrow::BooleanBuilder builder( @@ -135,8 +139,8 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type Nullabl } // This helper function only supports (size/2) nulls yet. -template <> -Status NullableArray<::arrow::BooleanType>( +template <class ArrowType> +typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray( size_t size, size_t num_nulls, std::shared_ptr<Array>* out) { std::vector<uint8_t> values; ::arrow::test::randint<uint8_t>(size, 0, 1, &values); @@ -176,19 +180,19 @@ void ExpectArray(T* expected, Array* result) { } template <typename ArrowType> -void ExpectArray(typename ArrowType::c_type* expected, Array* result) { +void ExpectArrayT(void* expected, Array* result) { ::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result); for (int64_t i = 0; i < result->length(); i++) { - EXPECT_EQ(expected[i], + EXPECT_EQ(reinterpret_cast<typename ArrowType::c_type*>(expected)[i], reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]); } } template <> -void ExpectArray<::arrow::BooleanType>(uint8_t* expected, Array* result) { +void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) { ::arrow::BooleanBuilder builder( ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>()); - builder.Append(expected, result->length()); + builder.Append(reinterpret_cast<uint8_t*>(expected), result->length()); std::shared_ptr<Array> expected_array; EXPECT_OK(builder.Finish(&expected_array)); 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index e4d3745..b7c7d20 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -20,6 +20,7 @@ #include <algorithm> #include <vector> +#include "parquet/arrow/io.h" #include "parquet/arrow/schema.h" #include "parquet/arrow/utils.h" @@ -370,6 +371,13 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool, return writer.Close(); } +Status WriteFlatTable(const Table* table, MemoryPool* pool, + const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, + const std::shared_ptr<WriterProperties>& properties) { + auto parquet_sink = std::make_shared<ParquetWriteSink>(sink); + return WriteFlatTable(table, pool, parquet_sink, chunk_size, properties); +} + } // namespace arrow } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h index 92524d8..a82c2f6 100644 --- a/src/parquet/arrow/writer.h +++ b/src/parquet/arrow/writer.h @@ -23,6 +23,8 @@ #include "parquet/api/schema.h" #include "parquet/api/writer.h" +#include "arrow/io/interfaces.h" + namespace arrow { class Array; @@ -71,6 +73,11 @@ class PARQUET_EXPORT FileWriter { int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); +::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table, + ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, + int64_t chunk_size, + const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); + } // namespace arrow } // namespace parquet 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/thirdparty/versions.sh ---------------------------------------------------------------------- diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh index ff5644e..f877b33 100755 --- a/thirdparty/versions.sh +++ b/thirdparty/versions.sh @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -ARROW_VERSION="d946e7917d55cb220becd6469ae93430f2e60764" +ARROW_VERSION="86f56a6073c3254487ede3aff1dc9d117d24adaf" ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz" ARROW_BASEDIR="arrow-${ARROW_VERSION}"
