Repository: parquet-cpp
Updated Branches:
  refs/heads/master fecdcbf69 -> 56fbdb63b
PARQUET-835: Read Arrow columns in parallel with thread pool

Also implements PARQUET-836, but need to add a unit test for that

Author: Wes McKinney <[email protected]>

Closes #222 from wesm/PARQUET-835 and squashes the following commits:

71c700e [Wes McKinney] Add missing include. Update Arrow version
638b4c0 [Wes McKinney] cpplint
7c79ca7 [Wes McKinney] Read Arrow columns in parallel with thread pool

Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/56fbdb63
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/56fbdb63
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/56fbdb63

Branch: refs/heads/master
Commit: 56fbdb63b908b38d51965d213db5a6ec47ffa9ca
Parents: fecdcbf
Author: Wes McKinney <[email protected]>
Authored: Mon Jan 23 12:55:05 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Mon Jan 23 12:55:05 2017 -0500

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake | 2 +-
 src/parquet/arrow/arrow-reader-writer-test.cc | 49 +++++++++++-
 src/parquet/arrow/reader.cc | 90 ++++++++++++++++++++--
 src/parquet/arrow/reader.h | 12 +++
 src/parquet/arrow/schema.cc | 19 ++++-
 src/parquet/arrow/schema.h | 4 +
 6 files changed, 164 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/cmake_modules/ThirdpartyToolchain.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 9a17dcf..8fc1b78 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1") # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd")
