Repository: parquet-cpp Updated Branches: refs/heads/master ac1c1277f -> 527d53f7c
PARQUET-741: Always allocate fresh buffers while compressing. Introduces an additional allocation per compressed page in exchange for a compression path that actually works. Also extended the column-writer test to write and verify many more rows (LARGE_SIZE) per column. Author: Uwe L. Korn <[email protected]> Author: Korn, Uwe <[email protected]> Closes #173 from xhochy/PARQUET-741 and squashes the following commits: ce46816 [Uwe L. Korn] Use emplace_back to get rid of the shared_ptr 0d2f041 [Uwe L. Korn] Fix signed comparison ac1ccf0 [Uwe L. Korn] Minor style fixes 4cb03f8 [Uwe L. Korn] Fix FLBA tests a559123 [Korn, Uwe] PARQUET-741: Always allocate fresh buffers while compressing Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/527d53f7 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/527d53f7 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/527d53f7 Branch: refs/heads/master Commit: 527d53f7cdd192ff62260c25a513fe7f97b81e65 Parents: ac1c127 Author: Uwe L. Korn <[email protected]> Authored: Fri Oct 7 10:09:43 2016 +0200 Committer: Uwe L. 
Korn <[email protected]> Committed: Fri Oct 7 10:09:43 2016 +0200 ---------------------------------------------------------------------- src/parquet/column/column-writer-test.cc | 84 +++++++++++++++++++++++---- src/parquet/file/writer-internal.cc | 11 ++-- src/parquet/file/writer-internal.h | 4 -- 3 files changed, 78 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/527d53f7/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index 29139c9..9f04c06 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -25,6 +25,7 @@ #include "parquet/file/reader-internal.h" #include "parquet/file/writer-internal.h" #include "parquet/types.h" +#include "parquet/util/comparison.h" #include "parquet/util/input.h" #include "parquet/util/output.h" @@ -38,7 +39,7 @@ namespace test { // The default size used in most tests. const int SMALL_SIZE = 100; // Larger size to test some corner cases, only used in some specific cases. -const int LARGE_SIZE = 10000; +const int LARGE_SIZE = 100000; // Very large size to test dictionary fallback. 
const int VERY_LARGE_SIZE = 400000; @@ -97,26 +98,37 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> { this->SyncValuesOut(); } + void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED); + void TestRequiredWithEncoding(Encoding::type encoding) { return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false); } void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression, - bool enable_dictionary, bool enable_statistics) { - this->GenerateData(SMALL_SIZE); + bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) { + this->GenerateData(num_rows); // Test case 1: required and non-repeated, so no definition or repetition levels ColumnProperties column_properties( encoding, compression, enable_dictionary, enable_statistics); std::shared_ptr<TypedColumnWriter<TestType>> writer = - this->BuildWriter(SMALL_SIZE, column_properties); + this->BuildWriter(num_rows, column_properties); writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); // The behaviour should be independent from the number of Close() calls writer->Close(); writer->Close(); - this->ReadColumn(compression); - ASSERT_EQ(SMALL_SIZE, this->values_read_); + this->SetupValuesOut(num_rows); + this->ReadColumnFully(compression); + Compare<T> compare(this->descr_); + for (size_t i = 0; i < this->values_.size(); i++) { + if (compare(this->values_[i], this->values_out_[i]) || + compare(this->values_out_[i], this->values_[i])) { + std::cout << "Failed at " << i << std::endl; + } + ASSERT_FALSE(compare(this->values_[i], this->values_out_[i])); + ASSERT_FALSE(compare(this->values_out_[i], this->values_[i])); + } ASSERT_EQ(this->values_, this->values_out_); } @@ -154,8 +166,53 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> { std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_; std::unique_ptr<InMemoryOutputStream> sink_; std::shared_ptr<WriterProperties> 
writer_properties_; + std::vector<std::vector<uint8_t>> data_buffer_; }; +template <typename TestType> +void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) { + BuildReader(compression); + values_read_ = 0; + while (values_read_ < static_cast<int64_t>(this->values_out_.size())) { + int64_t values_read_recently = 0; + reader_->ReadBatch(this->values_out_.size() - values_read_, + definition_levels_out_.data() + values_read_, + repetition_levels_out_.data() + values_read_, + this->values_out_ptr_ + values_read_, &values_read_recently); + values_read_ += values_read_recently; + } + this->SyncValuesOut(); +} + +template <> +void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) { + BuildReader(compression); + this->data_buffer_.clear(); + + values_read_ = 0; + while (values_read_ < static_cast<int64_t>(this->values_out_.size())) { + int64_t values_read_recently = 0; + reader_->ReadBatch(this->values_out_.size() - values_read_, + definition_levels_out_.data() + values_read_, + repetition_levels_out_.data() + values_read_, + this->values_out_ptr_ + values_read_, &values_read_recently); + + // Copy contents of the pointers + std::vector<uint8_t> data(values_read_recently * this->descr_->type_length()); + uint8_t* data_ptr = data.data(); + for (int64_t i = 0; i < values_read_recently; i++) { + memcpy(data_ptr + this->descr_->type_length() * i, + this->values_out_[i + values_read_].ptr, this->descr_->type_length()); + this->values_out_[i + values_read_].ptr = + data_ptr + this->descr_->type_length() * i; + } + data_buffer_.emplace_back(std::move(data)); + + values_read_ += values_read_recently; + } + this->SyncValuesOut(); +} + typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, BooleanType, ByteArrayType, FLBAType> TestTypes; @@ -198,23 +255,28 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) { */ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) { - 
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false); + this->TestRequiredWithSettings( + Encoding::PLAIN, Compression::SNAPPY, false, false, LARGE_SIZE); } TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) { - this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false); + this->TestRequiredWithSettings( + Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE); } TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) { - this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true); + this->TestRequiredWithSettings( + Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, LARGE_SIZE); } TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) { - this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true); + this->TestRequiredWithSettings( + Encoding::PLAIN, Compression::SNAPPY, false, true, LARGE_SIZE); } TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) { - this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true); + this->TestRequiredWithSettings( + Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE); } TYPED_TEST(TestPrimitiveWriter, Optional) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/527d53f7/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 5a7c70e..c4681bd 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -41,8 +41,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type dictionary_page_offset_(0), data_page_offset_(0), total_uncompressed_size_(0), - total_compressed_size_(0), - compression_buffer_(std::make_shared<OwnedMutableBuffer>(0, allocator)) { + total_compressed_size_(0) { compressor_ = Codec::Create(codec); } @@ -72,11 
+71,11 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress( // Compress the data int64_t max_compressed_size = compressor_->MaxCompressedLen(buffer->size(), buffer->data()); - compression_buffer_->Resize(max_compressed_size); + auto compression_buffer = std::make_shared<OwnedMutableBuffer>(max_compressed_size); int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(), - max_compressed_size, compression_buffer_->mutable_data()); - compression_buffer_->Resize(compressed_size); - return compression_buffer_; + max_compressed_size, compression_buffer->mutable_data()); + compression_buffer->Resize(compressed_size); + return compression_buffer; } int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/527d53f7/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index 2095154..f1f76ab 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -46,9 +46,6 @@ class SerializedPageWriter : public PageWriter { /** * Compress a buffer. - * - * This method may return compression_buffer_ and thus the resulting memory - * is only valid until the next call to Compress(). */ std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override; @@ -65,7 +62,6 @@ class SerializedPageWriter : public PageWriter { // Compression codec to use. std::unique_ptr<Codec> compressor_; - std::shared_ptr<OwnedMutableBuffer> compression_buffer_; }; // RowGroupWriter::Contents implementation for the Parquet file specification
