Repository: parquet-cpp
Updated Branches:
  refs/heads/master f9ff60797 -> 248094206
PARQUET-691: Write ColumnChunk metadata after chunk is complete

See corresponding logic in Impala's Parquet implementation:
https://github.com/apache/incubator-impala/blob/master/be/src/exec/hdfs-parquet-table-writer.cc#L973

Author: Wes McKinney <[email protected]>

Closes #224 from wesm/PARQUET-691 and squashes the following commits:

0387d43 [Wes McKinney] Write ColumnChunk metadata after chunk is complete

Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/24809420
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/24809420
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/24809420

Branch: refs/heads/master
Commit: 2480942067bf078220745a733a65df55c572790a
Parents: f9ff607
Author: Wes McKinney <[email protected]>
Authored: Tue Jan 24 15:26:14 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Tue Jan 24 15:26:14 2017 -0500

----------------------------------------------------------------------
 src/parquet/file/metadata.cc        |  8 ++++++++
 src/parquet/file/metadata.h         |  3 +++
 src/parquet/file/writer-internal.cc | 12 ++++++++----
 src/parquet/file/writer-internal.h  |  3 ---
 src/parquet/thrift/util.h           |  3 ++-
 5 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index a262b63..1545efe 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -501,6 +501,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_encodings(thrift_encodings); } + void WriteTo(OutputStream* sink) { + SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink); + } + const ColumnDescriptor* descr() const {
return column_; } private: @@ -536,6 +540,10 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values, compressed_size, uncompressed_size, has_dictionary, dictionary_fallback); } +void ColumnChunkMetaDataBuilder::WriteTo(OutputStream* sink) { + impl_->WriteTo(sink); +} + const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { return impl_->descr(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index 942aa39..7307ddf 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -174,6 +174,9 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback); + // For writing metadata at end of column chunk + void WriteTo(OutputStream* sink); + private: explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column, uint8_t* contents); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 48884ad..724635c 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -62,6 +62,9 @@ void SerializedPageWriter::Close(bool has_dictionary, bool fallback) { // TODO: Remove default fallback = 'false' when implemented metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_, total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback); + + // Write metadata at end of column chunk + metadata_->WriteTo(sink_); } std::shared_ptr<Buffer> SerializedPageWriter::Compress( @@ -104,8 +107,9 @@ int64_t 
SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { int64_t start_pos = sink_->Tell(); if (data_page_offset_ == 0) { data_page_offset_ = start_pos; } - SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); - int64_t header_size = sink_->Tell() - start_pos; + + int64_t header_size = + SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); sink_->Write(compressed_data->data(), compressed_data->size()); total_uncompressed_size_ += uncompressed_size + header_size; @@ -133,8 +137,8 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) { int64_t start_pos = sink_->Tell(); if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; } - SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); - int64_t header_size = sink_->Tell() - start_pos; + int64_t header_size = + SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); sink_->Write(compressed_data->data(), compressed_data->size()); total_uncompressed_size_ += uncompressed_size + header_size; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index 81a0837..12002e1 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -81,9 +81,6 @@ class RowGroupSerializer : public RowGroupWriter::Contents { int num_columns() const override; int64_t num_rows() const override; - // TODO: PARQUET-579 - // void WriteRowGroupStatitics() override; - ColumnWriter* NextColumn() override; void Close() override; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/thrift/util.h ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h index 9d2b66f..30d7edf 100644 --- 
a/src/parquet/thrift/util.h +++ b/src/parquet/thrift/util.h @@ -118,7 +118,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali // The arguments are the object to be serialized and // the expected size of the serialized object template <class T> -inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { +inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer( new apache::thrift::transport::TMemoryBuffer(len)); apache::thrift::protocol::TCompactProtocolFactoryT< @@ -139,6 +139,7 @@ inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { uint32_t out_length; mem_buffer->getBuffer(&out_buffer, &out_length); out->Write(out_buffer, out_length); + return out_length; } } // namespace parquet
