Repository: parquet-cpp Updated Branches: refs/heads/master 441d85b43 -> 55604b297
PARQUET-711: Use metadata builders in parquet writer I wrote a sample file and the metadata seems to be correct. @xhochy I fixed some missing metadata like `dictionary_page_offset`. You might want to check if this fixes the Drill problem. Author: Deepak Majeti <[email protected]> Closes #156 from majetideepak/PARQUET-711 and squashes the following commits: 25f5a7e [Deepak Majeti] fix schema and descr. Resolves PARQUET-705 and PARQUET-707 8b4784d [Deepak Majeti] Review comments to add methods back fdbc761 [Deepak Majeti] fix clang error and comments c6cb071 [Deepak Majeti] convert DCHECKS to Exceptions in metadata ada3ac2 [Deepak Majeti] clang format d9c9131 [Deepak Majeti] Use metadata builders in parquet writer Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/55604b29 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/55604b29 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/55604b29 Branch: refs/heads/master Commit: 55604b297f444e95e132658b2ef384870ae1f701 Parents: 441d85b Author: Deepak Majeti <[email protected]> Authored: Thu Sep 8 23:06:08 2016 -0400 Committer: Wes McKinney <[email protected]> Committed: Thu Sep 8 23:06:08 2016 -0400 ---------------------------------------------------------------------- src/parquet/column/column-io-benchmark.cc | 22 +++-- src/parquet/column/column-writer-test.cc | 16 ++-- src/parquet/file/file-metadata-test.cc | 8 +- src/parquet/file/metadata.cc | 96 ++++++++++++------- src/parquet/file/metadata.h | 17 ++-- src/parquet/file/reader.cc | 8 +- src/parquet/file/writer-internal.cc | 107 +++++++++------------- src/parquet/file/writer-internal.h | 42 ++++----- src/parquet/file/writer.cc | 16 ++-- src/parquet/file/writer.h | 26 ++---- src/parquet/schema/descriptor.cc | 6 +- src/parquet/schema/descriptor.h | 8 +- src/parquet/schema/schema-descriptor-test.cc | 2 +- tools/parquet-dump-schema.cc | 2 +- 14 files changed, 191 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/column/column-io-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc index 74d7349..319c8f5 100644 --- a/src/parquet/column/column-io-benchmark.cc +++ b/src/parquet/column/column-io-benchmark.cc @@ -25,13 +25,13 @@ namespace parquet { -using format::ColumnChunk; using schema::PrimitiveNode; namespace benchmark { std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst, - ColumnChunk* metadata, ColumnDescriptor* schema, const WriterProperties* properties) { + ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema, + const WriterProperties* properties) { std::unique_ptr<SerializedPageWriter> pager( new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata)); return std::unique_ptr<Int64Writer>(new Int64Writer( @@ -57,17 +57,19 @@ void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) { template <Repetition::type repetition> static void BM_WriteInt64Column(::benchmark::State& state) { - format::ColumnChunk metadata; + format::ColumnChunk thrift_metadata; std::vector<int64_t> values(state.range_x(), 128); std::vector<int16_t> definition_levels(state.range_x(), 1); std::vector<int16_t> repetition_levels(state.range_x(), 0); std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition); - std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties(); + std::shared_ptr<WriterProperties> properties = default_writer_properties(); + auto metadata = ColumnChunkMetaDataBuilder::Make( + properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata)); while (state.KeepRunning()) { InMemoryOutputStream dst; - std::unique_ptr<Int64Writer> writer = - BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get()); + std::unique_ptr<Int64Writer> writer = BuildWriter( + state.range_x(), &dst, metadata.get(), schema.get(), properties.get()); writer->WriteBatch( values.size(), definition_levels.data(), repetition_levels.data(), values.data()); writer->Close(); @@ -91,16 +93,18 @@ std::unique_ptr<Int64Reader> BuildReader( template <Repetition::type repetition> static void BM_ReadInt64Column(::benchmark::State& state) { - format::ColumnChunk metadata; + format::ColumnChunk thrift_metadata; std::vector<int64_t> values(state.range_x(), 128); std::vector<int16_t> definition_levels(state.range_x(), 1); std::vector<int16_t> repetition_levels(state.range_x(), 0); std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition); + std::shared_ptr<WriterProperties> properties = default_writer_properties(); + auto metadata = ColumnChunkMetaDataBuilder::Make( + properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata)); InMemoryOutputStream dst; - std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties(); std::unique_ptr<Int64Writer> writer = - BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get()); + BuildWriter(state.range_x(), &dst, metadata.get(), schema.get(), properties.get()); writer->WriteBatch( values.size(), definition_levels.data(), repetition_levels.data(), values.data()); writer->Close(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index bdbb7a0..b3ca080 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -74,6 +74,8 @@ class TestPrimitiveWriter : public ::testing::Test { repetition_levels_out_.resize(SMALL_SIZE); SetUpSchemaRequired(); + metadata_accessor_ = + ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_)); } void BuildReader() { @@ -87,8 +89,10 @@ class TestPrimitiveWriter : public ::testing::Test { std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter( int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) { sink_.reset(new InMemoryOutputStream()); - std::unique_ptr<SerializedPageWriter> pager( - new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_)); + metadata_ = ColumnChunkMetaDataBuilder::Make( + writer_properties_, schema_.get(), reinterpret_cast<uint8_t*>(&thrift_metadata_)); + std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter( + sink_.get(), Compression::UNCOMPRESSED, metadata_.get())); WriterProperties::Builder wp_builder; if (encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY) { wp_builder.enable_dictionary(); @@ -126,9 +130,7 @@ class TestPrimitiveWriter : public ::testing::Test { ASSERT_EQ(this->values_, this->values_out_); } - int64_t metadata_num_values() const { - return metadata_.meta_data.num_values; - } + int64_t metadata_num_values() const { return metadata_accessor_->num_values(); } protected: int64_t values_read_; @@ -152,7 +154,9 @@ class TestPrimitiveWriter : public ::testing::Test { private: NodePtr node_; - format::ColumnChunk metadata_; + format::ColumnChunk thrift_metadata_; + std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_; + std::unique_ptr<ColumnChunkMetaData> metadata_accessor_; std::shared_ptr<ColumnDescriptor> schema_; std::unique_ptr<InMemoryOutputStream> sink_; std::shared_ptr<WriterProperties> writer_properties_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/file-metadata-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc index 5fbd613..852072c 100644 --- a/src/parquet/file/file-metadata-test.cc +++ b/src/parquet/file/file-metadata-test.cc @@ -54,8 +54,8 @@ TEST(Metadata, TestBuildAccess) { stats_float.max = &float_max; auto f_builder = FileMetaDataBuilder::Make(&schema, props); - auto rg1_builder = f_builder->AppendRowGroup(); - auto rg2_builder = f_builder->AppendRowGroup(); + auto rg1_builder = f_builder->AppendRowGroup(nrows / 2); + auto rg2_builder = f_builder->AppendRowGroup(nrows / 2); // Write the metadata // rowgroup1 metadata @@ -66,7 +66,7 @@ TEST(Metadata, TestBuildAccess) { col2_builder->SetStatistics(stats_float); col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false); col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false); - rg1_builder->Finish(nrows / 2); + rg1_builder->Finish(1024); // rowgroup2 metadata col1_builder = rg2_builder->NextColumnChunk(); @@ -76,7 +76,7 @@ TEST(Metadata, TestBuildAccess) { col2_builder->SetStatistics(stats_float); col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false); col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false); - rg2_builder->Finish(nrows / 2); + rg2_builder->Finish(1024); // Read the metadata auto f_accessor = f_builder->Finish(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index 4e298a8..bc0f7b9 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -180,8 +180,12 @@ class RowGroupMetaData::RowGroupMetaDataImpl { inline const SchemaDescriptor* schema() const { return schema_; } std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) { - DCHECK(i < num_columns()) << "The file only has " << num_columns() - << " columns, requested metadata for column: " << i; + if (!(i < num_columns())) { + std::stringstream ss; + ss << "The file only has " << num_columns() + << " columns, requested metadata for column: " << i; + throw ParquetException(ss.str()); + } return ColumnChunkMetaData::Make( reinterpret_cast<const uint8_t*>(&row_group_->columns[i])); } @@ -244,14 +248,17 @@ class FileMetaData::FileMetaDataImpl { void WriteTo(OutputStream* dst) { SerializeThriftMsg(metadata_.get(), 1024, dst); } std::unique_ptr<RowGroupMetaData> RowGroup(int i) { - DCHECK(i < num_row_groups()) - << "The file only has " << num_row_groups() - << " row groups, requested metadata for row group: " << i; + if (!(i < num_row_groups())) { + std::stringstream ss; + ss << "The file only has " << num_row_groups() + << " row groups, requested metadata for row group: " << i; + throw ParquetException(ss.str()); + } return RowGroupMetaData::Make( reinterpret_cast<const uint8_t*>(&metadata_->row_groups[i]), &schema_); } - const SchemaDescriptor* schema_descriptor() const { return &schema_; } + const SchemaDescriptor* schema() const { return &schema_; } private: friend FileMetaDataBuilder; @@ -306,8 +313,8 @@ int FileMetaData::num_schema_elements() const { return impl_->num_schema_elements(); } -const SchemaDescriptor* FileMetaData::schema_descriptor() const { - return impl_->schema_descriptor(); +const SchemaDescriptor* FileMetaData::schema() const { + return impl_->schema(); } void FileMetaData::WriteTo(OutputStream* dst) { @@ -374,6 +381,8 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_encodings(thrift_encodings); } + const ColumnDescriptor* descr() const { return column_; } + private: format::ColumnChunk* column_chunk_; const std::shared_ptr<WriterProperties> properties_; @@ -406,24 +415,33 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values, compressed_size, uncompressed_size, dictionary_fallback); } +const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { + return impl_->descr(); +} + void ColumnChunkMetaDataBuilder::SetStatistics(const ColumnStatistics& result) { impl_->SetStatistics(result); } class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { public: - explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props, - const SchemaDescriptor* schema, uint8_t* contents) + explicit RowGroupMetaDataBuilderImpl(int64_t num_rows, + const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema, + uint8_t* contents) : properties_(props), schema_(schema), current_column_(0) { row_group_ = reinterpret_cast<format::RowGroup*>(contents); InitializeColumns(schema->num_columns()); + row_group_->__set_num_rows(num_rows); } ~RowGroupMetaDataBuilderImpl() {} ColumnChunkMetaDataBuilder* NextColumnChunk() { - DCHECK(current_column_ < num_columns()) - << "The schema only has " << num_columns() - << " columns, requested metadata for column: " << current_column_; + if (!(current_column_ < num_columns())) { + std::stringstream ss; + ss << "The schema only has " << num_columns() + << " columns, requested metadata for column: " << current_column_; + throw ParquetException(ss.str()); + } auto column = schema_->Column(current_column_); auto column_builder = ColumnChunkMetaDataBuilder::Make(properties_, column, reinterpret_cast<uint8_t*>(&row_group_->columns[current_column_++])); @@ -432,25 +450,32 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { return column_builder_ptr; } - void Finish(int64_t num_rows) { - DCHECK(current_column_ == schema_->num_columns()) - << "Only " << current_column_ - 1 << " out of " << schema_->num_columns() - << " columns are initialized"; - size_t total_byte_size = 0; + void Finish(int64_t total_bytes_written) { + if (!(current_column_ == schema_->num_columns())) { + std::stringstream ss; + ss << "Only " << current_column_ - 1 << " out of " << schema_->num_columns() + << " columns are initialized"; + throw ParquetException(ss.str()); + } + int64_t total_byte_size = 0; for (int i = 0; i < schema_->num_columns(); i++) { - DCHECK(row_group_->columns[i].file_offset > 0) << "Column " << i - << " is not complete."; + if (!(row_group_->columns[i].file_offset > 0)) { + std::stringstream ss; + ss << "Column " << i << " is not complete."; + throw ParquetException(ss.str()); + } total_byte_size += row_group_->columns[i].meta_data.total_compressed_size; } + DCHECK(total_bytes_written == total_byte_size) + << "Total bytes in this RowGroup does not match with compressed sizes of columns"; row_group_->__set_total_byte_size(total_byte_size); - row_group_->__set_num_rows(num_rows); } - private: int num_columns() { return row_group_->columns.size(); } + private: void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); } format::RowGroup* row_group_; @@ -460,18 +485,18 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { int current_column_; }; -std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make( +std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(int64_t num_rows, const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_, uint8_t* contents) { return std::unique_ptr<RowGroupMetaDataBuilder>( - new RowGroupMetaDataBuilder(props, schema_, contents)); + new RowGroupMetaDataBuilder(num_rows, props, schema_, contents)); } -RowGroupMetaDataBuilder::RowGroupMetaDataBuilder( +RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(int64_t num_rows, const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_, uint8_t* contents) : impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>( - new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {} + new RowGroupMetaDataBuilderImpl(num_rows, props, schema_, contents))} {} RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {} @@ -479,11 +504,16 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { return impl_->NextColumnChunk(); } -void RowGroupMetaDataBuilder::Finish(int64_t num_rows) { - impl_->Finish(num_rows); +int RowGroupMetaDataBuilder::num_columns() { + return impl_->num_columns(); +} + +void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) { + impl_->Finish(total_bytes_written); } // file metadata +// TODO(PARQUET-595) Support key_value_metadata class FileMetaDataBuilder::FileMetaDataBuilderImpl { public: explicit FileMetaDataBuilderImpl( @@ -493,10 +523,10 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { } ~FileMetaDataBuilderImpl() {} - RowGroupMetaDataBuilder* AppendRowGroup() { + RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows) { auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup()); auto row_group_builder = RowGroupMetaDataBuilder::Make( - properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get())); + num_rows, properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get())); RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get(); row_group_builders_.push_back(std::move(row_group_builder)); row_groups_.push_back(std::move(row_group)); @@ -517,7 +547,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { metadata_->__set_version(properties_->version()); metadata_->__set_created_by(properties_->created_by()); parquet::schema::SchemaFlattener flattener( - static_cast<parquet::schema::GroupNode*>(schema_->schema().get()), + static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()), &metadata_->schema); flattener.Flatten(); auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData()); @@ -548,8 +578,8 @@ FileMetaDataBuilder::FileMetaDataBuilder( FileMetaDataBuilder::~FileMetaDataBuilder() {} -RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { - return impl_->AppendRowGroup(); +RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup(int64_t num_rows) { + return impl_->AppendRowGroup(num_rows); } std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index 78ea53b..1d96621 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -117,7 +117,7 @@ class PARQUET_EXPORT FileMetaData { void WriteTo(OutputStream* dst); // Return const-pointer to make it clear that this object is not to be copied - const SchemaDescriptor* schema_descriptor() const; + const SchemaDescriptor* schema() const; private: friend FileMetaDataBuilder; @@ -144,7 +144,8 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { // column metadata // ownership of min/max is with ColumnChunkMetadata void SetStatistics(const ColumnStatistics& stats); - + // get the column descriptor + const ColumnDescriptor* descr() const; // commit the metadata void Finish(int64_t num_values, int64_t dictonary_page_offset, int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, @@ -161,20 +162,22 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { class PARQUET_EXPORT RowGroupMetaDataBuilder { public: // API convenience to get a MetaData reader - static std::unique_ptr<RowGroupMetaDataBuilder> Make( + static std::unique_ptr<RowGroupMetaDataBuilder> Make(int64_t num_rows, const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_, uint8_t* contents); ~RowGroupMetaDataBuilder(); ColumnChunkMetaDataBuilder* NextColumnChunk(); + int num_columns(); // commit the metadata - void Finish(int64_t num_rows); + void Finish(int64_t total_bytes_written); private: - explicit RowGroupMetaDataBuilder(const std::shared_ptr<WriterProperties>& props, - const SchemaDescriptor* schema_, uint8_t* contents); + explicit RowGroupMetaDataBuilder(int64_t num_rows, + const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_, + uint8_t* contents); // PIMPL Idiom class RowGroupMetaDataBuilderImpl; std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_; @@ -188,7 +191,7 @@ class PARQUET_EXPORT FileMetaDataBuilder { ~FileMetaDataBuilder(); - RowGroupMetaDataBuilder* AppendRowGroup(); + RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows); // commit the metadata std::unique_ptr<FileMetaData> Finish(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index b593ea0..7cf3f1a 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -126,7 +126,7 @@ void ParquetFileReader::DebugPrint( stream << "Total rows: " << file_metadata->num_rows() << "\n"; stream << "Number of RowGroups: " << file_metadata->num_row_groups() << "\n"; stream << "Number of Real Columns: " - << file_metadata->schema_descriptor()->group()->field_count() << "\n"; + << file_metadata->schema()->group_node()->field_count() << "\n"; if (selected_columns.size() == 0) { for (int i = 0; i < file_metadata->num_columns(); i++) { @@ -143,7 +143,7 @@ void ParquetFileReader::DebugPrint( stream << "Number of Columns: " << file_metadata->num_columns() << "\n"; stream << "Number of Selected Columns: " << selected_columns.size() << "\n"; for (auto i : selected_columns) { - const ColumnDescriptor* descr = file_metadata->schema_descriptor()->Column(i); + const ColumnDescriptor* descr = file_metadata->schema()->Column(i); stream << "Column " << i << ": " << descr->name() << " (" << TypeToString(descr->physical_type()) << ")" << std::endl; } @@ -162,7 +162,7 @@ void ParquetFileReader::DebugPrint( auto column_chunk = group_metadata->ColumnChunk(i); const ColumnStatistics stats = column_chunk->statistics(); - const ColumnDescriptor* descr = file_metadata->schema_descriptor()->Column(i); + const ColumnDescriptor* descr = file_metadata->schema()->Column(i); stream << "Column " << i << std::endl << ", values: " << column_chunk->num_values(); if (column_chunk->is_stats_set()) { stream << ", null values: " << stats.null_count @@ -201,7 +201,7 @@ void ParquetFileReader::DebugPrint( std::string fmt = ss.str(); snprintf(buffer, bufsize, fmt.c_str(), - file_metadata->schema_descriptor()->Column(i)->name().c_str()); + file_metadata->schema()->Column(i)->name().c_str()); stream << buffer; // This is OK in this method as long as the RowGroupReader does not get http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index f07b44b..fb05f13 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -34,25 +34,23 @@ static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; // SerializedPageWriter SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec, - format::ColumnChunk* metadata, MemoryAllocator* allocator) + ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator) : sink_(sink), metadata_(metadata), - // allocator_(allocator), + num_values_(0), + dictionary_page_offset_(0), + data_page_offset_(0), + total_uncompressed_size_(0), + total_compressed_size_(0), compression_buffer_(std::make_shared<OwnedMutableBuffer>(0, allocator)) { compressor_ = Codec::Create(codec); - // Currently we directly start with the data page - metadata_->meta_data.__set_data_page_offset(sink_->Tell()); - metadata_->meta_data.__set_codec(ToThrift(codec)); } -void SerializedPageWriter::Close() {} - -void SerializedPageWriter::AddEncoding(Encoding::type encoding) { - auto it = std::find(metadata_->meta_data.encodings.begin(), - metadata_->meta_data.encodings.end(), ToThrift(encoding)); - if (it != metadata_->meta_data.encodings.end()) { - metadata_->meta_data.encodings.push_back(ToThrift(encoding)); - } +void SerializedPageWriter::Close() { + // index_page_offset = 0 since they are not supported + // TODO: Remove default fallback = 'false' when implemented + metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_, + total_compressed_size_, total_uncompressed_size_, false); } std::shared_ptr<Buffer> SerializedPageWriter::Compress( @@ -91,13 +89,14 @@ int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) { // TODO(PARQUET-594) crc checksum int64_t start_pos = sink_->Tell(); + if (data_page_offset_ == 0) { data_page_offset_ = start_pos; } SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); int64_t header_size = sink_->Tell() - start_pos; sink_->Write(compressed_data->data(), compressed_data->size()); - metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size; - metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size; - metadata_->meta_data.num_values += page.num_values(); + total_uncompressed_size_ += uncompressed_size + header_size; + total_compressed_size_ += compressed_data->size() + header_size; + num_values_ += page.num_values(); return sink_->Tell() - start_pos; } @@ -119,12 +118,13 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) { // TODO(PARQUET-594) crc checksum int64_t start_pos = sink_->Tell(); + if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; } SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); int64_t header_size = sink_->Tell() - start_pos; sink_->Write(compressed_data->data(), compressed_data->size()); - metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size; - metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size; + total_uncompressed_size_ += uncompressed_size + header_size; + total_compressed_size_ += compressed_data->size() + header_size; return sink_->Tell() - start_pos; } @@ -133,50 +133,38 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) { // RowGroupSerializer int RowGroupSerializer::num_columns() const { - return schema_->num_columns(); + return metadata_->num_columns(); } int64_t RowGroupSerializer::num_rows() const { return num_rows_; } -const SchemaDescriptor* RowGroupSerializer::schema() const { - return schema_; -} - ColumnWriter* RowGroupSerializer::NextColumn() { - if (current_column_index_ == schema_->num_columns() - 1) { - throw ParquetException("All columns have already been written."); - } - current_column_index_++; + // Throws an error if more columns are being written + auto col_meta = metadata_->NextColumnChunk(); - if (current_column_writer_) { total_bytes_written_ += current_column_writer_->Close(); } + if (current_column_writer_) { total_bytes_written_ = current_column_writer_->Close(); } - const ColumnDescriptor* column_descr = schema_->Column(current_column_index_); - format::ColumnChunk* col_meta = &metadata_->columns[current_column_index_]; - col_meta->__isset.meta_data = true; - col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type())); - col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector()); - std::unique_ptr<PageWriter> pager(new SerializedPageWriter( - sink_, properties_->compression(column_descr->path()), col_meta, allocator_)); + const ColumnDescriptor* column_descr = col_meta->descr(); + std::unique_ptr<PageWriter> pager( + new SerializedPageWriter(sink_, properties_->compression(column_descr->path()), + col_meta, properties_->allocator())); current_column_writer_ = - ColumnWriter::Make(column_descr, std::move(pager), num_rows_, properties_); + ColumnWriter::Make(col_meta->descr(), std::move(pager), num_rows_, properties_); return current_column_writer_.get(); } void RowGroupSerializer::Close() { if (!closed_) { closed_ = true; - if (current_column_index_ != schema_->num_columns() - 1) { - throw ParquetException("Not all column were written in the current rowgroup."); - } if (current_column_writer_) { - total_bytes_written_ += current_column_writer_->Close(); + total_bytes_written_ = current_column_writer_->Close(); current_column_writer_.reset(); } - - metadata_->__set_total_byte_size(total_bytes_written_); + // Ensures all columns have been written + metadata_->Finish(total_bytes_written_); } } @@ -225,13 +213,10 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) { if (row_group_writer_) { row_group_writer_->Close(); } num_rows_ += num_rows; num_row_groups_++; - - auto rgm_size = row_group_metadata_.size(); - row_group_metadata_.resize(rgm_size + 1); - format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size]; - std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer( - num_rows, &schema_, sink_.get(), rg_metadata, properties_.get())); - row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_)); + auto rg_metadata = metadata_->AppendRowGroup(num_rows); + std::unique_ptr<RowGroupWriter::Contents> contents( + new RowGroupSerializer(num_rows, sink_.get(), rg_metadata, properties_.get())); + row_group_writer_.reset(new RowGroupWriter(std::move(contents))); return row_group_writer_.get(); } @@ -243,19 +228,9 @@ void FileSerializer::WriteMetaData() { // Write MetaData uint32_t metadata_len = sink_->Tell(); - SchemaFlattener flattener( - static_cast<GroupNode*>(schema_.schema().get()), &metadata_.schema); - flattener.Flatten(); - - // TODO: Currently we only write version 1 files - metadata_.__set_version(1); - metadata_.__set_num_rows(num_rows_); - metadata_.__set_row_groups(row_group_metadata_); - // TODO(PARQUET-595) Support key_value_metadata - // TODO(PARQUET-590) Get from WriterProperties - metadata_.__set_created_by("parquet-cpp"); - - SerializeThriftMsg(&metadata_, 1024, sink_.get()); + // Get a FileMetaData + auto metadata = metadata_->Finish(); + metadata->WriteTo(sink_.get()); metadata_len = sink_->Tell() - metadata_len; // Write Footer @@ -267,12 +242,12 @@ FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink, const std::shared_ptr<GroupNode>& schema, const std::shared_ptr<WriterProperties>& properties) : sink_(sink), - allocator_(properties->allocator()), - num_row_groups_(0), - num_rows_(0), is_open_(true), - properties_(properties) { + properties_(properties), + num_row_groups_(0), + num_rows_(0) { schema_.Init(schema); + metadata_ = FileMetaDataBuilder::Make(&schema_, properties); StartFile(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index 7c46408..645d4bf 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -23,6 +23,7 @@ #include "parquet/column/page.h" #include "parquet/compression/codec.h" +#include "parquet/file/metadata.h" #include "parquet/file/writer.h" #include "parquet/thrift/parquet_types.h" @@ -31,11 +32,11 @@ namespace parquet { // This subclass delimits pages appearing in a serialized stream, each preceded // by a serialized Thrift format::PageHeader indicating the type of each page // and the page metadata. -// class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(OutputStream* sink, Compression::type codec, - format::ColumnChunk* metadata, MemoryAllocator* allocator = default_allocator()); + ColumnChunkMetaDataBuilder* metadata, + MemoryAllocator* allocator = default_allocator()); virtual ~SerializedPageWriter() {} @@ -47,14 +48,17 @@ class SerializedPageWriter : public PageWriter { private: OutputStream* sink_; - format::ColumnChunk* metadata_; - // MemoryAllocator* allocator_; + ColumnChunkMetaDataBuilder* metadata_; + int64_t num_values_; + int64_t dictionary_page_offset_; + int64_t data_page_offset_; + int64_t total_uncompressed_size_; + int64_t total_compressed_size_; // Compression codec to use. std::unique_ptr<Codec> compressor_; std::shared_ptr<OwnedMutableBuffer> compression_buffer_; - void AddEncoding(Encoding::type encoding); /** * Compress a buffer. * @@ -67,24 +71,17 @@ class SerializedPageWriter : public PageWriter { // RowGroupWriter::Contents implementation for the Parquet file specification class RowGroupSerializer : public RowGroupWriter::Contents { public: - RowGroupSerializer(int64_t num_rows, const SchemaDescriptor* schema, OutputStream* sink, - format::RowGroup* metadata, const WriterProperties* properties) + RowGroupSerializer(int64_t num_rows, OutputStream* sink, + RowGroupMetaDataBuilder* metadata, const WriterProperties* properties) : num_rows_(num_rows), - schema_(schema), sink_(sink), metadata_(metadata), - allocator_(properties->allocator()), properties_(properties), total_bytes_written_(0), - closed_(false), - current_column_index_(-1) { - metadata_->__set_num_rows(num_rows_); - metadata_->columns.resize(schema->num_columns()); - } + closed_(false) {} int num_columns() const override; int64_t num_rows() const override; - const SchemaDescriptor* schema() const override; // TODO: PARQUET-579 // void WriteRowGroupStatitics() override; @@ -94,15 +91,12 @@ class RowGroupSerializer : public RowGroupWriter::Contents { private: int64_t num_rows_; - const SchemaDescriptor* schema_; OutputStream* sink_; - format::RowGroup* metadata_; - MemoryAllocator* allocator_; + RowGroupMetaDataBuilder* metadata_; const WriterProperties* properties_; int64_t total_bytes_written_; bool closed_; - int64_t current_column_index_; std::shared_ptr<ColumnWriter> current_column_writer_; }; @@ -134,14 +128,12 @@ class FileSerializer : public ParquetFileWriter::Contents { const std::shared_ptr<WriterProperties>& properties); std::shared_ptr<OutputStream> sink_; - format::FileMetaData metadata_; - std::vector<format::RowGroup> row_group_metadata_; - MemoryAllocator* allocator_; - int num_row_groups_; - int num_rows_; bool is_open_; + const std::shared_ptr<WriterProperties> properties_; + int num_row_groups_; + int64_t num_rows_; + std::unique_ptr<FileMetaDataBuilder> metadata_; std::unique_ptr<RowGroupWriter> row_group_writer_; - std::shared_ptr<WriterProperties> properties_; void StartFile(); void WriteMetaData(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc index 269b1cb..8c9f52f 100644 --- a/src/parquet/file/writer.cc +++ b/src/parquet/file/writer.cc @@ -27,11 +27,8 @@ namespace parquet { // ---------------------------------------------------------------------- // RowGroupWriter public API -RowGroupWriter::RowGroupWriter( - std::unique_ptr<Contents> contents, MemoryAllocator* allocator) - : contents_(std::move(contents)), allocator_(allocator) { - schema_ = contents_->schema(); -} +RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents) + : contents_(std::move(contents)) {} void RowGroupWriter::Close() { if (contents_) { @@ -64,9 +61,16 @@ std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( return result; } +const SchemaDescriptor* ParquetFileWriter::schema() const { + return contents_->schema(); +} + +const ColumnDescriptor* ParquetFileWriter::descr(int i) const { + return contents_->schema()->Column(i); +} + void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) { contents_ = std::move(contents); - schema_ = contents_->schema(); } void ParquetFileWriter::Close() { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h index 0e64961..422f008 100644 --- a/src/parquet/file/writer.h +++ b/src/parquet/file/writer.h @@ -46,12 +46,9 @@ class PARQUET_EXPORT RowGroupWriter { // virtual void WriteRowGroupStatitics(); virtual ColumnWriter* NextColumn() = 0; virtual void Close() = 0; - - // Return const-pointer to make it clear that this object is not to be copied - virtual const SchemaDescriptor* schema() const = 0; }; - RowGroupWriter(std::unique_ptr<Contents> contents, MemoryAllocator* allocator); + explicit RowGroupWriter(std::unique_ptr<Contents> contents); /** * Construct a ColumnWriter for the indicated row group-relative column. @@ -75,13 +72,8 @@ class PARQUET_EXPORT RowGroupWriter { // virtual void WriteRowGroupStatitics(); private: - // Owned by the parent ParquetFileWriter - const SchemaDescriptor* schema_; - // Holds a pointer to an instance of Contents implementation std::unique_ptr<Contents> contents_; - - MemoryAllocator* allocator_; }; class PARQUET_EXPORT ParquetFileWriter { @@ -102,7 +94,7 @@ class PARQUET_EXPORT ParquetFileWriter { virtual const std::shared_ptr<WriterProperties>& properties() const = 0; - // Return const-poitner to make it clear that this object is not to be copied + // Return const-pointer to make it clear that this object is not to be copied const SchemaDescriptor* schema() const { return &schema_; } SchemaDescriptor schema_; }; @@ -153,18 +145,18 @@ class PARQUET_EXPORT ParquetFileWriter { const std::shared_ptr<WriterProperties>& properties() const; /** - * Returns the file schema descriptor - */ - const SchemaDescriptor* descr() { return schema_; } + * Returns the file schema descriptor + */ + const SchemaDescriptor* schema() const; - const ColumnDescriptor* column_schema(int i) const { return schema_->Column(i); } + /** + * Returns a column descriptor in schema + */ + const ColumnDescriptor* descr(int i) const; private: // Holds a pointer to an instance of Contents implementation std::unique_ptr<Contents> contents_; - - // The SchemaDescriptor is provided by the Contents impl - const SchemaDescriptor* schema_; }; } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/descriptor.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema/descriptor.cc b/src/parquet/schema/descriptor.cc index de63e5e..4d46204 100644 --- a/src/parquet/schema/descriptor.cc +++ b/src/parquet/schema/descriptor.cc @@ -39,11 +39,11 @@ void SchemaDescriptor::Init(const NodePtr& schema) { throw ParquetException("Must initialize with a schema group"); } - group_ = static_cast<const GroupNode*>(schema_.get()); + group_node_ = static_cast<const GroupNode*>(schema_.get()); leaves_.clear(); - for (int i = 0; i < group_->field_count(); ++i) { - BuildTree(group_->field(i), 0, 0, group_->field(i)); + for (int i = 0; i < group_node_->field_count(); ++i) { + BuildTree(group_node_->field(i), 0, 0, group_node_->field(i)); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/descriptor.h ---------------------------------------------------------------------- diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h index 51f3503..c591954 100644 --- a/src/parquet/schema/descriptor.h +++ b/src/parquet/schema/descriptor.h @@ -100,18 +100,20 @@ class PARQUET_EXPORT SchemaDescriptor { // The number of physical columns appearing in the file int num_columns() const { return leaves_.size(); } - const schema::NodePtr& schema() const { return schema_; } + const schema::NodePtr& schema_root() const { return schema_; } - const schema::GroupNode* group() const { return group_; } + const schema::GroupNode* group_node() const { return group_node_; } // Returns the root (child of the schema root) node of the leaf(column) node const schema::NodePtr& GetColumnRoot(int i) const; + const std::string& name() const { return group_node_->name(); } + private: friend class ColumnDescriptor; schema::NodePtr schema_; - const schema::GroupNode* group_; + const schema::GroupNode* group_node_; void BuildTree(const schema::NodePtr& node, int16_t max_def_level, int16_t max_rep_level, const schema::NodePtr& base); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/schema-descriptor-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc index d88cd0d..5be8db7 100644 --- a/src/parquet/schema/schema-descriptor-test.cc +++ b/src/parquet/schema/schema-descriptor-test.cc @@ -128,7 +128,7 @@ TEST_F(TestSchemaDescriptor, BuildTree) { ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get()); ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get()); - ASSERT_EQ(schema.get(), descr_.group()); + ASSERT_EQ(schema.get(), descr_.group_node()); // Init clears the leaves descr_.Init(schema); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/tools/parquet-dump-schema.cc ---------------------------------------------------------------------- diff --git a/tools/parquet-dump-schema.cc b/tools/parquet-dump-schema.cc index deef2fd..1e0239b 100644 --- a/tools/parquet-dump-schema.cc +++ b/tools/parquet-dump-schema.cc @@ -26,7 +26,7 @@ int main(int argc, char** argv) { try { std::unique_ptr<parquet::ParquetFileReader> reader = parquet::ParquetFileReader::OpenFile(filename); - PrintSchema(reader->metadata()->schema_descriptor()->schema().get(), std::cout); + PrintSchema(reader->metadata()->schema()->schema_root().get(), std::cout); } catch (const std::exception& e) { std::cerr << "Parquet error: " << e.what() << std::endl; return -1;
