Repository: parquet-cpp Updated Branches: refs/heads/master b5e8cc430 -> 21ad2c397
PARQUET-953: Add static constructors to arrow::FileWriter for initializing from schema, add WriteTable method I preserved the existing WriteTable top level methods, but this will unblock ARROW-528 Author: Wes McKinney <[email protected]> Closes #296 from wesm/PARQUET-953 and squashes the following commits: 127edaa [Wes McKinney] Make FileWriter ctor public again 7c921f3 [Wes McKinney] cpplint b825f0b [Wes McKinney] Add static constructors to arrow::FileWriter for initializing from arrow::Schema Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/21ad2c39 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/21ad2c39 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/21ad2c39 Branch: refs/heads/master Commit: 21ad2c3979e0fa973b271a94103919bbded20b1a Parents: b5e8cc4 Author: Wes McKinney <[email protected]> Authored: Thu Apr 13 11:13:50 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Thu Apr 13 11:13:50 2017 -0400 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 23 ++++---- src/parquet/arrow/arrow-schema-test.cc | 68 +++++++++------------- src/parquet/arrow/schema.cc | 41 +++++-------- src/parquet/arrow/writer.cc | 50 +++++++++++----- src/parquet/arrow/writer.h | 22 ++++++- 5 files changed, 106 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/21ad2c39/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 2f8f421..0bdc14d 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -23,9 +23,9 @@ #include "parquet/api/writer.h" #include "parquet/arrow/reader.h" +#include "parquet/arrow/schema.h" #include "parquet/arrow/test-util.h" #include "parquet/arrow/writer.h" -#include "parquet/arrow/schema.h" #include "parquet/file/writer.h" @@ -890,17 +890,17 @@ class TestNestedSchemaRead : public ::testing::Test { void InitReader(std::shared_ptr<FileReader>* out) { std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer(); std::unique_ptr<FileReader> reader; - ASSERT_OK_NO_THROW(OpenFile( - std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); *out = std::move(reader); } void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) { nested_parquet_ = std::make_shared<InMemoryOutputStream>(); - writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, - schema, default_writer_properties()); + writer_ = parquet::ParquetFileWriter::Open( + nested_parquet_, schema, default_writer_properties()); row_group_writer_ = writer_->AppendRowGroup(num_rows); } @@ -920,14 +920,11 @@ class TestNestedSchemaRead : public ::testing::Test { // } // required int32 leaf3; + parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED, + {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32), + PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)})); parquet_fields.push_back( - GroupNode::Make("group1", Repetition::REQUIRED, { - PrimitiveNode::Make( - "leaf1", Repetition::REQUIRED, ParquetType::INT32), - PrimitiveNode::Make( - "leaf2", Repetition::REQUIRED, ParquetType::INT32)})); - parquet_fields.push_back(PrimitiveNode::Make( - "leaf3", Repetition::REQUIRED, ParquetType::INT32)); + PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32)); const int num_columns = 3; auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/21ad2c39/src/parquet/arrow/arrow-schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 96de92e..85578ac 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -73,8 +73,8 @@ class TestConvertParquetSchema : public ::testing::Test { return FromParquetSchema(&descr_, &result_schema_); } - ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes, - const std::vector<int>& column_indices) { + ::arrow::Status ConvertSchema( + const std::vector<NodePtr>& nodes, const std::vector<int>& column_indices) { NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); descr_.Init(schema); return FromParquetSchema(&descr_, column_indices, &result_schema_); @@ -365,18 +365,14 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchema) { // } // required int64 leaf3; { + parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED, + {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN), + PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)})); parquet_fields.push_back( - GroupNode::Make("group1", Repetition::REQUIRED, { - PrimitiveNode::Make( - "leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN), - PrimitiveNode::Make( - "leaf2", Repetition::REQUIRED, ParquetType::INT32)})); - parquet_fields.push_back(PrimitiveNode::Make( - "leaf3", Repetition::REQUIRED, ParquetType::INT64)); - - auto group1_fields = { - std::make_shared<Field>("leaf1", BOOL, false), - std::make_shared<Field>("leaf2", INT32, false)}; + PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64)); + + auto group1_fields = {std::make_shared<Field>("leaf1", BOOL, false), + std::make_shared<Field>("leaf2", INT32, false)}; auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields); arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false)); arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false)); @@ -412,20 +408,14 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) { // } // required int64 leaf5; { + parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED, + {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT64), + PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT64)})); + parquet_fields.push_back(GroupNode::Make("group2", Repetition::REQUIRED, + {PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64), + PrimitiveNode::Make("leaf4", Repetition::REQUIRED, ParquetType::INT64)})); parquet_fields.push_back( - GroupNode::Make("group1", Repetition::REQUIRED, { - PrimitiveNode::Make( - "leaf1", Repetition::REQUIRED, ParquetType::INT64), - PrimitiveNode::Make( - "leaf2", Repetition::REQUIRED, ParquetType::INT64)})); - parquet_fields.push_back( - GroupNode::Make("group2", Repetition::REQUIRED, { - PrimitiveNode::Make( - "leaf3", Repetition::REQUIRED, ParquetType::INT64), - PrimitiveNode::Make( - "leaf4", Repetition::REQUIRED, ParquetType::INT64)})); - parquet_fields.push_back(PrimitiveNode::Make( - "leaf5", Repetition::REQUIRED, ParquetType::INT64)); + PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64)); auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)}; auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields); @@ -456,26 +446,24 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) { // } parquet_fields.push_back( PrimitiveNode::Make("leaf1", Repetition::OPTIONAL, ParquetType::INT32)); - parquet_fields.push_back( - GroupNode::Make("outerGroup", Repetition::REPEATED, { - PrimitiveNode::Make( - "leaf2", Repetition::OPTIONAL, ParquetType::INT32), - GroupNode::Make("innerGroup", Repetition::REPEATED, { - PrimitiveNode::Make( - "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})})); + parquet_fields.push_back(GroupNode::Make("outerGroup", Repetition::REPEATED, + {PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32), + GroupNode::Make("innerGroup", Repetition::REPEATED, + {PrimitiveNode::Make( + "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})})); auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)}; auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields); - auto outer_group_fields = { - std::make_shared<Field>("leaf2", INT32, true), - std::make_shared<Field>("innerGroup", ::arrow::list( - std::make_shared<Field>("innerGroup", inner_group_type, false)), false)}; + auto outer_group_fields = {std::make_shared<Field>("leaf2", INT32, true), + std::make_shared<Field>("innerGroup", + ::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)), + false)}; auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields); arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true)); - arrow_fields.push_back( - std::make_shared<Field>("outerGroup", ::arrow::list( - std::make_shared<Field>("outerGroup", outer_group_type, false)), false)); + arrow_fields.push_back(std::make_shared<Field>("outerGroup", + ::arrow::list(std::make_shared<Field>("outerGroup", outer_group_type, false)), + false)); } auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); ASSERT_OK(ConvertSchema(parquet_fields)); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/21ad2c39/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index e589581..2c74839 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -18,8 +18,8 @@ #include "parquet/arrow/schema.h" #include <string> -#include <vector> #include <unordered_set> +#include <vector> #include "parquet/api/schema.h" @@ -192,11 +192,9 @@ Status NodeToFieldInternal(const NodePtr& node, * Auxilary function to test if a parquet schema node is a leaf node * that should be included in a resulting arrow schema */ -inline bool IsIncludedLeaf(const NodePtr& node, - const std::unordered_set<NodePtr>* included_leaf_nodes) { - if (included_leaf_nodes == nullptr) { - return true; - } +inline bool IsIncludedLeaf( + const NodePtr& node, const std::unordered_set<NodePtr>* included_leaf_nodes) { + if (included_leaf_nodes == nullptr) { return true; } auto search = included_leaf_nodes->find(node); return (search != included_leaf_nodes->end()); } @@ -210,13 +208,9 @@ Status StructFromGroup(const GroupNode* group, for (int i = 0; i < group->field_count(); i++) { RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field)); - if (field != nullptr) { - fields.push_back(field); - } - } - if (fields.size() > 0) { - *out = std::make_shared<::arrow::StructType>(fields); + if (field != nullptr) { fields.push_back(field); } } + if (fields.size() > 0) { *out = std::make_shared<::arrow::StructType>(fields); } return Status::OK(); } @@ -240,12 +234,10 @@ Status NodeToList(const GroupNode* group, !str_endswith_tuple(list_node->name())) { // List of primitive type std::shared_ptr<Field> item_field; - RETURN_NOT_OK(NodeToFieldInternal( - list_group->field(0), included_leaf_nodes, &item_field)); + RETURN_NOT_OK( + NodeToFieldInternal(list_group->field(0), included_leaf_nodes, &item_field)); - if (item_field != nullptr) { - *out = ::arrow::list(item_field); - } + if (item_field != nullptr) { *out = ::arrow::list(item_field); } } else { // List of struct std::shared_ptr<::arrow::DataType> inner_type; @@ -260,7 +252,7 @@ Status NodeToList(const GroupNode* group, std::shared_ptr<::arrow::DataType> inner_type; if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) { const PrimitiveNode* primitive = - static_cast<const PrimitiveNode*>(list_node.get()); + static_cast<const PrimitiveNode*>(list_node.get()); RETURN_NOT_OK(FromPrimitive(primitive, &inner_type)); auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false); *out = ::arrow::list(item_field); @@ -282,7 +274,6 @@ Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) { Status NodeToFieldInternal(const NodePtr& node, const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out) { - std::shared_ptr<::arrow::DataType> type = nullptr; bool nullable = !node->is_required(); @@ -317,9 +308,7 @@ Status NodeToFieldInternal(const NodePtr& node, RETURN_NOT_OK(FromPrimitive(primitive, &type)); } } - if (type != nullptr) { - *out = std::make_shared<Field>(node->name(), type, nullable); - } + if (type != nullptr) { *out = std::make_shared<Field>(node->name(), type, nullable); } return Status::OK(); } @@ -354,11 +343,9 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema, std::vector<std::shared_ptr<Field>> fields; std::shared_ptr<Field> field; for (int i = 0; i < schema_node->field_count(); i++) { - RETURN_NOT_OK(NodeToFieldInternal( - schema_node->field(i), &included_leaf_nodes, &field)); - if (field != nullptr) { - fields.push_back(field); - } + RETURN_NOT_OK( + NodeToFieldInternal(schema_node->field(i), &included_leaf_nodes, &field)); + if (field != nullptr) { fields.push_back(field); } } *out = std::make_shared<::arrow::Schema>(fields); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/21ad2c39/src/parquet/arrow/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 5933937..90cd135 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -496,9 +496,6 @@ Status FileWriter::Impl::Close() { return Status::OK(); } -FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer) - : impl_(new FileWriter::Impl(pool, std::move(writer))) {} - Status FileWriter::NewRowGroup(int64_t chunk_size) { return impl_->NewRowGroup(chunk_size); } @@ -589,16 +586,33 @@ MemoryPool* FileWriter::memory_pool() const { FileWriter::~FileWriter() {} -Status WriteTable(const Table& table, MemoryPool* pool, - const std::shared_ptr<OutputStream>& sink, int64_t chunk_size, - const std::shared_ptr<WriterProperties>& properties) { +FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer) + : impl_(new FileWriter::Impl(pool, std::move(writer))) {} + +Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, + const std::shared_ptr<OutputStream>& sink, + const std::shared_ptr<WriterProperties>& properties, + std::unique_ptr<FileWriter>* writer) { std::shared_ptr<SchemaDescriptor> parquet_schema; - RETURN_NOT_OK(ToParquetSchema(table.schema().get(), *properties, &parquet_schema)); + RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); - std::unique_ptr<ParquetFileWriter> parquet_writer = + std::unique_ptr<ParquetFileWriter> base_writer = ParquetFileWriter::Open(sink, schema_node, properties); - FileWriter writer(pool, std::move(parquet_writer)); + writer->reset(new FileWriter(pool, std::move(base_writer))); + return Status::OK(); +} + +Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, + const std::shared_ptr<::arrow::io::OutputStream>& sink, + const std::shared_ptr<WriterProperties>& properties, + std::unique_ptr<FileWriter>* writer) { + auto wrapper = std::make_shared<ArrowOutputStream>(sink); + return Open(schema, pool, wrapper, properties, writer); +} + +Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) { // TODO(ARROW-232) Support writing chunked arrays. for (int i = 0; i < table.num_columns(); i++) { if (table.column(i)->data()->num_chunks() != 1) { @@ -609,19 +623,26 @@ Status WriteTable(const Table& table, MemoryPool* pool, for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) { int64_t offset = chunk * chunk_size; int64_t size = std::min(chunk_size, table.num_rows() - offset); - RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close())); + RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close())); for (int i = 0; i < table.num_columns(); i++) { std::shared_ptr<Array> array = table.column(i)->data()->chunk(0); array = array->Slice(offset, size); - RETURN_NOT_OK_ELSE( - writer.WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(writer.Close())); + RETURN_NOT_OK_ELSE(WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(Close())); } } + return Status::OK(); +} - return writer.Close(); +Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool, + const std::shared_ptr<OutputStream>& sink, int64_t chunk_size, + const std::shared_ptr<WriterProperties>& properties) { + std::unique_ptr<FileWriter> writer; + RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties, &writer)); + RETURN_NOT_OK(writer->WriteTable(table, chunk_size)); + return writer->Close(); } -Status WriteTable(const Table& table, MemoryPool* pool, +Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties) { auto wrapper = std::make_shared<ArrowOutputStream>(sink); @@ -629,5 +650,4 @@ Status WriteTable(const Table& table, MemoryPool* pool, } } // namespace arrow - } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/21ad2c39/src/parquet/arrow/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h index e3b281b..3916298 100644 --- a/src/parquet/arrow/writer.h +++ b/src/parquet/arrow/writer.h @@ -31,13 +31,13 @@ class Array; class MemoryPool; class PrimitiveArray; class RowBatch; +class Schema; class Status; class StringArray; class Table; -} +} // namespace arrow namespace parquet { - namespace arrow { /** @@ -49,6 +49,23 @@ class PARQUET_EXPORT FileWriter { public: FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer); + static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, + const std::shared_ptr<OutputStream>& sink, + const std::shared_ptr<WriterProperties>& properties, + std::unique_ptr<FileWriter>* writer); + + static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, + const std::shared_ptr<::arrow::io::OutputStream>& sink, + const std::shared_ptr<WriterProperties>& properties, + std::unique_ptr<FileWriter>* writer); + + /** + * Write a Table to Parquet. + * + * The table shall only consist of columns of primitive type or of primitive lists. + */ + ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size); + ::arrow::Status NewRowGroup(int64_t chunk_size); ::arrow::Status WriteColumnChunk(const ::arrow::Array& data); ::arrow::Status Close(); @@ -78,7 +95,6 @@ class PARQUET_EXPORT FileWriter { const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); } // namespace arrow - } // namespace parquet #endif // PARQUET_ARROW_WRITER_H
