Repository: parquet-cpp Updated Branches: refs/heads/master f8573ebed -> d0646659c
PARQUET-946: Add ReadRowGroup and num_row_groups methods to arrow::FileReader There's a lot of room for improvement / further refactoring here -- the assumption that an entire column in a file is being read runs very deep in the Arrow reader, so I tried to do the minimum work to decouple the row group iteration. There's some code duplication in ReadRowGroup, but we should maybe save further cleanup for a future patch. Author: Wes McKinney <[email protected]> Closes #291 from wesm/PARQUET-946 and squashes the following commits: 6d2b48a [Wes McKinney] Add virtual dtor c7589f7 [Wes McKinney] Add ReadRowGroup and num_row_groups methods to arrow::FileReader Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/d0646659 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/d0646659 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/d0646659 Branch: refs/heads/master Commit: d0646659c64585dccd9a8f75a9509c1ae8cfa1fb Parents: f8573eb Author: Wes McKinney <[email protected]> Authored: Thu Apr 6 09:26:31 2017 +0200 Committer: Uwe L. 
Korn <[email protected]> Committed: Thu Apr 6 09:26:31 2017 +0200 ---------------------------------------------------------------------- cmake_modules/FindClangTools.cmake | 19 +- src/parquet/arrow/arrow-reader-writer-test.cc | 146 ++++++---- src/parquet/arrow/reader.cc | 305 +++++++++++++++------ src/parquet/arrow/reader.h | 7 + src/parquet/arrow/test-util.h | 8 +- src/parquet/column/writer.cc | 2 +- 6 files changed, 343 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/cmake_modules/FindClangTools.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake index c07c7d2..e4ee984 100644 --- a/cmake_modules/FindClangTools.cmake +++ b/cmake_modules/FindClangTools.cmake @@ -27,16 +27,16 @@ # This module defines # CLANG_TIDY_BIN, The path to the clang tidy binary # CLANG_TIDY_FOUND, Whether clang tidy was found -# CLANG_FORMAT_BIN, The path to the clang format binary +# CLANG_FORMAT_BIN, The path to the clang format binary # CLANG_TIDY_FOUND, Whether clang format was found -find_program(CLANG_TIDY_BIN - NAMES clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy - PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin +find_program(CLANG_TIDY_BIN + NAMES clang-tidy-3.9 clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy + PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin NO_DEFAULT_PATH ) -if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" ) +if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" ) set(CLANG_TIDY_FOUND 0) message("clang-tidy not found") else() @@ -44,17 +44,16 @@ else() message("clang-tidy found at ${CLANG_TIDY_BIN}") endif() -find_program(CLANG_FORMAT_BIN - NAMES clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format - PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} 
/usr/local/bin /usr/bin +find_program(CLANG_FORMAT_BIN + NAMES clang-format-3.9 clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format + PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin NO_DEFAULT_PATH ) -if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" ) +if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" ) set(CLANG_FORMAT_FOUND 0) message("clang-format not found") else() set(CLANG_FORMAT_FOUND 1) message("clang-format found at ${CLANG_FORMAT_BIN}") endif() - http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 3b232f9..dd46893 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -197,6 +197,36 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>; template <typename T> using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>; +void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads, + int64_t row_group_size, std::shared_ptr<Buffer>* out) { + auto sink = std::make_shared<InMemoryOutputStream>(); + + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, + row_group_size, default_writer_properties())); + *out = sink->GetBuffer(); +} + +void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads, + int64_t row_group_size, const std::vector<int>& column_subset, + std::shared_ptr<Table>* out) { + std::shared_ptr<Buffer> buffer; + WriteTableToBuffer(table, num_threads, row_group_size, &buffer); + + std::unique_ptr<FileReader> reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + reader->set_num_threads(num_threads); + 
+ if (column_subset.size() > 0) { + ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); + } else { + // Read everything + ASSERT_OK_NO_THROW(reader->ReadTable(out)); + } +} + template <typename TestType> class TestParquetIO : public ::testing::Test { public: @@ -248,19 +278,6 @@ class TestParquetIO : public ::testing::Test { ASSERT_NE(nullptr, out->get()); } - void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) { - std::shared_ptr<::arrow::Table> out; - std::unique_ptr<FileReader> reader; - ReaderFromSink(&reader); - ReadTableFromFile(std::move(reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(values->length(), out->num_rows()); - - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); - } - void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements, int64_t null_count, std::shared_ptr<Table>* out) { std::shared_ptr<Array> values; @@ -289,13 +306,23 @@ class TestParquetIO : public ::testing::Test { *out = MakeSimpleTable(parent_lists, nullable_parent_lists); } - void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) { - std::shared_ptr<Array> values = table->column(0)->data()->chunk(0); - this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); + void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) { + std::shared_ptr<::arrow::Table> out; + std::unique_ptr<FileReader> reader; + ReaderFromSink(&reader); + ReadTableFromFile(std::move(reader), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(values->length(), out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + 
ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + } - this->ReadAndCheckSingleColumnTable(values); + void CheckRoundTrip(const std::shared_ptr<Table>& table) { + std::shared_ptr<Table> result; + DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result); + ASSERT_TRUE(table->Equals(*result)); } template <typename ArrayType> @@ -401,37 +428,37 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, true); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) { std::shared_ptr<Table> table; this->PrepareListTable(SMALL_SIZE, true, true, 10, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) { std::shared_ptr<Table> table; this->PrepareListTable(SMALL_SIZE, false, true, 10, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) { std::shared_ptr<Table> table; this->PrepareListTable(SMALL_SIZE, true, false, 10, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) { std::shared_ptr<Table> table; this->PrepareListTable(SMALL_SIZE, false, false, 0, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) { std::shared_ptr<Table> table; this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { @@ -756,18 +783,24 @@ 
TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } -void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) { +void MakeDoubleTable( + int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) { std::shared_ptr<::arrow::Column> column; std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns); std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns); - std::shared_ptr<Array> values; for (int i = 0; i < num_columns; ++i) { + std::vector<std::shared_ptr<Array>> arrays; + std::shared_ptr<Array> values; ASSERT_OK(NullableArray<::arrow::DoubleType>( num_rows, num_rows / 10, static_cast<uint32_t>(i), &values)); std::stringstream ss; ss << "col" << i; - column = MakeColumn(ss.str(), values, true); + + for (int j = 0; j < nchunks; ++j) { + arrays.push_back(values); + } + column = MakeColumn(ss.str(), arrays, true); columns[i] = column; fields[i] = column->field(); @@ -776,41 +809,46 @@ void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) *out = std::make_shared<Table>(schema, columns); } -void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads, - const std::vector<int>& column_subset, std::shared_ptr<Table>* out) { - auto sink = std::make_shared<InMemoryOutputStream>(); - - ASSERT_OK_NO_THROW(WriteTable( - *table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); +TEST(TestArrowReadWrite, MultithreadedRead) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; - std::shared_ptr<Buffer> buffer = sink->GetBuffer(); - std::unique_ptr<FileReader> reader; - ASSERT_OK_NO_THROW( - OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + std::shared_ptr<Table> table; + MakeDoubleTable(num_columns, num_rows, 1, &table); - reader->set_num_threads(num_threads); + 
std::shared_ptr<Table> result; + DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result); - if (column_subset.size() > 0) { - ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); - } else { - // Read everything - ASSERT_OK_NO_THROW(reader->ReadTable(out)); - } + ASSERT_TRUE(table->Equals(*result)); } -TEST(TestArrowReadWrite, MultithreadedRead) { +TEST(TestArrowReadWrite, ReadSingleRowGroup) { const int num_columns = 20; const int num_rows = 1000; - const int num_threads = 4; std::shared_ptr<Table> table; - MakeDoubleTable(num_columns, num_rows, &table); + MakeDoubleTable(num_columns, num_rows, 1, &table); - std::shared_ptr<Table> result; - DoTableRoundtrip(table, num_threads, {}, &result); + std::shared_ptr<Buffer> buffer; + WriteTableToBuffer(table, 1, num_rows / 2, &buffer); - ASSERT_TRUE(table->Equals(*result)); + std::unique_ptr<FileReader> reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + ASSERT_EQ(2, reader->num_row_groups()); + + std::shared_ptr<Table> r1, r2; + // Read everything + ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1)); + ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2)); + + std::shared_ptr<Table> concatenated; + ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated)); + + ASSERT_TRUE(table->Equals(*concatenated)); } TEST(TestArrowReadWrite, ReadColumnSubset) { @@ -819,11 +857,11 @@ TEST(TestArrowReadWrite, ReadColumnSubset) { const int num_threads = 4; std::shared_ptr<Table> table; - MakeDoubleTable(num_columns, num_rows, &table); + MakeDoubleTable(num_columns, num_rows, 1, &table); std::shared_ptr<Table> result; std::vector<int> column_subset = {0, 4, 8, 10}; - DoTableRoundtrip(table, num_threads, column_subset, &result); + DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result); std::vector<std::shared_ptr<::arrow::Column>> ex_columns; 
std::vector<std::shared_ptr<::arrow::Field>> ex_fields; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index a26c3ea..823aea9 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -60,19 +60,139 @@ static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timest template <typename ArrowType> using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType; +// ---------------------------------------------------------------------- +// Helper for parallel for-loop + +template <class FUNCTION> +Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { + std::vector<std::thread> thread_pool; + thread_pool.reserve(nthreads); + std::atomic<int> task_counter(0); + + std::mutex error_mtx; + bool error_occurred = false; + Status error; + + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() { + int task_id; + while (!error_occurred) { + task_id = task_counter.fetch_add(1); + if (task_id >= num_tasks) { break; } + Status s = func(task_id); + if (!s.ok()) { + std::lock_guard<std::mutex> lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } + if (error_occurred) { return error; } + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// Iteration utilities + +// Abstraction to decouple row group iteration details from the ColumnReader, +// so we can read only a single row group if we want +class FileColumnIterator { + public: + explicit FileColumnIterator(int column_index, ParquetFileReader* reader) + : column_index_(column_index), + reader_(reader), + schema_(reader->metadata()->schema()) {} + + 
virtual ~FileColumnIterator() {} + + virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0; + + const SchemaDescriptor* schema() const { return schema_; } + + const ColumnDescriptor* descr() const { return schema_->Column(column_index_); } + + int column_index() const { return column_index_; } + + protected: + int column_index_; + ParquetFileReader* reader_; + const SchemaDescriptor* schema_; +}; + +class AllRowGroupsIterator : public FileColumnIterator { + public: + explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader) + : FileColumnIterator(column_index, reader), next_row_group_(0) {} + + std::shared_ptr<::parquet::ColumnReader> Next() override { + std::shared_ptr<::parquet::ColumnReader> result; + if (next_row_group_ < reader_->metadata()->num_row_groups()) { + result = reader_->RowGroup(next_row_group_)->Column(column_index_); + next_row_group_++; + } else { + result = nullptr; + } + return result; + }; + + private: + int next_row_group_; +}; + +class SingleRowGroupIterator : public FileColumnIterator { + public: + explicit SingleRowGroupIterator( + int column_index, int row_group_number, ParquetFileReader* reader) + : FileColumnIterator(column_index, reader), + row_group_number_(row_group_number), + done_(false) {} + + std::shared_ptr<::parquet::ColumnReader> Next() override { + if (done_) { return nullptr; } + + auto result = reader_->RowGroup(row_group_number_)->Column(column_index_); + done_ = true; + return result; + }; + + private: + int row_group_number_; + bool done_; +}; + +// ---------------------------------------------------------------------- +// File reader implementation + class FileReader::Impl { public: - Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader); + Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader) + : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} + virtual ~Impl() {} - bool CheckForFlatColumn(const ColumnDescriptor* descr); - bool CheckForFlatListColumn(const 
ColumnDescriptor* descr); Status GetColumn(int i, std::unique_ptr<ColumnReader>* out); Status ReadColumn(int i, std::shared_ptr<Array>* out); - Status ReadTable(std::shared_ptr<Table>* out); - Status ReadTable(const std::vector<int>& column_indices, std::shared_ptr<Table>* out); + Status GetSchema( + const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out); + Status ReadRowGroup(int row_group_index, const std::vector<int>& indices, + std::shared_ptr<::arrow::Table>* out); + Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table); + Status ReadTable(std::shared_ptr<Table>* table); + Status ReadRowGroup(int i, std::shared_ptr<Table>* table); + + bool CheckForFlatColumn(const ColumnDescriptor* descr); + bool CheckForFlatListColumn(const ColumnDescriptor* descr); + const ParquetFileReader* parquet_reader() const { return reader_.get(); } + int num_row_groups() const { return reader_->metadata()->num_row_groups(); } + void set_num_threads(int num_threads) { num_threads_ = num_threads; } private: @@ -84,8 +204,17 @@ class FileReader::Impl { class ColumnReader::Impl { public: - Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader, - int column_index); + Impl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input) + : pool_(pool), + input_(std::move(input)), + descr_(input_->descr()), + values_buffer_(pool), + def_levels_buffer_(pool), + rep_levels_buffer_(pool) { + NodeToField(input_->descr()->schema_node(), &field_); + NextRowGroup(); + } + virtual ~Impl() {} Status NextBatch(int batch_size, std::shared_ptr<Array>* out); @@ -121,10 +250,9 @@ class ColumnReader::Impl { }; MemoryPool* pool_; + std::unique_ptr<FileColumnIterator> input_; const ColumnDescriptor* descr_; - ParquetFileReader* reader_; - int column_index_; - int next_row_group_; + std::shared_ptr<::parquet::ColumnReader> column_reader_; std::shared_ptr<Field> field_; @@ -139,14 +267,16 @@ class ColumnReader::Impl { int64_t null_count_; }; 
-FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader) - : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} +FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader) + : impl_(new FileReader::Impl(pool, std::move(reader))) {} + +FileReader::~FileReader() {} Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) { - const SchemaDescriptor* schema = reader_->metadata()->schema(); + std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get())); std::unique_ptr<ColumnReader::Impl> impl( - new ColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i)); + new ColumnReader::Impl(pool_, std::move(input))); *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl))); return Status::OK(); } @@ -163,55 +293,59 @@ Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) { return flat_column_reader->NextBatch(batch_size, out); } -Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) { - std::vector<int> column_indices(reader_->metadata()->num_columns()); - - for (size_t i = 0; i < column_indices.size(); ++i) { - column_indices[i] = i; - } - return ReadTable(column_indices, table); +Status FileReader::Impl::GetSchema( + const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out) { + auto descr = reader_->metadata()->schema(); + return FromParquetSchema(descr, indices, out); } -template <class FUNCTION> -Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { - std::vector<std::thread> thread_pool; - thread_pool.reserve(nthreads); - std::atomic<int> task_counter(0); +Status FileReader::Impl::ReadRowGroup(int row_group_index, + const std::vector<int>& indices, std::shared_ptr<::arrow::Table>* out) { + std::shared_ptr<::arrow::Schema> schema; + RETURN_NOT_OK(GetSchema(indices, &schema)); - std::mutex error_mtx; - bool error_occurred = false; - Status error; + auto rg_metadata = 
reader_->metadata()->RowGroup(row_group_index); - for (int thread_id = 0; thread_id < nthreads; ++thread_id) { - thread_pool.emplace_back( - [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() { - int task_id; - while (!error_occurred) { - task_id = task_counter.fetch_add(1); - if (task_id >= num_tasks) { break; } - Status s = func(task_id); - if (!s.ok()) { - std::lock_guard<std::mutex> lock(error_mtx); - error_occurred = true; - error = s; - break; - } - } - }); - } - for (auto&& thread : thread_pool) { - thread.join(); + int num_columns = static_cast<int>(indices.size()); + int nthreads = std::min<int>(num_threads_, num_columns); + std::vector<std::shared_ptr<Column>> columns(num_columns); + + // TODO(wesm): Refactor to share more code with ReadTable + + auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata, + this](int i) { + int column_index = indices[i]; + int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values(); + + std::unique_ptr<FileColumnIterator> input( + new SingleRowGroupIterator(column_index, row_group_index, reader_.get())); + + std::unique_ptr<ColumnReader::Impl> impl( + new ColumnReader::Impl(pool_, std::move(input))); + ColumnReader flat_column_reader(std::move(impl)); + + std::shared_ptr<Array> array; + RETURN_NOT_OK(flat_column_reader.NextBatch(batch_size, &array)); + columns[i] = std::make_shared<Column>(schema->field(i), array); + return Status::OK(); + }; + + if (nthreads == 1) { + for (int i = 0; i < num_columns; i++) { + RETURN_NOT_OK(ReadColumnFunc(i)); + } + } else { + RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc)); } - if (error_occurred) { return error; } + + *out = std::make_shared<Table>(schema, columns); return Status::OK(); } Status FileReader::Impl::ReadTable( const std::vector<int>& indices, std::shared_ptr<Table>* table) { - auto descr = reader_->metadata()->schema(); - std::shared_ptr<::arrow::Schema> schema; - 
RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema)); + RETURN_NOT_OK(GetSchema(indices, &schema)); int num_columns = static_cast<int>(indices.size()); int nthreads = std::min<int>(num_threads_, num_columns); @@ -236,10 +370,23 @@ Status FileReader::Impl::ReadTable( return Status::OK(); } -FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader) - : impl_(new FileReader::Impl(pool, std::move(reader))) {} +Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) { + std::vector<int> indices(reader_->metadata()->num_columns()); -FileReader::~FileReader() {} + for (size_t i = 0; i < indices.size(); ++i) { + indices[i] = i; + } + return ReadTable(indices, table); +} + +Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) { + std::vector<int> indices(reader_->metadata()->num_columns()); + + for (size_t i = 0; i < indices.size(); ++i) { + indices[i] = i; + } + return ReadRowGroup(i, indices, table); +} // Static ctor Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file, @@ -280,14 +427,35 @@ Status FileReader::ReadTable(std::shared_ptr<Table>* out) { } Status FileReader::ReadTable( - const std::vector<int>& column_indices, std::shared_ptr<Table>* out) { + const std::vector<int>& indices, std::shared_ptr<Table>* out) { + try { + return impl_->ReadTable(indices, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + +Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) { + try { + return impl_->ReadRowGroup(i, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + +Status FileReader::ReadRowGroup( + int i, const std::vector<int>& indices, std::shared_ptr<Table>* out) { try { - return impl_->ReadTable(column_indices, out); + return impl_->ReadRowGroup(i, indices, out); } catch (const ::parquet::ParquetException& e) { return ::arrow::Status::IOError(e.what()); 
} } +int FileReader::num_row_groups() const { + return impl_->num_row_groups(); +} + void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); } @@ -296,20 +464,6 @@ const ParquetFileReader* FileReader::parquet_reader() const { return impl_->parquet_reader(); } -ColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr, - ParquetFileReader* reader, int column_index) - : pool_(pool), - descr_(descr), - reader_(reader), - column_index_(column_index), - next_row_group_(0), - values_buffer_(pool), - def_levels_buffer_(pool), - rep_levels_buffer_(pool) { - NodeToField(descr_->schema_node(), &field_); - NextRowGroup(); -} - template <typename ArrowType, typename ParquetType> Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader, int64_t values_to_read, int64_t* levels_read) { @@ -563,7 +717,7 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels, if (descr_->max_repetition_level() > 0) { std::shared_ptr<::arrow::Schema> arrow_schema; RETURN_NOT_OK( - FromParquetSchema(reader_->metadata()->schema(), {column_index_}, &arrow_schema)); + FromParquetSchema(input_->schema(), {input_->column_index()}, &arrow_schema)); // Walk downwards to extract nullability std::shared_ptr<Field> current_field = arrow_schema->field(0); @@ -912,12 +1066,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out } void ColumnReader::Impl::NextRowGroup() { - if (next_row_group_ < reader_->metadata()->num_row_groups()) { - column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_); - next_row_group_++; - } else { - column_reader_ = nullptr; - } + column_reader_ = input_->Next(); } ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {} http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/reader.h ---------------------------------------------------------------------- diff --git 
a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index 1aa9c3e..f12acaf 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -107,6 +107,13 @@ class PARQUET_EXPORT FileReader { ::arrow::Status ReadTable( const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out); + ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices, + std::shared_ptr<::arrow::Table>* out); + + ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out); + + int num_row_groups() const; + const ParquetFileReader* parquet_reader() const; /// Set the number of threads to use during reads of multiple columns. By http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 2cfc60a..bff952b 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -260,12 +260,18 @@ Status MakeListArary(const std::shared_ptr<Array>& values, int64_t size, return Status::OK(); } -std::shared_ptr<::arrow::Column> MakeColumn( +static std::shared_ptr<::arrow::Column> MakeColumn( const std::string& name, const std::shared_ptr<Array>& array, bool nullable) { auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable); return std::make_shared<::arrow::Column>(field, array); } +static std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name, + const std::vector<std::shared_ptr<Array>>& arrays, bool nullable) { + auto field = std::make_shared<::arrow::Field>(name, arrays[0]->type(), nullable); + return std::make_shared<::arrow::Column>(field, arrays); +} + std::shared_ptr<::arrow::Table> MakeSimpleTable( const std::shared_ptr<Array>& values, bool nullable) { std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable); 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index 2ba4162..eb74147 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -213,7 +213,7 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, const WriterProperties* properties) : ColumnWriter(metadata, std::move(pager), expected_rows, (encoding == Encoding::PLAIN_DICTIONARY || - encoding == Encoding::RLE_DICTIONARY), + encoding == Encoding::RLE_DICTIONARY), encoding, properties) { switch (encoding) { case Encoding::PLAIN:
