wgtmac commented on code in PR #34526:
URL: https://github.com/apache/arrow/pull/34526#discussion_r1138915205


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2838,6 +2839,109 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+// ----------------------------------------------------------------------
+// RLE_BOOLEAN_ENCODER
+
+class RleBooleanEncoder final : public EncoderImpl, virtual public 
BooleanEncoder {
+ public:
+  explicit RleBooleanEncoder(const ColumnDescriptor* descr, 
::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::RLE, pool) {}
+
+  int64_t EstimatedDataEncodedSize() override {
+    return kRleLengthInBytes + MaxRleBufferSize();
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  void Put(const T* buffer, int num_values) override;
+  void Put(const ::arrow::Array& values) override {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException("RleBooleanEncoder expects BooleanArray, got ",
+                             values.type()->ToString());
+    }
+    const auto& boolean_array = checked_cast<const 
::arrow::BooleanArray&>(values);
+    if (values.null_count() == 0) {
+      for (int i = 0; i < boolean_array.length(); ++i) {
+        // null_count == 0, so just call Value directly is ok.
+        buffered_append_values_.push_back(boolean_array.Value(i));
+      }
+    } else {
+      PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::BooleanType>(
+          *boolean_array.data(),
+          [&](bool value) {
+            buffered_append_values_.push_back(value);
+            return Status::OK();
+          },
+          []() { return Status::OK(); }));
+    }
+  }
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+  void Put(const std::vector<bool>& src, int num_values) override;
+
+ protected:
+  template <typename SequenceType>
+  void PutImpl(const SequenceType& src, int num_values);
+
+  int MaxRleBufferSize() const noexcept {
+    return 
RlePreserveBufferSize(static_cast<int>(buffered_append_values_.size()),
+                                 kBitWidth);
+  }
+
+  constexpr static int32_t kBitWidth = 1;
+  /// 4 bytes in little-endian, which indicates the length.
+  constexpr static int32_t kRleLengthInBytes = 4;
+
+  // std::vector<bool> in C++ is tricky, because it's a bitmap.
+  // Here RleBooleanEncoder will only append values into it, and
+  // dump values into Buffer, so using it here is ok.
+  std::vector<bool> buffered_append_values_;

Review Comment:
   Will this be large? Should we use `ArrowPoolVector<bool>` instead?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -115,7 +115,6 @@ class PlainEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
   using TypedEncoder<DType>::Put;
 
   void Put(const T* buffer, int num_values) override;
-

Review Comment:
   Keep the blank line to be consistent.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -512,14 +520,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public 
DictEncoder<DType> {
   /// Returns a conservative estimate of the number of bytes needed to encode 
the buffered
   /// indices. Used to size the buffer passed to WriteIndices().
   int64_t EstimatedDataEncodedSize() override {
-    // Note: because of the way RleEncoder::CheckBufferFull() is called, we 
have to
-    // reserve
-    // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be 
used
-    // but not reserving them would cause the encoder to fail.
-    return 1 +
-           ::arrow::util::RleEncoder::MaxBufferSize(
-               bit_width(), static_cast<int>(buffered_indices_.size())) +
-           ::arrow::util::RleEncoder::MinBufferSize(bit_width());
+    return RlePreserveBufferSize(static_cast<int>(buffered_indices_.size()), 
bit_width());

Review Comment:
   Consolidate `EstimatedDataEncodedSize()` with `MaxRleBufferSize()`? It seems 
that they are the same.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2838,6 +2839,120 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+// ----------------------------------------------------------------------
+// RLE_BOOLEAN_ENCODER
+
+class RleBooleanEncoder final : public EncoderImpl, virtual public 
BooleanEncoder {
+ public:
+  explicit RleBooleanEncoder(const ColumnDescriptor* descr, 
::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::RLE, pool) {}
+
+  int64_t EstimatedDataEncodedSize() override;
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  void Put(const T* buffer, int num_values) override;
+  void Put(const ::arrow::Array& values) override {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException("RleBooleanEncoder Expected BoolTArray, got ",
+                             values.type()->ToString());
+    }
+    const auto& boolean_array = checked_cast<const 
::arrow::BooleanArray&>(values);
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::BooleanType>(
+        *boolean_array.data(),
+        [&](bool value) {
+          buffered_append_values_.push_back(value);
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+  }
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+  void Put(const std::vector<bool>& src, int num_values) override;
+
+ protected:
+  template <typename ArrowType>
+  void PutImpl(const ::arrow::Array& values) {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException(std::string() + "direct put to " + 
ArrowType::type_name() +
+                             " from " + values.type()->ToString() + " not 
supported");
+    }
+    const auto& data = *values.data();
+    PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
+              static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), 
data.offset);
+  }
+
+  template <typename SequenceType>
+  void PutImpl(const SequenceType& src, int num_values);
+
+  int MaxRleBufferSize() const noexcept {
+    // TODO(mwish): Encapsulate these rules.
+    return 1 +
+           ::arrow::util::RleEncoder::MaxBufferSize(
+               kBitWidth,
+               
static_cast<int>(static_cast<int>(buffered_append_values_.size()))) +
+           ::arrow::util::RleEncoder::MinBufferSize(kBitWidth);
+  }
+
+  constexpr static int32_t kBitWidth = 1;
+  /// 4 bytes in little-endian, which indicates the length.
+  constexpr static int32_t kRleLengthInBytes = 4;
+
+  // std::vector<bool> in C++ is tricky, because it's a bitmap.
+  // Here RleBooleanEncoder will only append values into it, and
+  // dump values into Buffer, so using it here is ok.
+  std::vector<bool> buffered_append_values_;
+};
+
+void RleBooleanEncoder::Put(const bool* src, int num_values) { PutImpl(src, 
num_values); }

Review Comment:
   We have `int RleBooleanDecoder::Decode(uint8_t* buffer, int max_values)` so 
it would be nice to have the encoder parity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to