Repository: parquet-cpp Updated Branches: refs/heads/master a6520d34f -> d5a1aed72
PARQUET-909: Reduce buffer allocations (mallocs) on critical path Author: Deepak Majeti <[email protected]> Closes #268 from majetideepak/ReuseBuffers and squashes the following commits: bbf5453 [Deepak Majeti] Review comments 4d93d4b [Deepak Majeti] Improve example 3a3e2bb [Deepak Majeti] Fix Resize shrink_to_fit 53c8ac1 [Deepak Majeti] Improve API 22f422f [Deepak Majeti] clang format 8ae02cc [Deepak Majeti] optimize for uncompressed data 3190cef [Deepak Majeti] change fit_to_size of InMemoryOutputStream 03d4862 [Deepak Majeti] clang format 261aa1c [Deepak Majeti] Rewrite Compress API b09c4a8 [Deepak Majeti] Reuse uncompressed_data buffer 6b9a81b [Deepak Majeti] Clang fromat 31af602 [Deepak Majeti] Reuse levels rle buffer 82eabfb [Deepak Majeti] Re-use def and rep levels sink edc8e3c [Deepak Majeti] Add API to InMemoryOutputStream Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/d5a1aed7 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/d5a1aed7 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/d5a1aed7 Branch: refs/heads/master Commit: d5a1aed72c18ba6047ca469fb46661ec7a44136b Parents: a6520d3 Author: Deepak Majeti <[email protected]> Authored: Fri Mar 17 18:07:59 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Fri Mar 17 18:07:59 2017 -0400 ---------------------------------------------------------------------- examples/reader-writer.cc | 8 ++- src/parquet/column/page.h | 4 +- src/parquet/column/statistics.h | 4 +- src/parquet/column/writer.cc | 90 +++++++++++++++++++++----------- src/parquet/column/writer.h | 11 +++- src/parquet/encoding-internal.h | 8 +-- src/parquet/file/reader-internal.cc | 2 +- src/parquet/file/writer-internal.cc | 31 ++++++----- src/parquet/file/writer-internal.h | 4 +- src/parquet/util/memory.h | 6 +++ 10 files changed, 114 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/examples/reader-writer.cc ---------------------------------------------------------------------- diff --git a/examples/reader-writer.cc b/examples/reader-writer.cc index 59ee63b..54390e0 100644 --- a/examples/reader-writer.cc +++ b/examples/reader-writer.cc @@ -110,9 +110,14 @@ int main(int argc, char** argv) { // Setup the parquet schema std::shared_ptr<GroupNode> schema = SetupSchema(); + // Add writer properties + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + // Create a ParquetFileWriter instance std::shared_ptr<parquet::ParquetFileWriter> file_writer = - parquet::ParquetFileWriter::Open(out_file, schema); + parquet::ParquetFileWriter::Open(out_file, schema, props); // Append a RowGroup with a specific number of rows. parquet::RowGroupWriter* rg_writer = @@ -225,6 +230,7 @@ int main(int argc, char** argv) { // Create a ParquetReader instance std::unique_ptr<parquet::ParquetFileReader> parquet_reader = parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false); + // Get the File MetaData std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h index 6670e7f..bca0ca4 100644 --- a/src/parquet/column/page.h +++ b/src/parquet/column/page.h @@ -191,7 +191,9 @@ class PageWriter { virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; - virtual std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) = 0; + virtual bool has_compressor() = 0; + + virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0; }; } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/statistics.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h index 509e587..6f12eb9 100644 --- a/src/parquet/column/statistics.h +++ b/src/parquet/column/statistics.h @@ -182,7 +182,7 @@ inline void TypedRowGroupStatistics<FLBAType>::Copy( const FLBA& src, FLBA* dst, PoolBuffer* buffer) { if (dst->ptr == src.ptr) return; uint32_t len = descr_->type_length(); - PARQUET_THROW_NOT_OK(buffer->Resize(len)); + PARQUET_THROW_NOT_OK(buffer->Resize(len, false)); std::memcpy(buffer->mutable_data(), src.ptr, len); *dst = FLBA(buffer->data()); } @@ -191,7 +191,7 @@ template <> inline void TypedRowGroupStatistics<ByteArrayType>::Copy( const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) { if (dst->ptr == src.ptr) return; - PARQUET_THROW_NOT_OK(buffer->Resize(src.len)); + PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false)); std::memcpy(buffer->mutable_data(), src.ptr, src.len); *dst = ByteArray(src.len, buffer->data()); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index fc0372f..2ba4162 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -52,12 +52,23 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata, total_bytes_written_(0), closed_(false), fallback_(false) { - InitSinks(); + definition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); + repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); + definition_levels_rle_ = + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + repetition_levels_rle_ = + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + uncompressed_data_ = + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + if (pager_->has_compressor()) { + compressed_data_ = + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + } } void ColumnWriter::InitSinks() { - definition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); - repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); + definition_levels_sink_->Clear(); + repetition_levels_sink_->Clear(); } void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { @@ -72,68 +83,87 @@ void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* leve reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels); } -std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels( - const std::shared_ptr<Buffer>& buffer, int16_t max_level) { +// return the size of the encoded buffer +int64_t ColumnWriter::RleEncodeLevels( + const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level) { // TODO: This only works with due to some RLE specifics int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) + sizeof(int32_t); - std::shared_ptr<PoolBuffer> buffer_rle = AllocateBuffer(allocator_, rle_size); + + // Use Arrow::Buffer::shrink_to_fit = false + // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. + PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false)); + level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_, - buffer_rle->mutable_data() + sizeof(int32_t), buffer_rle->size() - sizeof(int32_t)); + dest_buffer->mutable_data() + sizeof(int32_t), + dest_buffer->size() - sizeof(int32_t)); int encoded = level_encoder_.Encode( - num_buffered_values_, reinterpret_cast<const int16_t*>(buffer->data())); + num_buffered_values_, reinterpret_cast<const int16_t*>(src_buffer.data())); DCHECK_EQ(encoded, num_buffered_values_); - reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len(); + reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len(); int64_t encoded_size = level_encoder_.len() + sizeof(int32_t); - DCHECK(rle_size >= encoded_size); - PARQUET_THROW_NOT_OK(buffer_rle->Resize(encoded_size)); - return std::static_pointer_cast<Buffer>(buffer_rle); + return encoded_size; } void ColumnWriter::AddDataPage() { - std::shared_ptr<Buffer> definition_levels = definition_levels_sink_->GetBuffer(); - std::shared_ptr<Buffer> repetition_levels = repetition_levels_sink_->GetBuffer(); + int64_t definition_levels_rle_size = 0; + int64_t repetition_levels_rle_size = 0; + std::shared_ptr<Buffer> values = GetValuesBuffer(); if (descr_->max_definition_level() > 0) { - definition_levels = - RleEncodeLevels(definition_levels, descr_->max_definition_level()); + definition_levels_rle_size = RleEncodeLevels(definition_levels_sink_->GetBufferRef(), + definition_levels_rle_.get(), descr_->max_definition_level()); } if (descr_->max_repetition_level() > 0) { - repetition_levels = - RleEncodeLevels(repetition_levels, descr_->max_repetition_level()); + repetition_levels_rle_size = RleEncodeLevels(repetition_levels_sink_->GetBufferRef(), + repetition_levels_rle_.get(), descr_->max_repetition_level()); } int64_t uncompressed_size = - definition_levels->size() + repetition_levels->size() + values->size(); + definition_levels_rle_size + repetition_levels_rle_size + values->size(); + + // Use Arrow::Buffer::shrink_to_fit = false + // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. + PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false)); // Concatenate data into a single buffer - std::shared_ptr<PoolBuffer> uncompressed_data = - AllocateBuffer(allocator_, uncompressed_size); - uint8_t* uncompressed_ptr = uncompressed_data->mutable_data(); - memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size()); - uncompressed_ptr += repetition_levels->size(); - memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size()); - uncompressed_ptr += definition_levels->size(); + uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data(); + memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size); + uncompressed_ptr += repetition_levels_rle_size; + memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size); + uncompressed_ptr += definition_levels_rle_size; memcpy(uncompressed_ptr, values->data(), values->size()); EncodedStatistics page_stats = GetPageStatistics(); ResetPageStatistics(); - std::shared_ptr<Buffer> compressed_data = pager_->Compress(uncompressed_data); - CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE, - Encoding::RLE, uncompressed_size, page_stats); + + std::shared_ptr<Buffer> compressed_data; + if (pager_->has_compressor()) { + pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get()); + compressed_data = compressed_data_; + } else { + compressed_data = uncompressed_data_; + } // Write the page to OutputStream eagerly if there is no dictionary or // if dictionary encoding has fallen back to PLAIN if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding + std::shared_ptr<Buffer> compressed_data_copy; + PARQUET_THROW_NOT_OK(compressed_data->Copy( + 0, compressed_data->size(), allocator_, &compressed_data_copy)); + CompressedDataPage page(compressed_data_copy, num_buffered_values_, encoding_, + Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats); data_pages_.push_back(std::move(page)); } else { // Eagerly write pages + CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, + Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats); WriteDataPage(page); } - // Re-initialize the sinks as GetBuffer made them invalid. + // Re-initialize the sinks for next Page. InitSinks(); num_buffered_values_ = 0; num_buffered_encoded_values_ = 0; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index c91f261..305c35e 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -89,8 +89,9 @@ class PARQUET_EXPORT ColumnWriter { // Write multiple repetition levels void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels); - std::shared_ptr<Buffer> RleEncodeLevels( - const std::shared_ptr<Buffer>& buffer, int16_t max_level); + // RLE encode the src_buffer into dest_buffer and return the encoded size + int64_t RleEncodeLevels( + const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level); // Serialize the buffered Data Pages void FlushBufferedDataPages(); @@ -138,6 +139,12 @@ class PARQUET_EXPORT ColumnWriter { std::unique_ptr<InMemoryOutputStream> definition_levels_sink_; std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_; + std::shared_ptr<ResizableBuffer> definition_levels_rle_; + std::shared_ptr<ResizableBuffer> repetition_levels_rle_; + + std::shared_ptr<ResizableBuffer> uncompressed_data_; + std::shared_ptr<ResizableBuffer> compressed_data_; + std::vector<CompressedDataPage> data_pages_; private: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/encoding-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h index ffcb531..dc2c336 100644 --- a/src/parquet/encoding-internal.h +++ b/src/parquet/encoding-internal.h @@ -379,7 +379,7 @@ inline void DictionaryDecoder<ByteArrayType>::SetDict( for (int i = 0; i < num_dictionary_values; ++i) { total_size += dictionary_[i].len; } - PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size)); + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); int offset = 0; uint8_t* bytes_data = byte_array_data_->mutable_data(); @@ -399,7 +399,7 @@ inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary) int fixed_len = descr_->type_length(); int total_size = num_dictionary_values * fixed_len; - PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size)); + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); uint8_t* bytes_data = byte_array_data_->mutable_data(); int offset = 0; for (int i = 0; i < num_dictionary_values; ++i) { @@ -495,7 +495,7 @@ class DictEncoder : public Encoder<DType> { AllocateBuffer(this->allocator_, EstimatedDataEncodedSize()); int result_size = WriteIndices(buffer->mutable_data(), EstimatedDataEncodedSize()); ClearIndices(); - PARQUET_THROW_NOT_OK(buffer->Resize(result_size)); + PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false)); return buffer; }; @@ -771,7 +771,7 @@ class DeltaBitPackDecoder : public Decoder<DType> { if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException(); if (!decoder_.GetVlqInt(&values_current_block_)) { ParquetException::EofException(); } if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); - PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_)); + PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_, false)); uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index bd3fbea..c05bb12 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -97,7 +97,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() { if (decompressor_ != NULL) { // Grow the uncompressed buffer if we need to. if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) { - PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len)); + PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); } decompressor_->Decompress(compressed_len, buffer, uncompressed_len, decompression_buffer_->mutable_data()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index ea8a338..ff6de48 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -70,22 +70,21 @@ void SerializedPageWriter::Close(bool has_dictionary, bool fallback) { metadata_->WriteTo(sink_); } -std::shared_ptr<Buffer> SerializedPageWriter::Compress( - const std::shared_ptr<Buffer>& buffer) { - // Fast path, no compressor available. - if (!compressor_) return buffer; +void SerializedPageWriter::Compress( + const Buffer& src_buffer, ResizableBuffer* dest_buffer) { + DCHECK(compressor_ != nullptr); // Compress the data int64_t max_compressed_size = - compressor_->MaxCompressedLen(buffer->size(), buffer->data()); + compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data()); - std::shared_ptr<PoolBuffer> compression_buffer = - AllocateBuffer(pool_, max_compressed_size); + // Use Arrow::Buffer::shrink_to_fit = false + // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. + PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false)); - int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(), - max_compressed_size, compression_buffer->mutable_data()); - PARQUET_THROW_NOT_OK(compression_buffer->Resize(compressed_size)); - return compression_buffer; + int64_t compressed_size = compressor_->Compress(src_buffer.size(), src_buffer.data(), + max_compressed_size, dest_buffer->mutable_data()); + PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false)); } int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { @@ -124,7 +123,15 @@ int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) { int64_t uncompressed_size = page.size(); - std::shared_ptr<Buffer> compressed_data = Compress(page.buffer()); + std::shared_ptr<Buffer> compressed_data = nullptr; + if (has_compressor()) { + auto buffer = std::static_pointer_cast<ResizableBuffer>( + AllocateBuffer(pool_, uncompressed_size)); + Compress(*(page.buffer().get()), buffer.get()); + compressed_data = std::static_pointer_cast<Buffer>(buffer); + } else { + compressed_data = page.buffer(); + } format::DictionaryPageHeader dict_page_header; dict_page_header.__set_num_values(page.num_values()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index 5bd00be..e038319 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -48,7 +48,9 @@ class SerializedPageWriter : public PageWriter { /** * Compress a buffer. */ - std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override; + void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override; + + bool has_compressor() override { return (compressor_ != nullptr); } void Close(bool has_dictionary, bool fallback) override; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/util/memory.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h index e2e522d..4a15430 100644 --- a/src/parquet/util/memory.h +++ b/src/parquet/util/memory.h @@ -340,6 +340,12 @@ class PARQUET_EXPORT InMemoryOutputStream : public OutputStream { virtual void Write(const uint8_t* data, int64_t length); + // Clears the stream + void Clear() { size_ = 0; } + + // Get pointer to the underlying buffer + const Buffer& GetBufferRef() const { return *buffer_; } + // Return complete stream as Buffer std::shared_ptr<Buffer> GetBuffer();
