PARQUET-834: Support I/O of arrow::ListArray Author: Korn, Uwe <[email protected]>
Closes #229 from xhochy/PARQUET-834 and squashes the following commits: ba68dec [Korn, Uwe] Remove signed/unsigned comparisons 0967992 [Korn, Uwe] Remove signed/unsigned comparisons 05979c3 [Korn, Uwe] Add missing RETURN_NOT_OK 6484e86 [Korn, Uwe] Remove unused member e58a4e9 [Korn, Uwe] ListofLists finally work e8267c7 [Korn, Uwe] Add test for 2 level List f59da0c [Korn, Uwe] No need to distinguish anymore between different array types 1dc3bbe [Korn, Uwe] Determine values inputs 0ec90e9 [Korn, Uwe] Style fixes ee609e5 [Korn, Uwe] Unify level generation 17cfe15 [Korn, Uwe] Write lists of any depth 75a4871 [Korn, Uwe] Directly use TypedWriteBatch 89b3e35 [Korn, Uwe] Remove unused import ccdf25c [Korn, Uwe] Use TypedWriteBatch for all list cases d7e09cf [Korn, Uwe] Reuse TypedWriteBatch for lists d1b82d3 [Korn, Uwe] Activate fast path for timestamp type 0b98475 [Korn, Uwe] TypedWriteBatch should be applicable for all definition levels 34bea2f [Korn, Uwe] Push level generation one level up 89aaa8c [Korn, Uwe] Remove empty if section 0cda75b [Korn, Uwe] Refactor level generation into separate method c50f9f7 [Korn, Uwe] Adjust WriteSpaced to behave as ReadSpaced c76b7f3 [Korn, Uwe] Simplify list unittest fbfe2a4 [Korn, Uwe] Review comments bcef2b9 [Korn, Uwe] Make compatible schema detection more readable be05282 [Korn, Uwe] Reuse repeated test code 856f75c [Korn, Uwe] Fix signed comparison 75c920d [Korn, Uwe] Correctly handle empty lists 93e92ab [Korn, Uwe] Fix benchmark compilation 0201578 [Korn, Uwe] Remove dead ASSERTs e78cb13 [Korn, Uwe] Add support for lists with max_definition_level = 2 f43effc [Korn, Uwe] Remove 'Flat' from the reader API e3b7f58 [Korn, Uwe] Update arrow hash e44c1d8 [Korn, Uwe] Support boolean lists of lists f46c056 [Korn, Uwe] Add UINT32 support d20f2be [Korn, Uwe] Read string and binary listarrays 8d6b08b [Korn, Uwe] Remove 'Flat' from the writer API f9ab91d [Korn, Uwe] PARQUET-834: Support I/O of arrow::ListArray Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/ad56e7ae Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/ad56e7ae Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/ad56e7ae Branch: refs/heads/master Commit: ad56e7aea9acbcf141fb72ef2085f63a278f3bc1 Parents: b1c85ca Author: Korn, Uwe <[email protected]> Authored: Thu Feb 2 16:14:30 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Thu Feb 2 16:14:30 2017 -0500 ---------------------------------------------------------------------- cmake_modules/ThirdpartyToolchain.cmake | 2 +- .../arrow/arrow-reader-writer-benchmark.cc | 8 +- src/parquet/arrow/arrow-reader-writer-test.cc | 118 +++-- src/parquet/arrow/reader.cc | 468 ++++++++++------- src/parquet/arrow/reader.h | 16 +- src/parquet/arrow/test-util.h | 39 ++ src/parquet/arrow/writer.cc | 514 +++++++++++++------ src/parquet/arrow/writer.h | 10 +- src/parquet/column/column-reader-test.cc | 43 +- src/parquet/column/reader.h | 77 ++- src/parquet/column/statistics.cc | 4 +- src/parquet/column/writer.cc | 24 +- src/parquet/column/writer.h | 2 +- src/parquet/encodings/dictionary-encoding.h | 2 +- src/parquet/encodings/encoder.h | 4 +- src/parquet/file/metadata.cc | 6 + src/parquet/file/metadata.h | 1 + src/parquet/file/writer-internal.cc | 4 + src/parquet/file/writer-internal.h | 1 + src/parquet/file/writer.cc | 4 + src/parquet/file/writer.h | 3 + src/parquet/util/bit-util.h | 22 +- src/parquet/util/rle-encoding.h | 6 +- 23 files changed, 930 insertions(+), 448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/cmake_modules/ThirdpartyToolchain.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 8fc1b78..8b052df 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1") # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd") -set(ARROW_VERSION "085c8754b0ab2da7fcd245fc88bc4de9a6806a4c") +set(ARROW_VERSION "4226adfbc6b3dff10b3fe7a6691b30bcc94140bd") # find boost headers and libs set(Boost_DEBUG TRUE) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/arrow-reader-writer-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc index cf90ebc..3f2a688 100644 --- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc +++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc @@ -32,7 +32,7 @@ using arrow::NumericBuilder; namespace parquet { using arrow::FileReader; -using arrow::WriteFlatTable; +using arrow::WriteTable; using schema::PrimitiveNode; namespace benchmark { @@ -109,7 +109,7 @@ static void BM_WriteColumn(::benchmark::State& state) { while (state.KeepRunning()) { auto output = std::make_shared<InMemoryOutputStream>(); - WriteFlatTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); + WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); } SetBytesProcessed<nullable, ParquetType>(state); } @@ -128,7 +128,7 @@ static void BM_ReadColumn(::benchmark::State& state) { std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128); std::shared_ptr<::arrow::Table> table = TableFromVector<nullable, ParquetType>(values); auto output = std::make_shared<InMemoryOutputStream>(); - WriteFlatTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); + WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); std::shared_ptr<Buffer> buffer = output->GetBuffer(); while (state.KeepRunning()) { @@ -136,7 +136,7 @@ static void BM_ReadColumn(::benchmark::State& state) { ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); FileReader filereader(::arrow::default_memory_pool(), std::move(reader)); std::shared_ptr<::arrow::Table> table; - filereader.ReadFlatTable(&table); + filereader.ReadTable(&table); } SetBytesProcessed<nullable, ParquetType>(state); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index d681e57..619d5a3 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -36,6 +36,7 @@ using arrow::Column; using arrow::ChunkedArray; using arrow::default_memory_pool; using arrow::io::BufferReader; +using arrow::ListArray; using arrow::PoolBuffer; using arrow::PrimitiveArray; using arrow::Status; @@ -216,8 +217,8 @@ class TestParquetIO : public ::testing::Test { void ReadSingleColumnFile( std::unique_ptr<FileReader> file_reader, std::shared_ptr<Array>* out) { - std::unique_ptr<FlatColumnReader> column_reader; - ASSERT_OK_NO_THROW(file_reader->GetFlatColumn(0, &column_reader)); + std::unique_ptr<ColumnReader> column_reader; + ASSERT_OK_NO_THROW(file_reader->GetColumn(0, &column_reader)); ASSERT_NE(nullptr, column_reader.get()); ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); @@ -235,7 +236,7 @@ class TestParquetIO : public ::testing::Test { void ReadTableFromFile( std::unique_ptr<FileReader> reader, std::shared_ptr<Table>* out) { - ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); + ASSERT_OK_NO_THROW(reader->ReadTable(out)); ASSERT_NE(nullptr, out->get()); } @@ -252,12 +253,47 @@ class TestParquetIO : public ::testing::Test { ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); } + void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements, + int64_t null_count, std::shared_ptr<Table>* out) { + std::shared_ptr<Array> values; + ASSERT_OK(NullableArray<TestType>( + size * size, nullable_elements ? null_count : 0, kDefaultSeed, &values)); + std::shared_ptr<ListArray> lists; + ASSERT_OK(MakeListArary( + values, size, nullable_lists ? null_count : 0, nullable_elements, &lists)); + *out = MakeSimpleTable(lists, nullable_lists); + } + + void PrepareListOfListTable(int64_t size, bool nullable_parent_lists, + bool nullable_lists, bool nullable_elements, int64_t null_count, + std::shared_ptr<Table>* out) { + std::shared_ptr<Array> values; + ASSERT_OK(NullableArray<TestType>( + size * 6, nullable_elements ? null_count : 0, kDefaultSeed, &values)); + std::shared_ptr<ListArray> lists; + ASSERT_OK(MakeListArary( + values, size * 3, nullable_lists ? null_count : 0, nullable_elements, &lists)); + std::shared_ptr<ListArray> parent_lists; + ASSERT_OK(MakeListArary(lists, size, nullable_parent_lists ? null_count : 0, + nullable_lists, &parent_lists)); + *out = MakeSimpleTable(parent_lists, nullable_parent_lists); + } + + void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) { + std::shared_ptr<Array> values = table->column(0)->data()->chunk(0); + this->sink_ = std::make_shared<InMemoryOutputStream>(); + ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), + this->sink_, values->length(), default_writer_properties())); + + this->ReadAndCheckSingleColumnTable(values); + } + template <typename ArrayType> - void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema, + void WriteColumn(const std::shared_ptr<GroupNode>& schema, const std::shared_ptr<ArrayType>& values) { FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema)); ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); - ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get())); + ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get())); ASSERT_OK_NO_THROW(writer.Close()); } @@ -282,7 +318,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); - this->WriteFlatColumn(schema, values); + this->WriteColumn(schema, values); this->ReadAndCheckSingleColumnFile(values.get()); } @@ -292,8 +328,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), - this->sink_, values->length(), default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); std::shared_ptr<Table> out; std::unique_ptr<FileReader> reader; @@ -314,7 +350,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); - this->WriteFlatColumn(schema, values); + this->WriteColumn(schema, values); this->ReadAndCheckSingleColumnFile(values.get()); } @@ -325,11 +361,37 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, true); - this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), - this->sink_, values->length(), default_writer_properties())); + this->WriteReadAndCheckSingleColumnTable(table); +} - this->ReadAndCheckSingleColumnTable(values); +TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) { + std::shared_ptr<Table> table; + this->PrepareListTable(SMALL_SIZE, true, true, 10, &table); + this->WriteReadAndCheckSingleColumnTable(table); +} + +TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) { + std::shared_ptr<Table> table; + this->PrepareListTable(SMALL_SIZE, false, true, 10, &table); + this->WriteReadAndCheckSingleColumnTable(table); +} + +TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) { + std::shared_ptr<Table> table; + this->PrepareListTable(SMALL_SIZE, true, false, 10, &table); + this->WriteReadAndCheckSingleColumnTable(table); +} + +TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) { + std::shared_ptr<Table> table; + this->PrepareListTable(SMALL_SIZE, false, false, 0, &table); + this->WriteReadAndCheckSingleColumnTable(table); +} + +TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) { + std::shared_ptr<Table> table; + this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table); + this->WriteReadAndCheckSingleColumnTable(table); } TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { @@ -341,8 +403,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW( - writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); + ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size)); } ASSERT_OK_NO_THROW(writer.Close()); @@ -354,7 +415,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable( + ASSERT_OK_NO_THROW(WriteTable( table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); this->ReadAndCheckSingleColumnTable(values); @@ -370,8 +431,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { { // BufferOutputStream closed on gc auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), arrow_sink_, - 512, default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable(table.get(), default_memory_pool(), arrow_sink_, 512, + default_writer_properties())); // XXX: Remove this after ARROW-455 completed ASSERT_OK(arrow_sink_->Close()); @@ -402,8 +463,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW( - writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); + ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size)); } ASSERT_OK_NO_THROW(writer.Close()); @@ -417,8 +477,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, true); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), - this->sink_, 512, default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_, + 512, default_writer_properties())); this->ReadAndCheckSingleColumnTable(values); } @@ -491,7 +551,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { .version(ParquetVersion::PARQUET_2_0) ->build(); ASSERT_OK_NO_THROW( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + WriteTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); this->ReadAndCheckSingleColumnTable(values); } @@ -512,7 +572,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { ::parquet::WriterProperties::Builder() .version(ParquetVersion::PARQUET_1_0) ->build(); - ASSERT_OK_NO_THROW(WriteFlatTable( + ASSERT_OK_NO_THROW(WriteTable( table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties)); std::shared_ptr<Array> expected_values; @@ -544,8 +604,8 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) { ASSERT_OK(builder.Finish(&values)); std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), - this->sink_, values->length(), default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); std::shared_ptr<Table> out; std::unique_ptr<FileReader> reader; @@ -675,7 +735,7 @@ void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads, const std::vector<int>& column_subset, std::shared_ptr<Table>* out) { auto sink = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteFlatTable( + ASSERT_OK_NO_THROW(WriteTable( table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); std::shared_ptr<Buffer> buffer = sink->GetBuffer(); @@ -687,10 +747,10 @@ void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads, reader->set_num_threads(num_threads); if (column_subset.size() > 0) { - ASSERT_OK_NO_THROW(reader->ReadFlatTable(column_subset, out)); + ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); } else { // Read everything - ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); + ASSERT_OK_NO_THROW(reader->ReadTable(out)); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index a60d0b2..5059494 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -34,8 +34,11 @@ #include "arrow/util/bit-util.h" using arrow::Array; +using arrow::BooleanArray; using arrow::Column; using arrow::Field; +using arrow::Int32Array; +using arrow::ListArray; using arrow::MemoryPool; using arrow::PoolBuffer; using arrow::Status; @@ -65,11 +68,11 @@ class FileReader::Impl { virtual ~Impl() {} bool CheckForFlatColumn(const ColumnDescriptor* descr); - Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); - Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); - Status ReadFlatTable(std::shared_ptr<Table>* out); - Status ReadFlatTable( - const std::vector<int>& column_indices, std::shared_ptr<Table>* out); + bool CheckForFlatListColumn(const ColumnDescriptor* descr); + Status GetColumn(int i, std::unique_ptr<ColumnReader>* out); + Status ReadColumn(int i, std::shared_ptr<Array>* out); + Status ReadTable(std::shared_ptr<Table>* out); + Status ReadTable(const std::vector<int>& column_indices, std::shared_ptr<Table>* out); const ParquetFileReader* parquet_reader() const { return reader_.get(); } void set_num_threads(int num_threads) { num_threads_ = num_threads; } @@ -81,7 +84,7 @@ class FileReader::Impl { int num_threads_; }; -class FlatColumnReader::Impl { +class ColumnReader::Impl { public: Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader, int column_index); @@ -97,12 +100,16 @@ class FlatColumnReader::Impl { template <typename ArrowType> Status InitDataBuffer(int batch_size); + Status InitValidBits(int batch_size); template <typename ArrowType, typename ParquetType> - Status ReadNullableFlatBatch(TypedColumnReader<ParquetType>* reader, - int16_t* def_levels, int64_t values_to_read, int64_t* levels_read); + Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels, + int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read, + int64_t* values_read); template <typename ArrowType, typename ParquetType> Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader, int64_t values_to_read, int64_t* levels_read); + Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels, + int64_t total_values_read, std::shared_ptr<Array>* array); private: void NextRowGroup(); @@ -120,11 +127,12 @@ class FlatColumnReader::Impl { ParquetFileReader* reader_; int column_index_; int next_row_group_; - std::shared_ptr<ColumnReader> column_reader_; + std::shared_ptr<::parquet::ColumnReader> column_reader_; std::shared_ptr<Field> field_; PoolBuffer values_buffer_; PoolBuffer def_levels_buffer_; + PoolBuffer rep_levels_buffer_; std::shared_ptr<PoolBuffer> data_buffer_; uint8_t* data_buffer_ptr_; std::shared_ptr<PoolBuffer> valid_bits_buffer_; @@ -136,41 +144,34 @@ class FlatColumnReader::Impl { FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader) : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} -bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) { - if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) { - return false; - } else if ((descr->max_definition_level() == 1) && - (descr->schema_node()->repetition() != Repetition::OPTIONAL)) { - return false; - } - return true; -} - -Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) { +Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) { const SchemaDescriptor* schema = reader_->metadata()->schema(); - if (!CheckForFlatColumn(schema->Column(i))) { - return Status::Invalid("The requested column is not flat"); - } - std::unique_ptr<FlatColumnReader::Impl> impl( - new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i)); - *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl))); + std::unique_ptr<ColumnReader::Impl> impl( + new ColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i)); + *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl))); return Status::OK(); } -Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { - std::unique_ptr<FlatColumnReader> flat_column_reader; - RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader)); - return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out); +Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) { + std::unique_ptr<ColumnReader> flat_column_reader; + RETURN_NOT_OK(GetColumn(i, &flat_column_reader)); + + int64_t batch_size = 0; + for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) { + batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values(); + } + + return flat_column_reader->NextBatch(batch_size, out); } -Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) { +Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) { std::vector<int> column_indices(reader_->metadata()->num_columns()); for (size_t i = 0; i < column_indices.size(); ++i) { column_indices[i] = i; } - return ReadFlatTable(column_indices, table); + return ReadTable(column_indices, table); } template <class FUNCTION> @@ -207,7 +208,7 @@ Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { return Status::OK(); } -Status FileReader::Impl::ReadFlatTable( +Status FileReader::Impl::ReadTable( const std::vector<int>& indices, std::shared_ptr<Table>* table) { auto descr = reader_->metadata()->schema(); @@ -219,19 +220,19 @@ Status FileReader::Impl::ReadFlatTable( int nthreads = std::min<int>(num_threads_, num_columns); std::vector<std::shared_ptr<Column>> columns(num_columns); - auto ReadColumn = [&indices, &schema, &columns, this](int i) { + auto ReadColumnFunc = [&indices, &schema, &columns, this](int i) { std::shared_ptr<Array> array; - RETURN_NOT_OK(ReadFlatColumn(indices[i], &array)); + RETURN_NOT_OK(ReadColumn(indices[i], &array)); columns[i] = std::make_shared<Column>(schema->field(i), array); return Status::OK(); }; if (nthreads == 1) { for (int i = 0; i < num_columns; i++) { - RETURN_NOT_OK(ReadColumn(i)); + RETURN_NOT_OK(ReadColumnFunc(i)); } } else { - RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumn)); + RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc)); } *table = std::make_shared<Table>(name, schema, columns); @@ -261,30 +262,30 @@ Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file, file, allocator, ::parquet::default_reader_properties(), nullptr, reader); } -Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) { - return impl_->GetFlatColumn(i, out); +Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) { + return impl_->GetColumn(i, out); } -Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { +Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) { try { - return impl_->ReadFlatColumn(i, out); + return impl_->ReadColumn(i, out); } catch (const ::parquet::ParquetException& e) { return ::arrow::Status::IOError(e.what()); } } -Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) { +Status FileReader::ReadTable(std::shared_ptr<Table>* out) { try { - return impl_->ReadFlatTable(out); + return impl_->ReadTable(out); } catch (const ::parquet::ParquetException& e) { return ::arrow::Status::IOError(e.what()); } } -Status FileReader::ReadFlatTable( +Status FileReader::ReadTable( const std::vector<int>& column_indices, std::shared_ptr<Table>* out) { try { - return impl_->ReadFlatTable(column_indices, out); + return impl_->ReadTable(column_indices, out); } catch (const ::parquet::ParquetException& e) { return ::arrow::Status::IOError(e.what()); } @@ -298,7 +299,7 @@ const ParquetFileReader* FileReader::parquet_reader() const { return impl_->parquet_reader(); } -FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr, +ColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader, int column_index) : pool_(pool), descr_(descr), @@ -306,15 +307,15 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr, column_index_(column_index), next_row_group_(0), values_buffer_(pool), - def_levels_buffer_(pool) { + def_levels_buffer_(pool), + rep_levels_buffer_(pool) { NodeToField(descr_->schema_node(), &field_); NextRowGroup(); } template <typename ArrowType, typename ParquetType> -Status FlatColumnReader::Impl::ReadNonNullableBatch( - TypedColumnReader<ParquetType>* reader, int64_t values_to_read, - int64_t* levels_read) { +Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader, + int64_t values_to_read, int64_t* levels_read) { using ArrowCType = typename ArrowType::c_type; using ParquetCType = typename ParquetType::c_type; @@ -333,7 +334,7 @@ Status FlatColumnReader::Impl::ReadNonNullableBatch( #define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \ template <> \ - Status FlatColumnReader::Impl::ReadNonNullableBatch<ArrowType, ParquetType>( \ + Status ColumnReader::Impl::ReadNonNullableBatch<ArrowType, ParquetType>( \ TypedColumnReader<ParquetType> * reader, int64_t values_to_read, \ int64_t * levels_read) { \ int64_t values_read; \ @@ -352,7 +353,7 @@ NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float) NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double) template <> -Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>( +Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>( TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96Type), false)); auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data()); @@ -370,7 +371,7 @@ Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int9 } template <> -Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>( +Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>( TypedColumnReader<BooleanType>* reader, int64_t values_to_read, int64_t* levels_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false)); @@ -388,47 +389,49 @@ Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, Boolea } template <typename ArrowType, typename ParquetType> -Status FlatColumnReader::Impl::ReadNullableFlatBatch( - TypedColumnReader<ParquetType>* reader, int16_t* def_levels, int64_t values_to_read, - int64_t* levels_read) { +Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader, + int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read, + int64_t* levels_read, int64_t* values_read) { using ArrowCType = typename ArrowType::c_type; using ParquetCType = typename ParquetType::c_type; RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false)); auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data()); - int null_count; - PARQUET_CATCH_NOT_OK(*levels_read = - reader->ReadBatchSpaced(values_to_read, def_levels, nullptr, - values, &null_count, valid_bits_ptr_, valid_bits_idx_)); + int64_t null_count; + PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels, + values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count)); auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_); INIT_BITSET(valid_bits_ptr_, valid_bits_idx_); - for (int64_t i = 0; i < *levels_read; i++) { - if (bitset & (1 << bit_offset)) { data_ptr[valid_bits_idx_ + i] = values[i]; } + for (int64_t i = 0; i < *values_read; i++) { + if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) { + data_ptr[valid_bits_idx_ + i] = values[i]; + } READ_NEXT_BITSET(valid_bits_ptr_); } null_count_ += null_count; - valid_bits_idx_ += *levels_read; + valid_bits_idx_ += *values_read; return Status::OK(); } -#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \ - template <> \ - Status FlatColumnReader::Impl::ReadNullableFlatBatch<ArrowType, ParquetType>( \ - TypedColumnReader<ParquetType> * reader, int16_t * def_levels, \ - int64_t values_to_read, int64_t * levels_read) { \ - auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_); \ - int null_count; \ - PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatchSpaced(values_to_read, \ - def_levels, nullptr, data_ptr + valid_bits_idx_, \ - &null_count, valid_bits_ptr_, valid_bits_idx_)); \ - \ - valid_bits_idx_ += *levels_read; \ - null_count_ += null_count; \ - \ - return Status::OK(); \ +#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \ + template <> \ + Status ColumnReader::Impl::ReadNullableBatch<ArrowType, ParquetType>( \ + TypedColumnReader<ParquetType> * reader, int16_t * def_levels, \ + int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read, \ + int64_t * values_read) { \ + auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_); \ + int64_t null_count; \ + PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels, \ + data_ptr + valid_bits_idx_, valid_bits_ptr_, valid_bits_idx_, levels_read, \ + values_read, &null_count)); \ + \ + valid_bits_idx_ += *values_read; \ + null_count_ += null_count; \ + \ + return Status::OK(); \ } NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t) @@ -437,56 +440,54 @@ NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float) NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double) template <> -Status FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>( - TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int64_t values_to_read, - int64_t* levels_read) { +Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>( + TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels, + int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96Type), false)); auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data()); - int null_count; - PARQUET_CATCH_NOT_OK(*levels_read = - reader->ReadBatchSpaced(values_to_read, def_levels, nullptr, - values, &null_count, valid_bits_ptr_, valid_bits_idx_)); + int64_t null_count; + PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels, + values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count)); auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_); INIT_BITSET(valid_bits_ptr_, valid_bits_idx_); - for (int64_t i = 0; i < *levels_read; i++) { - if (bitset & (1 << bit_offset)) { + for (int64_t i = 0; i < *values_read; i++) { + if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) { data_ptr[valid_bits_idx_ + i] = impala_timestamp_to_nanoseconds(values[i]); } READ_NEXT_BITSET(valid_bits_ptr_); } null_count_ += null_count; - valid_bits_idx_ += *levels_read; + valid_bits_idx_ += *values_read; return Status::OK(); } template <> -Status FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>( - TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int64_t values_to_read, - int64_t* levels_read) { +Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>( + TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels, + int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false)); auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data()); - int null_count; - PARQUET_CATCH_NOT_OK(*levels_read = - reader->ReadBatchSpaced(values_to_read, def_levels, nullptr, - values, &null_count, valid_bits_ptr_, valid_bits_idx_)); + int64_t null_count; + PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels, + values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count)); INIT_BITSET(valid_bits_ptr_, valid_bits_idx_); - for (int64_t i = 0; i < *levels_read; i++) { - if (bitset & (1 << bit_offset)) { + for (int64_t i = 0; i < *values_read; i++) { + if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) { if (values[i]) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i); } } READ_NEXT_BITSET(valid_bits_ptr_); } - valid_bits_idx_ += *levels_read; + valid_bits_idx_ += *values_read; null_count_ += null_count; return Status::OK(); } template <typename ArrowType> -Status FlatColumnReader::Impl::InitDataBuffer(int batch_size) { +Status ColumnReader::Impl::InitDataBuffer(int batch_size) { using ArrowCType = typename ArrowType::c_type; data_buffer_ = std::make_shared<PoolBuffer>(pool_); RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false)); @@ -496,7 +497,7 @@ Status FlatColumnReader::Impl::InitDataBuffer(int batch_size) { } template <> -Status FlatColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) { +Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) { data_buffer_ = std::make_shared<PoolBuffer>(pool_); RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false)); data_buffer_ptr_ = data_buffer_->mutable_data(); @@ -505,13 +506,7 @@ Status FlatColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_si return Status::OK(); } -template <typename ArrowType, typename ParquetType> -Status FlatColumnReader::Impl::TypedReadBatch( - int batch_size, std::shared_ptr<Array>* out) { - using ArrowCType = typename ArrowType::c_type; - - int values_to_read = batch_size; - RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size)); +Status ColumnReader::Impl::InitValidBits(int batch_size) { valid_bits_idx_ = 0; if (descr_->max_definition_level() > 0) { int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size + 1) / 8; @@ -521,93 +516,207 @@ Status FlatColumnReader::Impl::TypedReadBatch( memset(valid_bits_ptr_, 0, valid_bits_size); null_count_ = 0; } + return Status::OK(); +} - while ((values_to_read > 0) && column_reader_) { - if (descr_->max_definition_level() > 0) { - RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false)); +Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels, + const int16_t* rep_levels, int64_t total_levels_read, std::shared_ptr<Array>* array) { + if (descr_->max_repetition_level() > 0) { + std::shared_ptr<::arrow::Schema> arrow_schema; + RETURN_NOT_OK( + FromParquetSchema(reader_->metadata()->schema(), {column_index_}, &arrow_schema)); + + // Walk downwards to extract nullability + std::shared_ptr<Field> current_field = arrow_schema->field(0); + std::vector<bool> nullable; + std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders; + std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders; + nullable.push_back(current_field->nullable); + while (current_field->type->num_children() > 0) { + if (current_field->type->num_children() > 1) { + return Status::NotImplemented( + "Fields with more than one child are not supported."); + } else { + if (current_field->type->type != ::arrow::Type::LIST) { + return Status::NotImplemented( + "Currently only nesting with Lists is supported."); + } + current_field = current_field->type->child(0); + } + offset_builders.emplace_back( + std::make_shared<::arrow::Int32Builder>(pool_, ::arrow::int32())); + valid_bits_builders.emplace_back( + std::make_shared<::arrow::BooleanBuilder>(pool_, ::arrow::boolean())); + nullable.push_back(current_field->nullable); + } + + int64_t list_depth = offset_builders.size(); + // This describes the minimal definition that describes a level that + // reflects a value in the primitive values array. + int16_t values_def_level = descr_->max_definition_level(); + if (nullable[nullable.size() - 1]) { values_def_level--; } + + // The definition levels that are needed so that a list is declared + // as empty and not null. + std::vector<int16_t> empty_def_level(list_depth); + int def_level = 0; + for (int i = 0; i < list_depth; i++) { + if (nullable[i]) { def_level++; } + empty_def_level[i] = def_level; + def_level++; + } + + int32_t values_offset = 0; + std::vector<int64_t> null_counts(list_depth, 0); + for (int64_t i = 0; i < total_levels_read; i++) { + int16_t rep_level = rep_levels[i]; + if (rep_level < descr_->max_repetition_level()) { + for (int64_t j = rep_level; j < list_depth; j++) { + if (j == (list_depth - 1)) { + RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); + } else { + RETURN_NOT_OK(offset_builders[j]->Append(offset_builders[j + 1]->length())); + } + + if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) { + RETURN_NOT_OK(valid_bits_builders[j]->Append(false)); + null_counts[j]++; + break; + } else { + RETURN_NOT_OK(valid_bits_builders[j]->Append(true)); + if (empty_def_level[j] == def_levels[i]) { break; } + } + } + } + if (def_levels[i] >= values_def_level) { values_offset++; } + } + // Add the final offset to all lists + for (int64_t j = 0; j < list_depth; j++) { + if (j == (list_depth - 1)) { + RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); + } else { + RETURN_NOT_OK(offset_builders[j]->Append(offset_builders[j + 1]->length())); + } + } + + std::vector<std::shared_ptr<Buffer>> offsets; + std::vector<std::shared_ptr<Buffer>> valid_bits; + std::vector<int64_t> list_lengths; + for (int64_t j = 0; j < list_depth; j++) { + list_lengths.push_back(offset_builders[j]->length() - 1); + std::shared_ptr<Array> array; + RETURN_NOT_OK(offset_builders[j]->Finish(&array)); + offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->data()); + RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array)); + valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->data()); + } + + std::shared_ptr<Array> output(*array); + for (int64_t j = list_depth - 1; j >= 0; j--) { + auto list_type = std::make_shared<::arrow::ListType>( + std::make_shared<Field>("item", output->type(), nullable[j + 1])); + output = std::make_shared<::arrow::ListArray>( + list_type, list_lengths[j], offsets[j], output, null_counts[j], valid_bits[j]); } + *array = output; + } + return Status::OK(); +} + +template <typename ArrowType, typename ParquetType> +Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) { + using ArrowCType = typename ArrowType::c_type; + + int values_to_read = batch_size; + int total_levels_read = 0; + RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size)); + RETURN_NOT_OK(InitValidBits(batch_size)); + if (descr_->max_definition_level() > 0) { + RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); + } + if (descr_->max_repetition_level() > 0) { + RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); + } + int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()); + + while ((values_to_read > 0) && column_reader_) { auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get()); + int64_t values_read; int64_t levels_read; - int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); if (descr_->max_definition_level() == 0) { RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>( - reader, values_to_read, &levels_read))); + reader, values_to_read, &values_read))); } else { - // As per the defintion and checks for flat columns: - // descr_->max_definition_level() == 1 - RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>( - reader, def_levels, values_to_read, &levels_read))); + // As per the defintion and checks for flat (list) columns: + // descr_->max_definition_level() > 0, <= 3 + RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(reader, + def_levels + total_levels_read, rep_levels + total_levels_read, values_to_read, + &levels_read, &values_read))); + total_levels_read += levels_read; } - values_to_read -= levels_read; + values_to_read -= values_read; if (!column_reader_->HasNext()) { NextRowGroup(); } } + // Shrink arrays as they may be larger than the output. + RETURN_NOT_OK(data_buffer_->Resize(valid_bits_idx_ * sizeof(ArrowCType))); if (descr_->max_definition_level() > 0) { - // TODO: Shrink arrays in the case they are too large if (valid_bits_idx_ < batch_size * 0.8) { - // Shrink arrays as they are larger than the output. - // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays - // without the need for a copy. Given a decent underlying allocator this - // should still free some underlying pages to the OS. - - auto data_buffer = std::make_shared<PoolBuffer>(pool_); - RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(ArrowCType), false)); - memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size()); - data_buffer_ = data_buffer; - - auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_); - RETURN_NOT_OK(valid_bits_buffer->Resize( + RETURN_NOT_OK(valid_bits_buffer_->Resize( ::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8, false)); - memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(), - valid_bits_buffer->size()); - valid_bits_buffer_ = valid_bits_buffer; } *out = std::make_shared<ArrayType<ArrowType>>( field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_); - // Relase the ownership - data_buffer_.reset(); + // Relase the ownership as the Buffer is now part of a new Array valid_bits_buffer_.reset(); - return Status::OK(); } else { *out = std::make_shared<ArrayType<ArrowType>>( field_->type, valid_bits_idx_, data_buffer_); - data_buffer_.reset(); - return Status::OK(); } + // Relase the ownership as the Buffer is now part of a new Array + data_buffer_.reset(); + + // Check if we should transform this array into an list array. + return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out); } template <> -Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>( +Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>( int batch_size, std::shared_ptr<Array>* out) { int values_to_read = batch_size; + int total_levels_read = 0; RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size)); - valid_bits_idx_ = 0; + RETURN_NOT_OK(InitValidBits(batch_size)); if (descr_->max_definition_level() > 0) { - valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_); - int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size + 1) / 8; - RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false)); - valid_bits_ptr_ = valid_bits_buffer_->mutable_data(); - memset(valid_bits_ptr_, 0, valid_bits_size); - null_count_ = 0; + RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); + } + if (descr_->max_repetition_level() > 0) { + RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); } + int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()); while ((values_to_read > 0) && column_reader_) { if (descr_->max_definition_level() > 0) { RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false)); } auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get()); + int64_t values_read; int64_t levels_read; int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); if (descr_->max_definition_level() == 0) { RETURN_NOT_OK((ReadNonNullableBatch<::arrow::BooleanType, BooleanType>( - reader, values_to_read, &levels_read))); + reader, values_to_read, &values_read))); } else { // As per the defintion and checks for flat columns: // descr_->max_definition_level() == 1 - RETURN_NOT_OK((ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>( - reader, def_levels, values_to_read, &levels_read))); + RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(reader, + def_levels + total_levels_read, rep_levels + total_levels_read, values_to_read, + &levels_read, &values_read))); + total_levels_read += levels_read; } - values_to_read -= levels_read; + values_to_read -= values_read; if (!column_reader_->HasNext()) { NextRowGroup(); } } @@ -631,39 +740,46 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType> valid_bits_buffer->size()); valid_bits_buffer_ = valid_bits_buffer; } - *out = std::make_shared<::arrow::BooleanArray>( + *out = std::make_shared<BooleanArray>( field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_); // Relase the ownership data_buffer_.reset(); valid_bits_buffer_.reset(); - return Status::OK(); } else { - *out = std::make_shared<::arrow::BooleanArray>( - field_->type, valid_bits_idx_, data_buffer_); + *out = std::make_shared<BooleanArray>(field_->type, valid_bits_idx_, data_buffer_); data_buffer_.reset(); - return Status::OK(); } + + // Check if we should transform this array into an list array. + return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out); } template <typename ArrowType> -Status FlatColumnReader::Impl::ReadByteArrayBatch( +Status ColumnReader::Impl::ReadByteArrayBatch( int batch_size, std::shared_ptr<Array>* out) { using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType; + int total_levels_read = 0; + if (descr_->max_definition_level() > 0) { + RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); + } + if (descr_->max_repetition_level() > 0) { + RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); + } + int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()); + int values_to_read = batch_size; BuilderType builder(pool_, field_->type); while ((values_to_read > 0) && column_reader_) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray), false)); - if (descr_->max_definition_level() > 0) { - RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false)); - } auto reader = dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get()); int64_t values_read; int64_t levels_read; - int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data()); - PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( - values_to_read, def_levels, nullptr, values, &values_read)); + PARQUET_CATCH_NOT_OK( + levels_read = reader->ReadBatch(values_to_read, def_levels + total_levels_read, + rep_levels + total_levels_read, values, &values_read)); values_to_read -= levels_read; if (descr_->max_definition_level() == 0) { for (int64_t i = 0; i < levels_read; i++) { @@ -671,32 +787,38 @@ Status FlatColumnReader::Impl::ReadByteArrayBatch( builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len)); } } else { - // descr_->max_definition_level() == 1 + // descr_->max_definition_level() > 0 int values_idx = 0; + int nullable_elements = descr_->schema_node()->is_optional(); for (int64_t i = 0; i < levels_read; i++) { - if (def_levels[i] < descr_->max_definition_level()) { + if (nullable_elements && + (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) { RETURN_NOT_OK(builder.AppendNull()); - } else { + } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) { RETURN_NOT_OK( builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr), values[values_idx].len)); values_idx++; } } + total_levels_read += levels_read; } if (!column_reader_->HasNext()) { NextRowGroup(); } } - return builder.Finish(out); + + RETURN_NOT_OK(builder.Finish(out)); + // Check if we should transform this array into an list array. + return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out); } template <> -Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>( +Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>( int batch_size, std::shared_ptr<Array>* out) { return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out); } template <> -Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>( +Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>( int batch_size, std::shared_ptr<Array>* out) { return ReadByteArrayBatch<::arrow::StringType>(batch_size, out); } @@ -706,7 +828,7 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \ break; -Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) { +Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) { if (!column_reader_) { // Exhausted all row groups. *out = nullptr; @@ -747,7 +869,7 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* } } -void FlatColumnReader::Impl::NextRowGroup() { +void ColumnReader::Impl::NextRowGroup() { if (next_row_group_ < reader_->metadata()->num_row_groups()) { column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_); next_row_group_++; @@ -756,11 +878,11 @@ void FlatColumnReader::Impl::NextRowGroup() { } } -FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {} +ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {} -FlatColumnReader::~FlatColumnReader() {} +ColumnReader::~ColumnReader() {} -Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) { +Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) { return impl_->NextBatch(batch_size, out); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index 934b826..1aa9c3e 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -39,7 +39,7 @@ namespace parquet { namespace arrow { -class FlatColumnReader; +class ColumnReader; // Arrow read adapter class for deserializing Parquet files as Arrow row // batches. @@ -94,17 +94,17 @@ class PARQUET_EXPORT FileReader { // fully-materialized arrow::Array instances // // Returns error status if the column of interest is not flat. - ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); + ::arrow::Status GetColumn(int i, std::unique_ptr<ColumnReader>* out); // Read column as a whole into an Array. - ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out); + ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::Array>* out); // Read a table of flat columns into a Table. - ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out); + ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out); // Read a table of flat columns into a Table. Read only the indicated column // indices (relative to the schema) - ::arrow::Status ReadFlatTable( + ::arrow::Status ReadTable( const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out); const ParquetFileReader* parquet_reader() const; @@ -126,9 +126,9 @@ class PARQUET_EXPORT FileReader { // // We also do not expose any internal Parquet details, such as row groups. This // might change in the future. -class PARQUET_EXPORT FlatColumnReader { +class PARQUET_EXPORT ColumnReader { public: - virtual ~FlatColumnReader(); + virtual ~ColumnReader(); // Scan the next array of the indicated size. The actual size of the // returned array may be less than the passed size depending how much data is @@ -144,7 +144,7 @@ class PARQUET_EXPORT FlatColumnReader { private: class PARQUET_NO_EXPORT Impl; std::unique_ptr<Impl> impl_; - explicit FlatColumnReader(std::unique_ptr<Impl> impl); + explicit ColumnReader(std::unique_ptr<Impl> impl); friend class FileReader; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 4760f0e..4d87dd8 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -177,6 +177,45 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA return builder.Finish(out); } +/// Wrap an Array into a ListArray by splitting it up into size lists. +/// +/// This helper function only supports (size/2) nulls. +Status MakeListArary(const std::shared_ptr<Array>& values, int64_t size, + int64_t null_count, bool nullable_values, std::shared_ptr<::arrow::ListArray>* out) { + // We always include an empty list + int64_t non_null_entries = size - null_count - 1; + int64_t length_per_entry = values->length() / non_null_entries; + + auto offsets = std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool()); + RETURN_NOT_OK(offsets->Resize((size + 1) * sizeof(int32_t))); + int32_t* offsets_ptr = reinterpret_cast<int32_t*>(offsets->mutable_data()); + + auto null_bitmap = + std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool()); + int64_t bitmap_size = ::arrow::BitUtil::CeilByte(size) / 8; + RETURN_NOT_OK(null_bitmap->Resize(bitmap_size)); + uint8_t* null_bitmap_ptr = null_bitmap->mutable_data(); + memset(null_bitmap_ptr, 0, bitmap_size); + + int32_t current_offset = 0; + for (int64_t i = 0; i < size; i++) { + offsets_ptr[i] = current_offset; + if (!(((i % 2) == 0) && ((i / 2) < null_count))) { + // Non-null list (list with index 1 is always empty). + ::arrow::BitUtil::SetBit(null_bitmap_ptr, i); + if (i != 1) { current_offset += length_per_entry; } + } + } + offsets_ptr[size] = values->length(); + + auto value_field = + std::make_shared<::arrow::Field>("item", values->type(), nullable_values); + *out = std::make_shared<::arrow::ListArray>( + ::arrow::list(value_field), size, offsets, values, null_count, null_bitmap); + + return Status::OK(); +} + std::shared_ptr<::arrow::Column> MakeColumn( const std::string& name, const std::shared_ptr<Array>& array, bool nullable) { auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
