Repository: parquet-cpp Updated Branches: refs/heads/master 4beac90a3 -> 893af978a
PARQUET-595: API for KeyValue metadata This supersedes #309 and incorporates the `std::shared_ptr<const KeyValueMetadata>` pattern so less copying is needed in Parquet for metadata inbound from Arrow (and vice versa). close #309 Author: Wes McKinney <[email protected]> Author: Phillip Cloud <[email protected]> Closes #314 from wesm/PARQUET-595 and squashes the following commits: c0199c5 [Wes McKinney] Remove some more std::string includes 3d3be4e [Wes McKinney] Remove string include b2ed09e [Wes McKinney] Add backwards compatible schema APIs 116575a [Wes McKinney] Use std::shared_ptr<const KeyValueMetadata> from upstream Arrow 5116eaa [Phillip Cloud] Add support for reading/writing Schema-level Arrow metadata Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/893af978 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/893af978 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/893af978 Branch: refs/heads/master Commit: 893af978a9184a17ca44cfbe44a786460e56a49a Parents: 4beac90 Author: Wes McKinney <[email protected]> Authored: Sat Apr 29 13:57:50 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Sat Apr 29 13:57:50 2017 -0400 ---------------------------------------------------------------------- .gitignore | 3 +- CMakeLists.txt | 4 ++- cmake_modules/ThirdpartyToolchain.cmake | 2 +- examples/reader-writer.cc | 2 +- src/parquet/arrow/arrow-schema-test.cc | 46 +++++++++++++++++++----- src/parquet/arrow/reader.cc | 12 ++++--- src/parquet/arrow/schema.cc | 27 ++++++++++---- src/parquet/arrow/schema.h | 20 +++++++++-- src/parquet/arrow/writer.cc | 8 +++-- src/parquet/column/properties.h | 2 +- src/parquet/file/metadata.cc | 54 +++++++++++++++++++++++----- src/parquet/file/metadata.h | 16 ++++++--- src/parquet/file/printer.cc | 8 ++--- src/parquet/file/printer.h | 8 ++--- src/parquet/file/writer-internal.cc | 16 +++++---- src/parquet/file/writer-internal.h | 6 ++-- src/parquet/file/writer.cc | 16 ++++++--- src/parquet/file/writer.h | 31 ++++++++++++++-- src/parquet/util/memory.cc | 1 + 19 files changed, 214 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 9de56ea..3730dba 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ Makefile thirdparty *.pc -.idea/ \ No newline at end of file +.idea/ +*-build/ http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index b153d89..9b85d96 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -408,8 +408,10 @@ include_directories( # "make lint" target ############################################################ if (UNIX) + find_program(CPPLINT_BIN NAMES cpplint cpplint.py HINTS ${BUILD_SUPPORT_DIR}) + message(STATUS "Found cpplint executable at ${CPPLINT_BIN}") # Full lint - add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py + add_custom_target(lint ${CPPLINT_BIN} --verbose=2 --linelength=90 --filter=-whitespace/comments,-readability/todo,-build/header_guard,-runtime/references,-readability/check,-build/c++11,-build/include_order http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/cmake_modules/ThirdpartyToolchain.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 0e7e24d..478e018 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -429,7 +429,7 @@ if (NOT ARROW_FOUND) -DARROW_BUILD_TESTS=OFF) if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "bb8514cc9d7068c8b62d346577370751d68221d8") + set(ARROW_VERSION "f7ab7270bb07466dabf84c015a6db2a192eb3dad") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/examples/reader-writer.cc ---------------------------------------------------------------------- diff --git a/examples/reader-writer.cc b/examples/reader-writer.cc index 54390e0..9118c88 100644 --- a/examples/reader-writer.cc +++ b/examples/reader-writer.cc @@ -46,7 +46,7 @@ constexpr int NUM_ROWS_PER_ROW_GROUP = 500; constexpr int FIXED_LENGTH = 10; -const std::string PARQUET_FILENAME = "parquet_cpp_example.parquet"; +const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet"; using parquet::Repetition; using parquet::Type; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/arrow/arrow-schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 2042566..a8a8c09 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -79,6 +79,13 @@ class TestConvertParquetSchema : public ::testing::Test { return FromParquetSchema(&descr_, column_indices, &result_schema_); } + ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { + NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); + descr_.Init(schema); + return FromParquetSchema(&descr_, {}, key_value_metadata, &result_schema_); + } + protected: SchemaDescriptor descr_; std::shared_ptr<::arrow::Schema> result_schema_; @@ -114,13 +121,13 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { parquet_fields.push_back(PrimitiveNode::Make( "time32", Repetition::REQUIRED, ParquetType::INT32, LogicalType::TIME_MILLIS)); - arrow_fields.push_back(std::make_shared<Field>( - "time32", ::arrow::time32(TimeUnit::MILLI), false)); + arrow_fields.push_back( + std::make_shared<Field>("time32", ::arrow::time32(TimeUnit::MILLI), false)); parquet_fields.push_back(PrimitiveNode::Make( "time64", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIME_MICROS)); - arrow_fields.push_back(std::make_shared<Field>( - "time64", ::arrow::time64(TimeUnit::MICRO), false)); + arrow_fields.push_back( + std::make_shared<Field>("time64", ::arrow::time64(TimeUnit::MICRO), false)); parquet_fields.push_back( PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96)); @@ -152,6 +159,30 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { CheckFlatSchema(arrow_schema); } +TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) { + std::vector<NodePtr> parquet_fields; + std::vector<std::shared_ptr<Field>> arrow_fields; + + parquet_fields.push_back( + PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); + arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); + arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false)); + + auto key_value_metadata = std::make_shared<KeyValueMetadata>(); + key_value_metadata->Append("foo", "bar"); + key_value_metadata->Append("biz", "baz"); + ASSERT_OK(ConvertSchema(parquet_fields, key_value_metadata)); + + auto arrow_metadata = result_schema_->metadata(); + ASSERT_EQ("foo", arrow_metadata->key(0)); + ASSERT_EQ("bar", arrow_metadata->value(0)); + ASSERT_EQ("biz", arrow_metadata->key(1)); + ASSERT_EQ("baz", arrow_metadata->value(1)); +} + TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) { std::vector<NodePtr> parquet_fields; std::vector<std::shared_ptr<Field>> arrow_fields; @@ -441,7 +472,7 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) { } auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); - ASSERT_OK(ConvertSchema(parquet_fields, {0, 3, 4})); + ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{0, 3, 4})); CheckFlatSchema(arrow_schema); } @@ -490,7 +521,7 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) { } auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); - ASSERT_OK(ConvertSchema(parquet_fields, {3, 4, 0})); + ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{3, 4, 0})); CheckFlatSchema(arrow_schema); } @@ -663,8 +694,7 @@ TEST_F(TestConvertArrowSchema, ParquetLists) { TEST_F(TestConvertArrowSchema, UnsupportedTypes) { std::vector<std::shared_ptr<Field>> unsupported_fields = { - ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO)) - }; + ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))}; for (const auto& field : unsupported_fields) { ASSERT_RAISES(NotImplemented, ConvertSchema({field})); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index cc3e3fa..06e5e22 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -117,6 +117,8 @@ class FileColumnIterator { const ColumnDescriptor* descr() const { return schema_->Column(column_index_); } + std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); } + int column_index() const { return column_index_; } protected: @@ -296,7 +298,8 @@ Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) { Status FileReader::Impl::GetSchema( const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out) { auto descr = reader_->metadata()->schema(); - return FromParquetSchema(descr, indices, out); + auto parquet_key_value_metadata = reader_->metadata()->key_value_metadata(); + return FromParquetSchema(descr, indices, parquet_key_value_metadata, out); } Status FileReader::Impl::ReadRowGroup(int row_group_index, @@ -723,13 +726,12 @@ Status ColumnReader::Impl::InitValidBits(int batch_size) { Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels, int64_t total_levels_read, std::shared_ptr<Array>* array) { std::shared_ptr<::arrow::Schema> arrow_schema; - RETURN_NOT_OK( - FromParquetSchema(input_->schema(), {input_->column_index()}, &arrow_schema)); + RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()}, + input_->metadata()->key_value_metadata(), &arrow_schema)); std::shared_ptr<Field> current_field = arrow_schema->field(0); if (current_field->type()->id() == ::arrow::Type::STRUCT) { - return Status::NotImplemented( - "Structs are not yet supported."); + return Status::NotImplemented("Structs are not yet supported."); } if (descr_->max_repetition_level() > 0) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 31895ce..4326161 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -322,8 +322,9 @@ Status NodeToFieldInternal(const NodePtr& node, return Status::OK(); } -Status FromParquetSchema( - const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) { +Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata, + std::shared_ptr<::arrow::Schema>* out) { const GroupNode* schema_node = parquet_schema->group_node(); int num_fields = static_cast<int>(schema_node->field_count()); @@ -332,12 +333,14 @@ Status FromParquetSchema( RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i])); } - *out = std::make_shared<::arrow::Schema>(fields); + *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata); return Status::OK(); } Status FromParquetSchema(const SchemaDescriptor* parquet_schema, - const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out) { + const std::vector<int>& column_indices, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata, + std::shared_ptr<::arrow::Schema>* out) { // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes // from the root Parquet node @@ -363,10 +366,21 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema, if (field != nullptr) { fields.push_back(field); } } - *out = std::make_shared<::arrow::Schema>(fields); + *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata); return Status::OK(); } +Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector<int>& column_indices, + std::shared_ptr<::arrow::Schema>* out) { + return FromParquetSchema(parquet_schema, column_indices, nullptr, out); +} + +Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + std::shared_ptr<::arrow::Schema>* out) { + return FromParquetSchema(parquet_schema, nullptr, out); +} + Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name, bool nullable, const WriterProperties& properties, NodePtr* out) { Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED; @@ -483,8 +497,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field, case ArrowType::TIME64: { auto time_type = static_cast<::arrow::Time64Type*>(field->type().get()); if (time_type->unit() == ::arrow::TimeUnit::NANO) { - return Status::NotImplemented( - "Nanosecond time not supported in Parquet."); + return Status::NotImplemented("Nanosecond time not supported in Parquet."); } type = ParquetType::INT64; logical_type = LogicalType::TIME_MICROS; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/arrow/schema.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h index 1866fea..30dee20 100644 --- a/src/parquet/arrow/schema.h +++ b/src/parquet/arrow/schema.h @@ -44,13 +44,27 @@ namespace arrow { /// \param column_indices indices of leaf nodes in parquet schema tree. Appearing ordering /// matters for the converted schema. Repeated indices are ignored /// except for the first one +/// \param key_value_metadata optional metadata, can be nullptr /// \param out the corresponding arrow schema /// \return Status::OK() on a successful conversion. ::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema, - const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out); + const std::vector<int>& column_indices, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata, + std::shared_ptr<::arrow::Schema>* out); -::arrow::Status PARQUET_EXPORT FromParquetSchema( - const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out); +// Without indices +::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata, + std::shared_ptr<::arrow::Schema>* out); + +// Without metadata +::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector<int>& column_indices, + std::shared_ptr<::arrow::Schema>* out); + +// Without metadata or indices +::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema, + std::shared_ptr<::arrow::Schema>* out); ::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field, const WriterProperties& properties, schema::NodePtr* out); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/arrow/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 6ac33b1..8b0a271 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -18,6 +18,7 @@ #include "parquet/arrow/writer.h" #include <algorithm> +#include <string> #include <vector> #include "parquet/util/bit-util.h" @@ -564,8 +565,8 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { int current_column_idx = row_group_writer_->current_column(); std::shared_ptr<::arrow::Schema> arrow_schema; - RETURN_NOT_OK( - FromParquetSchema(writer_->schema(), {current_column_idx - 1}, &arrow_schema)); + RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1}, + writer_->key_value_metadata(), &arrow_schema)); LevelBuilder level_builder(pool_); std::shared_ptr<Buffer> def_levels_buffer; std::shared_ptr<Buffer> rep_levels_buffer; @@ -658,8 +659,9 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema)); auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); + std::unique_ptr<ParquetFileWriter> base_writer = - ParquetFileWriter::Open(sink, schema_node, properties); + ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata()); writer->reset(new FileWriter(pool, std::move(base_writer))); return Status::OK(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/column/properties.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h index 8573d2e..29c011d 100644 --- a/src/parquet/column/properties.h +++ b/src/parquet/column/properties.h @@ -85,7 +85,7 @@ static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true; static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN; static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION = ParquetVersion::PARQUET_1_0; -static std::string DEFAULT_CREATED_BY = "parquet-cpp version 1.0.0"; +static const char DEFAULT_CREATED_BY[] = "parquet-cpp version 1.0.0"; static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED; class PARQUET_EXPORT ColumnProperties { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index e5b1242..3df8fbd 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -364,6 +364,7 @@ class FileMetaData::FileMetaDataImpl { } InitSchema(); + InitKeyValueMetadata(); } ~FileMetaDataImpl() {} @@ -393,6 +394,10 @@ class FileMetaData::FileMetaDataImpl { const SchemaDescriptor* schema() const { return &schema_; } + std::shared_ptr<const KeyValueMetadata> key_value_metadata() const { + return key_value_metadata_; + } + private: friend FileMetaDataBuilder; uint32_t metadata_len_; @@ -404,6 +409,18 @@ class FileMetaData::FileMetaDataImpl { } SchemaDescriptor schema_; ApplicationVersion writer_version_; + + void InitKeyValueMetadata() { + auto metadata = std::make_shared<KeyValueMetadata>(); + if (metadata_->__isset.key_value_metadata) { + for (const auto& it : metadata_->key_value_metadata) { + metadata->Append(it.key, it.value); + } + } + key_value_metadata_ = metadata; + } + + std::shared_ptr<const KeyValueMetadata> key_value_metadata_; }; std::shared_ptr<FileMetaData> FileMetaData::Make( @@ -470,6 +487,10 @@ const SchemaDescriptor* FileMetaData::schema() const { return impl_->schema(); } +std::shared_ptr<const KeyValueMetadata> FileMetaData::key_value_metadata() const { + return impl_->key_value_metadata(); +} + void FileMetaData::WriteTo(OutputStream* dst) { return impl_->WriteTo(dst); } @@ -769,9 +790,10 @@ void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) { // TODO(PARQUET-595) Support key_value_metadata class FileMetaDataBuilder::FileMetaDataBuilderImpl { public: - explicit FileMetaDataBuilderImpl( - const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props) - : properties_(props), schema_(schema) { + explicit FileMetaDataBuilderImpl(const SchemaDescriptor* schema, + const std::shared_ptr<WriterProperties>& props, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) + : properties_(props), schema_(schema), key_value_metadata_(key_value_metadata) { metadata_.reset(new format::FileMetaData()); } ~FileMetaDataBuilderImpl() {} @@ -798,6 +820,18 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { metadata_->__set_num_rows(total_rows); metadata_->__set_row_groups(row_groups); + if (key_value_metadata_) { + metadata_->key_value_metadata.clear(); + metadata_->key_value_metadata.reserve(key_value_metadata_->size()); + for (int64_t i = 0; i < key_value_metadata_->size(); ++i) { + format::KeyValue kv_pair; + kv_pair.__set_key(key_value_metadata_->key(i)); + kv_pair.__set_value(key_value_metadata_->value(i)); + metadata_->key_value_metadata.push_back(kv_pair); + } + metadata_->__isset.key_value_metadata = true; + } + int32_t file_version = 0; switch (properties_->version()) { case ParquetVersion::PARQUET_1_0: @@ -829,17 +863,21 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { std::vector<std::unique_ptr<format::RowGroup>> row_groups_; std::vector<std::unique_ptr<RowGroupMetaDataBuilder>> row_group_builders_; const SchemaDescriptor* schema_; + std::shared_ptr<const KeyValueMetadata> key_value_metadata_; }; std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make( - const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props) { - return std::unique_ptr<FileMetaDataBuilder>(new FileMetaDataBuilder(schema, props)); + const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { + return std::unique_ptr<FileMetaDataBuilder>( + new FileMetaDataBuilder(schema, props, key_value_metadata)); } -FileMetaDataBuilder::FileMetaDataBuilder( - const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props) +FileMetaDataBuilder::FileMetaDataBuilder(const SchemaDescriptor* schema, + const std::shared_ptr<WriterProperties>& props, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) : impl_{std::unique_ptr<FileMetaDataBuilderImpl>( - new FileMetaDataBuilderImpl(schema, props))} {} + new FileMetaDataBuilderImpl(schema, props, key_value_metadata))} {} FileMetaDataBuilder::~FileMetaDataBuilder() {} http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index 057c5b1..d663617 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -22,6 +22,8 @@ #include <string> #include <vector> +#include "arrow/util/key_value_metadata.h" + #include "parquet/column/properties.h" #include "parquet/column/statistics.h" #include "parquet/compression.h" @@ -32,6 +34,8 @@ namespace parquet { +using KeyValueMetadata = ::arrow::KeyValueMetadata; + // Reference: // parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/ // format/converter/ParquetMetadataConverter.java @@ -178,6 +182,8 @@ class PARQUET_EXPORT FileMetaData { // Return const-pointer to make it clear that this object is not to be copied const SchemaDescriptor* schema() const; + std::shared_ptr<const KeyValueMetadata> key_value_metadata() const; + private: friend FileMetaDataBuilder; explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len); @@ -249,8 +255,9 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder { class PARQUET_EXPORT FileMetaDataBuilder { public: // API convenience to get a MetaData reader - static std::unique_ptr<FileMetaDataBuilder> Make( - const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props); + static std::unique_ptr<FileMetaDataBuilder> Make(const SchemaDescriptor* schema, + const std::shared_ptr<WriterProperties>& props, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr); ~FileMetaDataBuilder(); @@ -260,8 +267,9 @@ class PARQUET_EXPORT FileMetaDataBuilder { std::unique_ptr<FileMetaData> Finish(); private: - explicit FileMetaDataBuilder( - const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props); + explicit FileMetaDataBuilder(const SchemaDescriptor* schema, + const std::shared_ptr<WriterProperties>& props, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr); // PIMPL Idiom class FileMetaDataBuilderImpl; std::unique_ptr<FileMetaDataBuilderImpl> impl_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/printer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/printer.cc b/src/parquet/file/printer.cc index 4d0dad4..e398c3a 100644 --- a/src/parquet/file/printer.cc +++ b/src/parquet/file/printer.cc @@ -32,9 +32,8 @@ namespace parquet { // the fixed initial size is just for an example #define COL_WIDTH "30" -void ParquetFilePrinter::DebugPrint( - std::ostream& stream, std::list<int> selected_columns, bool print_values, - const char* filename) { +void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selected_columns, + bool print_values, const char* filename) { const FileMetaData* file_metadata = fileReader->metadata().get(); stream << "File Name: " << filename << "\n"; @@ -142,8 +141,7 @@ void ParquetFilePrinter::DebugPrint( } void ParquetFilePrinter::JSONPrint( - std::ostream& stream, std::list<int> selected_columns, - const char* filename) { + std::ostream& stream, std::list<int> selected_columns, const char* filename) { const FileMetaData* file_metadata = fileReader->metadata().get(); stream << "{\n"; stream << " \"FileName\": \"" << filename << "\",\n"; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/printer.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/printer.h b/src/parquet/file/printer.h index bd54e40..a72c17d 100644 --- a/src/parquet/file/printer.h +++ b/src/parquet/file/printer.h @@ -37,12 +37,10 @@ class PARQUET_EXPORT ParquetFilePrinter { explicit ParquetFilePrinter(ParquetFileReader* reader) : fileReader(reader) {} ~ParquetFilePrinter() {} - void DebugPrint( - std::ostream& stream, std::list<int> selected_columns, bool print_values = true, - const char* fileame = "No Name"); + void DebugPrint(std::ostream& stream, std::list<int> selected_columns, + bool print_values = true, const char* fileame = "No Name"); - void JSONPrint( - std::ostream& stream, std::list<int> selected_columns, + void JSONPrint(std::ostream& stream, std::list<int> selected_columns, const char* filename = "No Name"); }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index ff6de48..b69e87e 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -205,9 +205,10 @@ void RowGroupSerializer::Close() { std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open( const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties) { + const std::shared_ptr<WriterProperties>& properties, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { std::unique_ptr<ParquetFileWriter::Contents> result( - new FileSerializer(sink, schema, properties)); + new FileSerializer(sink, schema, properties, key_value_metadata)); return result; } @@ -274,14 +275,15 @@ void FileSerializer::WriteMetaData() { FileSerializer::FileSerializer(const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties) - : sink_(sink), + const std::shared_ptr<WriterProperties>& properties, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) + : ParquetFileWriter::Contents(schema, key_value_metadata), + sink_(sink), is_open_(true), properties_(properties), num_row_groups_(0), - num_rows_(0) { - schema_.Init(schema); - metadata_ = FileMetaDataBuilder::Make(&schema_, properties); + num_rows_(0), + metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) { StartFile(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index e038319..27dc89e 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -106,7 +106,8 @@ class FileSerializer : public ParquetFileWriter::Contents { static std::unique_ptr<ParquetFileWriter::Contents> Open( const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); + const std::shared_ptr<WriterProperties>& properties = default_writer_properties(), + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr); void Close() override; @@ -123,7 +124,8 @@ class FileSerializer : public ParquetFileWriter::Contents { private: explicit FileSerializer(const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties); + const std::shared_ptr<WriterProperties>& properties, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata); std::shared_ptr<OutputStream> sink_; bool is_open_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc index be46dbe..17afa02 100644 --- a/src/parquet/file/writer.cc +++ b/src/parquet/file/writer.cc @@ -59,15 +59,18 @@ ParquetFileWriter::~ParquetFileWriter() { std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( const std::shared_ptr<::arrow::io::OutputStream>& sink, const std::shared_ptr<GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties) { - return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties); + const std::shared_ptr<WriterProperties>& properties, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { + return Open( + std::make_shared<ArrowOutputStream>(sink), schema, properties, key_value_metadata); } std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties) { - auto contents = FileSerializer::Open(sink, schema, properties); + const std::shared_ptr<WriterProperties>& properties, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { + auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata); std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter()); result->Open(std::move(contents)); return result; @@ -81,6 +84,11 @@ const ColumnDescriptor* ParquetFileWriter::descr(int i) const { return contents_->schema()->Column(i); } +const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata() + const { + return contents_->key_value_metadata(); +} + void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) { contents_ = std::move(contents); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/file/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h index 6a58cbf..7d48720 100644 --- a/src/parquet/file/writer.h +++ b/src/parquet/file/writer.h @@ -22,6 +22,7 @@ #include <memory> #include "parquet/column/properties.h" +#include "parquet/file/metadata.h" #include "parquet/schema.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" @@ -32,6 +33,12 @@ class ColumnWriter; class PageWriter; class OutputStream; +namespace schema { + +class GroupNode; + +} // namespace schema + class PARQUET_EXPORT RowGroupWriter { public: // Forward declare a virtual class 'Contents' to aid dependency injection and more @@ -80,6 +87,11 @@ class PARQUET_EXPORT ParquetFileWriter { // easily create test fixtures // An implementation of the Contents class is defined in the .cc file struct Contents { + Contents(const std::shared_ptr<::parquet::schema::GroupNode>& schema, + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) + : schema_(), key_value_metadata_(key_value_metadata) { + schema_.Init(schema); + } virtual ~Contents() {} // Perform any cleanup associated with the file contents virtual void Close() = 0; @@ -92,9 +104,17 @@ class PARQUET_EXPORT ParquetFileWriter { virtual const std::shared_ptr<WriterProperties>& properties() const = 0; + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const { + return key_value_metadata_; + } + // Return const-pointer to make it clear that this object is not to be copied const SchemaDescriptor* schema() const { return &schema_; } + SchemaDescriptor schema_; + + /// This should be the only place this is stored. Everything else is a const reference + std::shared_ptr<const KeyValueMetadata> key_value_metadata_; }; ParquetFileWriter(); @@ -103,12 +123,14 @@ class PARQUET_EXPORT ParquetFileWriter { static std::unique_ptr<ParquetFileWriter> Open( const std::shared_ptr<::arrow::io::OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); + const std::shared_ptr<WriterProperties>& properties = default_writer_properties(), + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr); static std::unique_ptr<ParquetFileWriter> Open( const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, - const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); + const std::shared_ptr<WriterProperties>& properties = default_writer_properties(), + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr); void Open(std::unique_ptr<Contents> contents); void Close(); @@ -158,6 +180,11 @@ class PARQUET_EXPORT ParquetFileWriter { */ const ColumnDescriptor* descr(int i) const; + /** + * Returns the file custom metadata + */ + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const; + private: // Holds a pointer to an instance of Contents implementation std::unique_ptr<Contents> contents_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/893af978/src/parquet/util/memory.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc index 021a346..f1fcc75 100644 --- a/src/parquet/util/memory.cc +++ b/src/parquet/util/memory.cc @@ -21,6 +21,7 @@ #include <cstdint> #include <cstdio> #include <string> +#include <utility> #include "arrow/status.h"
