Repository: parquet-cpp
Updated Branches:
  refs/heads/master a278998a8 -> 86ebc2393


PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8

cc @majetideepak

Author: Uwe L. Korn <[email protected]>

Closes #185 from xhochy/PARQUET-764 and squashes the following commits:

926e61f [Uwe L. Korn] Get rid of some re-allocations
e12dc4e [Uwe L. Korn] Fix multiline comment
2ef2da5 [Uwe L. Korn] PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/86ebc239
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/86ebc239
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/86ebc239

Branch: refs/heads/master
Commit: 86ebc239393e78c9888856831a3dc4504a0f6f40
Parents: a278998
Author: Uwe L. Korn <[email protected]>
Authored: Sun Nov 6 14:18:11 2016 -0500
Committer: Wes McKinney <[email protected]>
Committed: Sun Nov 6 14:18:11 2016 -0500

----------------------------------------------------------------------
 src/parquet/column/column-writer-test.cc | 20 +++++-
 src/parquet/encodings/plain-encoding.h   | 97 ++++++++++++++++-----------
 2 files changed, 75 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/86ebc239/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 2269e8f..0a20ac1 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -214,8 +214,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
 }
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, 
DoubleType,
-    BooleanType, ByteArrayType, FLBAType>
-    TestTypes;
+    BooleanType, ByteArrayType, FLBAType> TestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 
@@ -421,5 +420,22 @@ TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
   ASSERT_EQ(0, this->values_read_);
 }
 
+// PARQUET-764
+// Correct bitpacking for boolean write at non-byte boundaries
+using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
+TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
+  this->SetUpSchema(Repetition::REQUIRED);
+  auto writer = this->BuildWriter();
+  for (int i = 0; i < SMALL_SIZE; i++) {
+    bool value = (i % 2 == 0) ? true : false;
+    writer->WriteBatch(1, nullptr, nullptr, &value);
+  }
+  writer->Close();
+  this->ReadColumn();
+  for (int i = 0; i < SMALL_SIZE; i++) {
+    ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i;
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/86ebc239/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index eee3f65..b960bd2 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -181,59 +181,76 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
   explicit PlainEncoder(
       const ColumnDescriptor* descr, MemoryAllocator* allocator = 
default_allocator())
       : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator),
-        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, 
allocator)) {}
+        bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8),
+        bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator),
+        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, 
allocator)) {
+    bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), 
bits_buffer_.size()));
+  }
 
-  int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
+  int64_t EstimatedDataEncodedSize() override {
+    return values_sink_->Tell() + bit_writer_->bytes_written();
+  }
 
   std::shared_ptr<Buffer> FlushValues() override {
+    if (bits_available_ > 0) {
+      bit_writer_->Flush();
+      values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written());
+      bits_available_ = 0;
+      bit_writer_->Clear();
+      bits_available_ = bits_buffer_.size() * 8;
+    }
+
     std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
     values_sink_.reset(
         new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, 
this->allocator_));
     return buffer;
   }
 
-  void Put(const bool* src, int num_values) override {
-    Encode(src, num_values, values_sink_.get());
-  }
-
-  void Put(const std::vector<bool>& src, int num_values) {
-    Encode(src, num_values, values_sink_.get());
-  }
-
-  void Encode(const bool* src, int num_values, OutputStream* dst) {
-    int bytes_required = BitUtil::Ceil(num_values, 8);
-    OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
-
-    BitWriter bit_writer(&tmp_buffer[0], bytes_required);
-    for (int i = 0; i < num_values; ++i) {
-      bit_writer.PutValue(src[i], 1);
-    }
-    bit_writer.Flush();
-
-    // Write the result to the output stream
-    dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
+#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes)              
   \
+  void Put(input_type src, int num_values) function_attributes {               
   \
+    int bit_offset = 0;                                                        
   \
+    if (bits_available_ > 0) {                                                 
   \
+      int bits_to_write = std::min(bits_available_, num_values);               
   \
+      for (int i = 0; i < bits_to_write; i++) {                                
   \
+        bit_writer_->PutValue(src[i], 1);                                      
   \
+      }                                                                        
   \
+      bits_available_ -= bits_to_write;                                        
   \
+      bit_offset = bits_to_write;                                              
   \
+                                                                               
   \
+      if (bits_available_ == 0) {                                              
   \
+        bit_writer_->Flush();                                                  
   \
+        values_sink_->Write(bit_writer_->buffer(), 
bit_writer_->bytes_written()); \
+        bit_writer_->Clear();                                                  
   \
+      }                                                                        
   \
+    }                                                                          
   \
+                                                                               
   \
+    int bits_remaining = num_values - bit_offset;                              
   \
+    while (bit_offset < num_values) {                                          
   \
+      bits_available_ = bits_buffer_.size() * 8;                               
   \
+                                                                               
   \
+      int bits_to_write = std::min(bits_available_, bits_remaining);           
   \
+      for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {          
   \
+        bit_writer_->PutValue(src[i], 1);                                      
   \
+      }                                                                        
   \
+      bit_offset += bits_to_write;                                             
   \
+      bits_available_ -= bits_to_write;                                        
   \
+      bits_remaining -= bits_to_write;                                         
   \
+                                                                               
   \
+      if (bits_available_ == 0) {                                              
   \
+        bit_writer_->Flush();                                                  
   \
+        values_sink_->Write(bit_writer_->buffer(), 
bit_writer_->bytes_written()); \
+        bit_writer_->Clear();                                                  
   \
+      }                                                                        
   \
+    }                                                                          
   \
   }
 
-  void Encode(const std::vector<bool>& src, int num_values, OutputStream* dst) 
{
-    int bytes_required = BitUtil::Ceil(num_values, 8);
-
-    // TODO(wesm)
-    // Use a temporary buffer for now and copy, because the BitWriter is not
-    // aware of OutputStream. Later we can add some kind of Request/Flush API
-    // to OutputStream
-    OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
-
-    BitWriter bit_writer(&tmp_buffer[0], bytes_required);
-    for (int i = 0; i < num_values; ++i) {
-      bit_writer.PutValue(src[i], 1);
-    }
-    bit_writer.Flush();
-
-    // Write the result to the output stream
-    dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
-  }
+  PLAINDECODER_BOOLEAN_PUT(const bool*, override)
+  PLAINDECODER_BOOLEAN_PUT(const std::vector<bool>&, )
 
  protected:
+  int bits_available_;
+  std::unique_ptr<BitWriter> bit_writer_;
+  OwnedMutableBuffer bits_buffer_;
   std::shared_ptr<InMemoryOutputStream> values_sink_;
 };
 

Reply via email to