Repository: parquet-cpp
Updated Branches:
  refs/heads/master a278998a8 -> 86ebc2393
PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8 cc @majetideepak Author: Uwe L. Korn <[email protected]> Closes #185 from xhochy/PARQUET-764 and squashes the following commits: 926e61f [Uwe L. Korn] Get rid of some re-allocations e12dc4e [Uwe L. Korn] Fix multiline comment 2ef2da5 [Uwe L. Korn] PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8 Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/86ebc239 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/86ebc239 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/86ebc239 Branch: refs/heads/master Commit: 86ebc239393e78c9888856831a3dc4504a0f6f40 Parents: a278998 Author: Uwe L. Korn <[email protected]> Authored: Sun Nov 6 14:18:11 2016 -0500 Committer: Wes McKinney <[email protected]> Committed: Sun Nov 6 14:18:11 2016 -0500 ---------------------------------------------------------------------- src/parquet/column/column-writer-test.cc | 20 +++++- src/parquet/encodings/plain-encoding.h | 97 ++++++++++++++++----------- 2 files changed, 75 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/86ebc239/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index 2269e8f..0a20ac1 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -214,8 +214,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio } typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, - BooleanType, ByteArrayType, FLBAType> - TestTypes; + BooleanType, ByteArrayType, FLBAType> TestTypes; TYPED_TEST_CASE(TestPrimitiveWriter, 
TestTypes); @@ -421,5 +420,22 @@ TEST_F(TestNullValuesWriter, OptionalNullValueChunk) { ASSERT_EQ(0, this->values_read_); } +// PARQUET-764 +// Correct bitpacking for boolean write at non-byte boundaries +using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>; +TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) { + this->SetUpSchema(Repetition::REQUIRED); + auto writer = this->BuildWriter(); + for (int i = 0; i < SMALL_SIZE; i++) { + bool value = (i % 2 == 0) ? true : false; + writer->WriteBatch(1, nullptr, nullptr, &value); + } + writer->Close(); + this->ReadColumn(); + for (int i = 0; i < SMALL_SIZE; i++) { + ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i; + } +} + } // namespace test } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/86ebc239/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index eee3f65..b960bd2 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -181,59 +181,76 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { explicit PlainEncoder( const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator), - values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {} + bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8), + bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator), + values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) { + bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), bits_buffer_.size())); + } - int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); } + int64_t EstimatedDataEncodedSize() override { + return values_sink_->Tell() + bit_writer_->bytes_written(); + } std::shared_ptr<Buffer> FlushValues() override { + if 
(bits_available_ > 0) { + bit_writer_->Flush(); + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); + bits_available_ = 0; + bit_writer_->Clear(); + bits_available_ = bits_buffer_.size() * 8; + } + std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); values_sink_.reset( new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_)); return buffer; } - void Put(const bool* src, int num_values) override { - Encode(src, num_values, values_sink_.get()); - } - - void Put(const std::vector<bool>& src, int num_values) { - Encode(src, num_values, values_sink_.get()); - } - - void Encode(const bool* src, int num_values, OutputStream* dst) { - int bytes_required = BitUtil::Ceil(num_values, 8); - OwnedMutableBuffer tmp_buffer(bytes_required, allocator_); - - BitWriter bit_writer(&tmp_buffer[0], bytes_required); - for (int i = 0; i < num_values; ++i) { - bit_writer.PutValue(src[i], 1); - } - bit_writer.Flush(); - - // Write the result to the output stream - dst->Write(bit_writer.buffer(), bit_writer.bytes_written()); +#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes) \ + void Put(input_type src, int num_values) function_attributes { \ + int bit_offset = 0; \ + if (bits_available_ > 0) { \ + int bits_to_write = std::min(bits_available_, num_values); \ + for (int i = 0; i < bits_to_write; i++) { \ + bit_writer_->PutValue(src[i], 1); \ + } \ + bits_available_ -= bits_to_write; \ + bit_offset = bits_to_write; \ + \ + if (bits_available_ == 0) { \ + bit_writer_->Flush(); \ + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ + bit_writer_->Clear(); \ + } \ + } \ + \ + int bits_remaining = num_values - bit_offset; \ + while (bit_offset < num_values) { \ + bits_available_ = bits_buffer_.size() * 8; \ + \ + int bits_to_write = std::min(bits_available_, bits_remaining); \ + for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \ + bit_writer_->PutValue(src[i], 1); \ + } \ + bit_offset += 
bits_to_write; \ + bits_available_ -= bits_to_write; \ + bits_remaining -= bits_to_write; \ + \ + if (bits_available_ == 0) { \ + bit_writer_->Flush(); \ + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ + bit_writer_->Clear(); \ + } \ + } \ } - void Encode(const std::vector<bool>& src, int num_values, OutputStream* dst) { - int bytes_required = BitUtil::Ceil(num_values, 8); - - // TODO(wesm) - // Use a temporary buffer for now and copy, because the BitWriter is not - // aware of OutputStream. Later we can add some kind of Request/Flush API - // to OutputStream - OwnedMutableBuffer tmp_buffer(bytes_required, allocator_); - - BitWriter bit_writer(&tmp_buffer[0], bytes_required); - for (int i = 0; i < num_values; ++i) { - bit_writer.PutValue(src[i], 1); - } - bit_writer.Flush(); - - // Write the result to the output stream - dst->Write(bit_writer.buffer(), bit_writer.bytes_written()); - } + PLAINDECODER_BOOLEAN_PUT(const bool*, override) + PLAINDECODER_BOOLEAN_PUT(const std::vector<bool>&, ) protected: + int bits_available_; + std::unique_ptr<BitWriter> bit_writer_; + OwnedMutableBuffer bits_buffer_; std::shared_ptr<InMemoryOutputStream> values_sink_; };
