Repository: parquet-cpp Updated Branches: refs/heads/master 942f2aedb -> ffeb828ac
PARQUET-689: C++: Compress DataPages eagerly Author: Deepak Majeti <deepak.maj...@hpe.com> Closes #162 from majetideepak/PARQUET-689 and squashes the following commits: 46f04fb [Deepak Majeti] Clang format 73dfcf9 [Deepak Majeti] Compress Data Pages early Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/ffeb828a Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/ffeb828a Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/ffeb828a Branch: refs/heads/master Commit: ffeb828ac5bf19abe5990a2be9245a8fdd292c7a Parents: 942f2ae Author: Deepak Majeti <deepak.maj...@hpe.com> Authored: Fri Sep 16 09:01:00 2016 +0200 Committer: Uwe L. Korn <uw...@xhochy.com> Committed: Fri Sep 16 09:01:00 2016 +0200 ---------------------------------------------------------------------- src/parquet/column/page.h | 19 ++++++++++++++++++- src/parquet/column/writer.cc | 8 +++++--- src/parquet/column/writer.h | 4 ++-- src/parquet/file/writer-internal.cc | 6 +++--- src/parquet/file/writer-internal.h | 18 +++++++++--------- 5 files changed, 37 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h index c06d3de..1de6013 100644 --- a/src/parquet/column/page.h +++ b/src/parquet/column/page.h @@ -95,6 +95,21 @@ class DataPage : public Page { std::string min_; }; +class CompressedDataPage : public DataPage { + public: + CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values, + Encoding::type encoding, Encoding::type definition_level_encoding, + Encoding::type repetition_level_encoding, int64_t uncompressed_size) + : DataPage(buffer, num_values, encoding, definition_level_encoding, + repetition_level_encoding), + uncompressed_size_(uncompressed_size) {} + + int64_t uncompressed_size() const { return uncompressed_size_; } + + private: + int64_t uncompressed_size_; +}; + class DataPageV2 : public Page { public: DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls, @@ -176,9 +191,11 @@ class PageWriter { // page limit virtual void Close(bool has_dictionary, bool fallback) = 0; - virtual int64_t WriteDataPage(const DataPage& page) = 0; + virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0; virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; + + virtual std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) = 0; }; } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index 1fbea62..bfbd0c5 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -116,8 +116,10 @@ void ColumnWriter::AddDataPage() { memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size()); uncompressed_ptr += definition_levels->size(); memcpy(uncompressed_ptr, values->data(), values->size()); - DataPage page( - uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE); + + std::shared_ptr<Buffer> compressed_data = pager_->Compress(uncompressed_data); + CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE, + Encoding::RLE, uncompressed_size); // Write the page to OutputStream eagerly if there is no dictionary or // if dictionary encoding has fallen back to PLAIN @@ -133,7 +135,7 @@ void ColumnWriter::AddDataPage() { num_buffered_encoded_values_ = 0; } -void ColumnWriter::WriteDataPage(const DataPage& page) { +void ColumnWriter::WriteDataPage(const CompressedDataPage& page) { int64_t bytes_written = pager_->WriteDataPage(page); total_bytes_written_ += bytes_written; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index 4b2a021..3a54cbb 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -72,7 +72,7 @@ class PARQUET_EXPORT ColumnWriter { void AddDataPage(); // Serializes Data Pages - void WriteDataPage(const DataPage& page); + void WriteDataPage(const CompressedDataPage& page); // Write multiple definition levels void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels); @@ -128,7 +128,7 @@ class PARQUET_EXPORT ColumnWriter { std::unique_ptr<InMemoryOutputStream> definition_levels_sink_; std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_; - std::vector<DataPage> data_pages_; + std::vector<CompressedDataPage> data_pages_; private: void InitSinks(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 2d396b7..05aefb9 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -66,9 +66,9 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress( return compression_buffer_; } -int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) { - int64_t uncompressed_size = page.size(); - std::shared_ptr<Buffer> compressed_data = Compress(page.buffer()); +int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { + int64_t uncompressed_size = page.uncompressed_size(); + std::shared_ptr<Buffer> compressed_data = page.buffer(); format::DataPageHeader data_page_header; data_page_header.__set_num_values(page.num_values()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index e6364e9..2095154 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -40,10 +40,18 @@ class SerializedPageWriter : public PageWriter { virtual ~SerializedPageWriter() {} - int64_t WriteDataPage(const DataPage& page) override; + int64_t WriteDataPage(const CompressedDataPage& page) override; int64_t WriteDictionaryPage(const DictionaryPage& page) override; + /** + * Compress a buffer. + * + * This method may return compression_buffer_ and thus the resulting memory + * is only valid until the next call to Compress(). + */ + std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override; + void Close(bool has_dictionary, bool fallback) override; private: @@ -58,14 +66,6 @@ class SerializedPageWriter : public PageWriter { // Compression codec to use. std::unique_ptr<Codec> compressor_; std::shared_ptr<OwnedMutableBuffer> compression_buffer_; - - /** - * Compress a buffer. - * - * This method may return compression_buffer_ and thus the resulting memory - * is only valid until the next call to Compress(). - */ - std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer); }; // RowGroupWriter::Contents implementation for the Parquet file specification