-set(ARROW_VERSION "7d3e2a3ab90324625b738e464a020758379f457a") +set(ARROW_VERSION "085c8754b0ab2da7fcd245fc88bc4de9a6806a4c") # find boost headers and libs set(Boost_DEBUG TRUE) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 57986de..6748a8d 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -17,6 +17,8 @@ #include "gtest/gtest.h" +#include <sstream> + #include "parquet/api/reader.h" #include "parquet/api/writer.h" @@ -44,7 +46,6 @@ using parquet::schema::NodePtr; using parquet::schema::PrimitiveNode; namespace parquet { - namespace arrow { const int SMALL_SIZE = 100; @@ -184,6 +185,23 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>; template <typename T> using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>; +void DoTableRoundtrip( + const std::shared_ptr<Table>& table, int num_threads, std::shared_ptr<Table>* out) { + auto sink = std::make_shared<InMemoryOutputStream>(); + + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); + + std::shared_ptr<Buffer> buffer = sink->GetBuffer(); + std::unique_ptr<FileReader> reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + reader->set_num_threads(num_threads); + ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); +} + template <typename TestType> class TestParquetIO : public ::testing::Test { public: @@ -642,6 +660,33 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } -} // namespace arrow +TEST(TestArrowReadWrite, MultithreadedRead) { + 
const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; + + std::shared_ptr<::arrow::Column> column; + std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns); + std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns); + + std::shared_ptr<Array> values; + for (int i = 0; i < num_columns; ++i) { + ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values)); + std::stringstream ss; + ss << "col" << i; + column = MakeColumn(ss.str(), values, true); + + columns[i] = column; + fields[i] = column->field(); + } + auto schema = std::make_shared<::arrow::Schema>(fields); + auto table = std::make_shared<Table>("schema", schema, columns); + std::shared_ptr<Table> result; + DoTableRoundtrip(table, num_threads, &result); + + ASSERT_TRUE(table->Equals(result)); +} + +} // namespace arrow } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index c9f986a..9221041 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -18,9 +18,12 @@ #include "parquet/arrow/reader.h" #include <algorithm> +#include <atomic> #include <chrono> +#include <mutex> #include <queue> #include <string> +#include <thread> #include <vector> #include "parquet/arrow/schema.h" @@ -65,11 +68,17 @@ class FileReader::Impl { Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); Status ReadFlatTable(std::shared_ptr<Table>* out); + Status ReadFlatTable( + const std::vector<int>& column_indices, std::shared_ptr<Table>* out); const ParquetFileReader* parquet_reader() const { return reader_.get(); } + void set_num_threads(int num_threads) { num_threads_ = num_threads; } + private: MemoryPool* pool_; std::unique_ptr<ParquetFileReader> reader_; + + int 
num_threads_; }; class FlatColumnReader::Impl { @@ -125,7 +134,7 @@ class FlatColumnReader::Impl { }; FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader) - : pool_(pool), reader_(std::move(reader)) {} + : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) { if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) { @@ -156,19 +165,73 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { } Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) { + std::vector<int> column_indices(reader_->metadata()->num_columns()); + + for (size_t i = 0; i < column_indices.size(); ++i) { + column_indices[i] = i; + } + return ReadFlatTable(column_indices, table); +} + +template <class FUNCTION> +Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { + std::vector<std::thread> thread_pool; + thread_pool.reserve(nthreads); + std::atomic<int> task_counter(0); + + std::mutex error_mtx; + bool error_occurred = false; + Status error; + + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() { + int task_id; + while (!error_occurred) { + task_id = task_counter.fetch_add(1); + if (task_id >= num_tasks) { break; } + Status s = func(task_id); + if (!s.ok()) { + std::lock_guard<std::mutex> lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } + if (error_occurred) { return error; } + return Status::OK(); +} + +Status FileReader::Impl::ReadFlatTable( + const std::vector<int>& indices, std::shared_ptr<Table>* table) { auto descr = reader_->metadata()->schema(); const std::string& name = descr->name(); std::shared_ptr<::arrow::Schema> schema; - RETURN_NOT_OK(FromParquetSchema(descr, &schema)); - - int num_columns = 
reader_->metadata()->num_columns(); + RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema)); + int num_columns = static_cast<int>(indices.size()); + int nthreads = std::min<int>(num_threads_, num_columns); std::vector<std::shared_ptr<Column>> columns(num_columns); - for (int i = 0; i < num_columns; i++) { + + auto ReadColumn = [&indices, &schema, &columns, this](int i) { std::shared_ptr<Array> array; - RETURN_NOT_OK(ReadFlatColumn(i, &array)); - columns[i] = std::make_shared<Column>(schema->field(i), array); + RETURN_NOT_OK(ReadFlatColumn(indices[i], &array)); + columns[i] = std::make_shared<Column>(schema->field(indices[i]), array); + return Status::OK(); + }; + + if (nthreads == 1) { + for (int i = 0; i < num_columns; i++) { + RETURN_NOT_OK(ReadColumn(i)); + } + } else { + RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumn)); } *table = std::make_shared<Table>(name, schema, columns); @@ -218,6 +281,19 @@ Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) { } } +Status FileReader::ReadFlatTable( + const std::vector<int>& column_indices, std::shared_ptr<Table>* out) { + try { + return impl_->ReadFlatTable(column_indices, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + +void FileReader::set_num_threads(int num_threads) { + impl_->set_num_threads(num_threads); +} + const ParquetFileReader* FileReader::parquet_reader() const { return impl_->parquet_reader(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index 518ae4b..934b826 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -19,6 +19,7 @@ #define PARQUET_ARROW_READER_H #include <memory> +#include <vector> #include "parquet/api/reader.h" #include "parquet/api/schema.h" @@ -94,13 +95,24 @@ class PARQUET_EXPORT FileReader { 
// // Returns error status if the column of interest is not flat. ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); + // Read column as a whole into an Array. ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out); + // Read a table of flat columns into a Table. ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out); + // Read a table of flat columns into a Table. Read only the indicated column + // indices (relative to the schema) + ::arrow::Status ReadFlatTable( + const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out); + const ParquetFileReader* parquet_reader() const; + /// Set the number of threads to use during reads of multiple columns. By + /// default only 1 thread is used + void set_num_threads(int num_threads); + virtual ~FileReader(); private: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index b086b9e..4f17f5e 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -250,8 +250,6 @@ Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) { Status FromParquetSchema( const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) { - // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes - // from the root Parquet node const GroupNode* schema_node = parquet_schema->group_node(); std::vector<std::shared_ptr<Field>> fields(schema_node->field_count()); @@ -263,6 +261,23 @@ Status FromParquetSchema( return Status::OK(); } +Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out) { + // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes + // from the root Parquet node + const GroupNode* schema_node = 
parquet_schema->group_node(); + + int num_fields = static_cast<int>(column_indices.size()); + + std::vector<std::shared_ptr<Field>> fields(num_fields); + for (int i = 0; i < num_fields; i++) { + RETURN_NOT_OK(NodeToField(schema_node->field(column_indices[i]), &fields[i])); + } + + *out = std::make_shared<::arrow::Schema>(fields); + return Status::OK(); +} + Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name, bool nullable, const WriterProperties& properties, NodePtr* out) { Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/schema.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h index 6917b90..bb77a4e 100644 --- a/src/parquet/arrow/schema.h +++ b/src/parquet/arrow/schema.h @@ -19,6 +19,7 @@ #define PARQUET_ARROW_SCHEMA_H #include <memory> +#include <vector> #include "arrow/schema.h" #include "arrow/type.h" @@ -40,6 +41,9 @@ namespace arrow { ::arrow::Status PARQUET_EXPORT NodeToField( const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out); +::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out); + ::arrow::Status PARQUET_EXPORT FromParquetSchema( const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
