Repository: parquet-cpp Updated Branches: refs/heads/master 6b22b4685 -> b5e8cc430
PARQUET-918: FromParquetSchema API crashes on nested schemas This is #275 with an Arrow API fix. Passing build: https://travis-ci.org/wesm/parquet-cpp/builds/220597810 Closes #275 Author: Itai Incze <[email protected]> Author: Wes McKinney <[email protected]> Closes #295 from wesm/PARQUET-918 and squashes the following commits: 02f55fd [Wes McKinney] Fix Arrow APIs a259750 [Itai Incze] Fixed: repeated group schema conversion bug 5fe3a01 [Itai Incze] fixed typos 1f7dec2 [Itai Incze] changed ReadTable tests to use API-fabricated parquet 69cc7a6 [Itai Incze] Improved FromParquetSchema tests and naming 34236b7 [Itai Incze] linting and readability 5ee1f44 [Itai Incze] Fix for [PARQUET-918] Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/b5e8cc43 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/b5e8cc43 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/b5e8cc43 Branch: refs/heads/master Commit: b5e8cc4308fbf5565ce318a707d0c442f939a960 Parents: 6b22b46 Author: Itai Incze <[email protected]> Authored: Mon Apr 10 14:25:34 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Mon Apr 10 14:25:34 2017 -0400 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 105 ++++++++++++++++ src/parquet/arrow/arrow-schema-test.cc | 135 +++++++++++++++++++++ src/parquet/arrow/reader.cc | 10 +- src/parquet/arrow/schema.cc | 133 +++++++++++++++----- 4 files changed, 346 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index dd46893..2f8f421 100644 --- 
a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -25,6 +25,9 @@ #include "parquet/arrow/reader.h" #include "parquet/arrow/test-util.h" #include "parquet/arrow/writer.h" +#include "parquet/arrow/schema.h" + +#include "parquet/file/writer.h" #include "arrow/api.h" #include "arrow/test-util.h" @@ -45,6 +48,7 @@ using ParquetType = parquet::Type; using parquet::schema::GroupNode; using parquet::schema::NodePtr; using parquet::schema::PrimitiveNode; +using parquet::arrow::FromParquetSchema; namespace parquet { namespace arrow { @@ -875,5 +879,106 @@ TEST(TestArrowReadWrite, ReadColumnSubset) { ASSERT_TRUE(result->Equals(expected)); } +class TestNestedSchemaRead : public ::testing::Test { + protected: + virtual void SetUp() { + // We are using parquet low-level file api to create the nested parquet + CreateNestedParquet(); + InitReader(&reader_); + } + + void InitReader(std::shared_ptr<FileReader>* out) { + std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer(); + std::unique_ptr<FileReader> reader; + ASSERT_OK_NO_THROW(OpenFile( + std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + *out = std::move(reader); + } + + void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) { + nested_parquet_ = std::make_shared<InMemoryOutputStream>(); + writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, + schema, default_writer_properties()); + row_group_writer_ = writer_->AppendRowGroup(num_rows); + } + + void FinalizeParquetFile() { + row_group_writer_->Close(); + writer_->Close(); + } + + void CreateNestedParquet() { + std::vector<NodePtr> parquet_fields; + std::shared_ptr<Array> values; + + // create the schema: + // required group group1 { + // required int32 leaf1; + // required int32 leaf2; + // } + // required int32 leaf3; + + parquet_fields.push_back( + GroupNode::Make("group1", Repetition::REQUIRED, { 
+ PrimitiveNode::Make( + "leaf1", Repetition::REQUIRED, ParquetType::INT32), + PrimitiveNode::Make( + "leaf2", Repetition::REQUIRED, ParquetType::INT32)})); + parquet_fields.push_back(PrimitiveNode::Make( + "leaf3", Repetition::REQUIRED, ParquetType::INT32)); + + const int num_columns = 3; + auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields); + + InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), 0); + + for (int i = 0; i < num_columns; i++) { + auto column_writer = row_group_writer_->NextColumn(); + auto typed_writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer); + typed_writer->WriteBatch(0, nullptr, nullptr, nullptr); + } + + FinalizeParquetFile(); + } + + std::shared_ptr<InMemoryOutputStream> nested_parquet_; + std::shared_ptr<FileReader> reader_; + std::unique_ptr<ParquetFileWriter> writer_; + RowGroupWriter* row_group_writer_; +}; + +TEST_F(TestNestedSchemaRead, ReadIntoTableFull) { + std::shared_ptr<Table> table; + ASSERT_OK_NO_THROW(reader_->ReadTable(&table)); + ASSERT_EQ(table->num_rows(), 0); + ASSERT_EQ(table->num_columns(), 2); + ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2); +} + +TEST_F(TestNestedSchemaRead, ReadTablePartial) { + std::shared_ptr<Table> table; + + // columns: {group1.leaf1, leaf3} + ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table)); + ASSERT_EQ(table->num_rows(), 0); + ASSERT_EQ(table->num_columns(), 2); + ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1); + + // columns: {group1.leaf1, group1.leaf2} + ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table)); + ASSERT_EQ(table->num_rows(), 0); + ASSERT_EQ(table->num_columns(), 1); + ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2); + + // columns: {leaf3} + ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table)); + ASSERT_EQ(table->num_rows(), 0); + ASSERT_EQ(table->num_columns(), 1); + ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0); +} + } // namespace 
arrow + } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/arrow-schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 83100d3..96de92e 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -73,6 +73,13 @@ class TestConvertParquetSchema : public ::testing::Test { return FromParquetSchema(&descr_, &result_schema_); } + ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes, + const std::vector<int>& column_indices) { + NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); + descr_.Init(schema); + return FromParquetSchema(&descr_, column_indices, &result_schema_); + } + protected: SchemaDescriptor descr_; std::shared_ptr<::arrow::Schema> result_schema_; @@ -348,6 +355,134 @@ TEST_F(TestConvertParquetSchema, UnsupportedThings) { } } +TEST_F(TestConvertParquetSchema, ParquetNestedSchema) { + std::vector<NodePtr> parquet_fields; + std::vector<std::shared_ptr<Field>> arrow_fields; + + // required group group1 { + // required bool leaf1; + // required int32 leaf2; + // } + // required int64 leaf3; + { + parquet_fields.push_back( + GroupNode::Make("group1", Repetition::REQUIRED, { + PrimitiveNode::Make( + "leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN), + PrimitiveNode::Make( + "leaf2", Repetition::REQUIRED, ParquetType::INT32)})); + parquet_fields.push_back(PrimitiveNode::Make( + "leaf3", Repetition::REQUIRED, ParquetType::INT64)); + + auto group1_fields = { + std::make_shared<Field>("leaf1", BOOL, false), + std::make_shared<Field>("leaf2", INT32, false)}; + auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields); + arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false)); + arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false)); + } + + auto 
arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + ASSERT_OK(ConvertSchema(parquet_fields)); + + CheckFlatSchema(arrow_schema); +} + +TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) { + std::vector<NodePtr> parquet_fields; + std::vector<std::shared_ptr<Field>> arrow_fields; + + // Full Parquet Schema: + // required group group1 { + // required int64 leaf1; + // required int64 leaf2; + // } + // required group group2 { + // required int64 leaf3; + // required int64 leaf4; + // } + // required int64 leaf5; + // + // Expected partial arrow schema (columns 0, 3, 4): + // required group group1 { + // required int64 leaf1; + // } + // required group group2 { + // required int64 leaf4; + // } + // required int64 leaf5; + { + parquet_fields.push_back( + GroupNode::Make("group1", Repetition::REQUIRED, { + PrimitiveNode::Make( + "leaf1", Repetition::REQUIRED, ParquetType::INT64), + PrimitiveNode::Make( + "leaf2", Repetition::REQUIRED, ParquetType::INT64)})); + parquet_fields.push_back( + GroupNode::Make("group2", Repetition::REQUIRED, { + PrimitiveNode::Make( + "leaf3", Repetition::REQUIRED, ParquetType::INT64), + PrimitiveNode::Make( + "leaf4", Repetition::REQUIRED, ParquetType::INT64)})); + parquet_fields.push_back(PrimitiveNode::Make( + "leaf5", Repetition::REQUIRED, ParquetType::INT64)); + + auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)}; + auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields); + auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)}; + auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields); + + arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false)); + arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false)); + arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false)); + } + + auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + 
ASSERT_OK(ConvertSchema(parquet_fields, {0, 3, 4})); + + CheckFlatSchema(arrow_schema); +} + +TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) { + std::vector<NodePtr> parquet_fields; + std::vector<std::shared_ptr<Field>> arrow_fields; + { + // optional int32 leaf1; + // repeated group outerGroup { + // optional int32 leaf2; + // repeated group innerGroup { + // optional int32 leaf3; + // } + // } + parquet_fields.push_back( + PrimitiveNode::Make("leaf1", Repetition::OPTIONAL, ParquetType::INT32)); + parquet_fields.push_back( + GroupNode::Make("outerGroup", Repetition::REPEATED, { + PrimitiveNode::Make( + "leaf2", Repetition::OPTIONAL, ParquetType::INT32), + GroupNode::Make("innerGroup", Repetition::REPEATED, { + PrimitiveNode::Make( + "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})})); + + auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)}; + auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields); + auto outer_group_fields = { + std::make_shared<Field>("leaf2", INT32, true), + std::make_shared<Field>("innerGroup", ::arrow::list( + std::make_shared<Field>("innerGroup", inner_group_type, false)), false)}; + auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields); + + arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true)); + arrow_fields.push_back( + std::make_shared<Field>("outerGroup", ::arrow::list( + std::make_shared<Field>("outerGroup", outer_group_type, false)), false)); + } + auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + ASSERT_OK(ConvertSchema(parquet_fields)); + + CheckFlatSchema(arrow_schema); +} + class TestConvertArrowSchema : public ::testing::Test { public: virtual void SetUp() {} http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 
2ca9207..38d5583 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -347,9 +347,9 @@ Status FileReader::Impl::ReadTable( std::shared_ptr<::arrow::Schema> schema; RETURN_NOT_OK(GetSchema(indices, &schema)); - int num_columns = static_cast<int>(indices.size()); - int nthreads = std::min<int>(num_threads_, num_columns); - std::vector<std::shared_ptr<Column>> columns(num_columns); + int num_fields = static_cast<int>(schema->num_fields()); + int nthreads = std::min<int>(num_threads_, num_fields); + std::vector<std::shared_ptr<Column>> columns(num_fields); auto ReadColumnFunc = [&indices, &schema, &columns, this](int i) { std::shared_ptr<Array> array; @@ -359,11 +359,11 @@ Status FileReader::Impl::ReadTable( }; if (nthreads == 1) { - for (int i = 0; i < num_columns; i++) { + for (int i = 0; i < num_fields; i++) { RETURN_NOT_OK(ReadColumnFunc(i)); } } else { - RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc)); + RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc)); } *table = std::make_shared<Table>(schema, columns); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 76b7f77..e589581 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -19,6 +19,7 @@ #include <string> #include <vector> +#include <unordered_set> #include "parquet/api/schema.h" @@ -183,12 +184,39 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) { return Status::OK(); } -Status StructFromGroup(const GroupNode* group, TypePtr* out) { - std::vector<std::shared_ptr<Field>> fields(group->field_count()); +// Forward declaration +Status NodeToFieldInternal(const NodePtr& node, + const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out); + +/* + * Auxiliary function to test if a parquet schema node is a
leaf node + * that should be included in a resulting arrow schema + */ +inline bool IsIncludedLeaf(const NodePtr& node, + const std::unordered_set<NodePtr>* included_leaf_nodes) { + if (included_leaf_nodes == nullptr) { + return true; + } + auto search = included_leaf_nodes->find(node); + return (search != included_leaf_nodes->end()); +} + +Status StructFromGroup(const GroupNode* group, + const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) { + std::vector<std::shared_ptr<Field>> fields; + std::shared_ptr<Field> field; + + *out = nullptr; + for (int i = 0; i < group->field_count(); i++) { - RETURN_NOT_OK(NodeToField(group->field(i), &fields[i])); + RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field)); + if (field != nullptr) { + fields.push_back(field); + } + } + if (fields.size() > 0) { + *out = std::make_shared<::arrow::StructType>(fields); } - *out = std::make_shared<::arrow::StructType>(fields); return Status::OK(); } @@ -197,7 +225,9 @@ bool str_endswith_tuple(const std::string& str) { return false; } -Status NodeToList(const GroupNode* group, TypePtr* out) { +Status NodeToList(const GroupNode* group, + const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) { + *out = nullptr; if (group->field_count() == 1) { // This attempts to resolve the preferred 3-level list encoding. 
NodePtr list_node = group->field(0); @@ -210,22 +240,31 @@ Status NodeToList(const GroupNode* group, TypePtr* out) { !str_endswith_tuple(list_node->name())) { // List of primitive type std::shared_ptr<Field> item_field; - RETURN_NOT_OK(NodeToField(list_group->field(0), &item_field)); - *out = ::arrow::list(item_field); + RETURN_NOT_OK(NodeToFieldInternal( + list_group->field(0), included_leaf_nodes, &item_field)); + + if (item_field != nullptr) { + *out = ::arrow::list(item_field); + } } else { // List of struct std::shared_ptr<::arrow::DataType> inner_type; - RETURN_NOT_OK(StructFromGroup(list_group, &inner_type)); - auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false); - *out = ::arrow::list(item_field); + RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type)); + if (inner_type != nullptr) { + auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false); + *out = ::arrow::list(item_field); + } } } else if (list_node->is_repeated()) { // repeated primitive node std::shared_ptr<::arrow::DataType> inner_type; - const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(list_node.get()); - RETURN_NOT_OK(FromPrimitive(primitive, &inner_type)); - auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false); - *out = ::arrow::list(item_field); + if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) { + const PrimitiveNode* primitive = + static_cast<const PrimitiveNode*>(list_node.get()); + RETURN_NOT_OK(FromPrimitive(primitive, &inner_type)); + auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false); + *out = ::arrow::list(item_field); + } } else { return Status::NotImplemented( "Non-repeated groups in a LIST-annotated group are not supported."); @@ -238,31 +277,49 @@ Status NodeToList(const GroupNode* group, TypePtr* out) { } Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) { - std::shared_ptr<::arrow::DataType> 
type; + return NodeToFieldInternal(node, nullptr, out); +} + +Status NodeToFieldInternal(const NodePtr& node, + const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out) { + + std::shared_ptr<::arrow::DataType> type = nullptr; bool nullable = !node->is_required(); + *out = nullptr; + if (node->is_repeated()) { // 1-level LIST encoding fields are required std::shared_ptr<::arrow::DataType> inner_type; - const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get()); - RETURN_NOT_OK(FromPrimitive(primitive, &inner_type)); - auto item_field = std::make_shared<Field>(node->name(), inner_type, false); - type = ::arrow::list(item_field); - nullable = false; + if (node->is_group()) { + const GroupNode* group = static_cast<const GroupNode*>(node.get()); + RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &inner_type)); + } else if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) { + const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get()); + RETURN_NOT_OK(FromPrimitive(primitive, &inner_type)); + } + if (inner_type != nullptr) { + auto item_field = std::make_shared<Field>(node->name(), inner_type, false); + type = ::arrow::list(item_field); + nullable = false; + } } else if (node->is_group()) { const GroupNode* group = static_cast<const GroupNode*>(node.get()); if (node->logical_type() == LogicalType::LIST) { - RETURN_NOT_OK(NodeToList(group, &type)); + RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type)); } else { - RETURN_NOT_OK(StructFromGroup(group, &type)); + RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type)); } } else { // Primitive (leaf) node - const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get()); - RETURN_NOT_OK(FromPrimitive(primitive, &type)); + if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) { + const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get()); + 
RETURN_NOT_OK(FromPrimitive(primitive, &type)); + } + } + if (type != nullptr) { + *out = std::make_shared<Field>(node->name(), type, nullable); } - - *out = std::make_shared<Field>(node->name(), type, nullable); return Status::OK(); } @@ -270,8 +327,9 @@ Status FromParquetSchema( const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) { const GroupNode* schema_node = parquet_schema->group_node(); - std::vector<std::shared_ptr<Field>> fields(schema_node->field_count()); - for (int i = 0; i < schema_node->field_count(); i++) { + int num_fields = static_cast<int>(schema_node->field_count()); + std::vector<std::shared_ptr<Field>> fields(num_fields); + for (int i = 0; i < num_fields; i++) { RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i])); } @@ -285,11 +343,22 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema, // from the root Parquet node const GroupNode* schema_node = parquet_schema->group_node(); - int num_fields = static_cast<int>(column_indices.size()); + // Put the right leaf nodes in an unordered set + int num_columns = static_cast<int>(column_indices.size()); + std::unordered_set<NodePtr> included_leaf_nodes(num_columns); + for (int i = 0; i < num_columns; i++) { + auto column_desc = parquet_schema->Column(column_indices[i]); + included_leaf_nodes.insert(column_desc->schema_node()); + } - std::vector<std::shared_ptr<Field>> fields(num_fields); - for (int i = 0; i < num_fields; i++) { - RETURN_NOT_OK(NodeToField(schema_node->field(column_indices[i]), &fields[i])); + std::vector<std::shared_ptr<Field>> fields; + std::shared_ptr<Field> field; + for (int i = 0; i < schema_node->field_count(); i++) { + RETURN_NOT_OK(NodeToFieldInternal( + schema_node->field(i), &included_leaf_nodes, &field)); + if (field != nullptr) { + fields.push_back(field); + } } *out = std::make_shared<::arrow::Schema>(fields);
