This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 557a7c6  ARROW-13169: [C++][Compute] Fix array offset support in 
GrouperFastImpl
557a7c6 is described below

commit 557a7c63d49aa04508564517c77c71f3657d19ff
Author: Michal Nowakiewicz <[email protected]>
AuthorDate: Fri Jul 9 11:37:32 2021 -0400

    ARROW-13169: [C++][Compute] Fix array offset support in GrouperFastImpl
    
    GrouperFastImpl was ignoring offset values in input key arrays, treating them as if they were always zero.
    This change brings support for arbitrary offsets in input.
    
    Closes #10688 from michalursa/ARROW-13169-array-offset-in-grouper
    
    Lead-authored-by: Michal Nowakiewicz <[email protected]>
    Co-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: michalursa <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/array/data.cc                        |  22 +-
 cpp/src/arrow/array/data.h                         |  30 +--
 cpp/src/arrow/compute/exec/key_encode.cc           | 292 +++++++++++----------
 cpp/src/arrow/compute/exec/key_encode.h            |  14 +-
 cpp/src/arrow/compute/exec/util.cc                 | 118 ++++++---
 cpp/src/arrow/compute/exec/util.h                  |  16 +-
 cpp/src/arrow/compute/kernels/hash_aggregate.cc    |   9 +-
 .../arrow/compute/kernels/hash_aggregate_test.cc   |  42 +++
 8 files changed, 329 insertions(+), 214 deletions(-)

diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index e397a75..5a21447 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -56,41 +56,39 @@ static inline void AdjustNonNullable(Type::type type_id, 
int64_t length,
   }
 }
 
-std::shared_ptr<ArrayData> ArrayData::Make(const std::shared_ptr<DataType>& 
type,
-                                           int64_t length,
+std::shared_ptr<ArrayData> ArrayData::Make(std::shared_ptr<DataType> type, 
int64_t length,
                                            
std::vector<std::shared_ptr<Buffer>> buffers,
                                            int64_t null_count, int64_t offset) 
{
   AdjustNonNullable(type->id(), length, &buffers, &null_count);
-  return std::make_shared<ArrayData>(type, length, std::move(buffers), 
null_count,
-                                     offset);
+  return std::make_shared<ArrayData>(std::move(type), length, 
std::move(buffers),
+                                     null_count, offset);
 }
 
 std::shared_ptr<ArrayData> ArrayData::Make(
-    const std::shared_ptr<DataType>& type, int64_t length,
+    std::shared_ptr<DataType> type, int64_t length,
     std::vector<std::shared_ptr<Buffer>> buffers,
     std::vector<std::shared_ptr<ArrayData>> child_data, int64_t null_count,
     int64_t offset) {
   AdjustNonNullable(type->id(), length, &buffers, &null_count);
-  return std::make_shared<ArrayData>(type, length, std::move(buffers),
+  return std::make_shared<ArrayData>(std::move(type), length, 
std::move(buffers),
                                      std::move(child_data), null_count, 
offset);
 }
 
 std::shared_ptr<ArrayData> ArrayData::Make(
-    const std::shared_ptr<DataType>& type, int64_t length,
+    std::shared_ptr<DataType> type, int64_t length,
     std::vector<std::shared_ptr<Buffer>> buffers,
     std::vector<std::shared_ptr<ArrayData>> child_data,
     std::shared_ptr<ArrayData> dictionary, int64_t null_count, int64_t offset) 
{
   AdjustNonNullable(type->id(), length, &buffers, &null_count);
-  auto data = std::make_shared<ArrayData>(type, length, std::move(buffers),
+  auto data = std::make_shared<ArrayData>(std::move(type), length, 
std::move(buffers),
                                           std::move(child_data), null_count, 
offset);
   data->dictionary = std::move(dictionary);
   return data;
 }
 
-std::shared_ptr<ArrayData> ArrayData::Make(const std::shared_ptr<DataType>& 
type,
-                                           int64_t length, int64_t null_count,
-                                           int64_t offset) {
-  return std::make_shared<ArrayData>(type, length, null_count, offset);
+std::shared_ptr<ArrayData> ArrayData::Make(std::shared_ptr<DataType> type, 
int64_t length,
+                                           int64_t null_count, int64_t offset) 
{
+  return std::make_shared<ArrayData>(std::move(type), length, null_count, 
offset);
 }
 
 std::shared_ptr<ArrayData> ArrayData::Slice(int64_t off, int64_t len) const {
diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h
index 02a4994..418d09d 100644
--- a/cpp/src/arrow/array/data.h
+++ b/cpp/src/arrow/array/data.h
@@ -71,49 +71,47 @@ constexpr int64_t kUnknownNullCount = -1;
 /// input array and replace them with newly-allocated data, changing the output
 /// data type as well.
 struct ARROW_EXPORT ArrayData {
-  ArrayData() : length(0), null_count(0), offset(0) {}
+  ArrayData() = default;
 
-  ArrayData(const std::shared_ptr<DataType>& type, int64_t length,
+  ArrayData(std::shared_ptr<DataType> type, int64_t length,
             int64_t null_count = kUnknownNullCount, int64_t offset = 0)
-      : type(type), length(length), null_count(null_count), offset(offset) {}
+      : type(std::move(type)), length(length), null_count(null_count), 
offset(offset) {}
 
-  ArrayData(const std::shared_ptr<DataType>& type, int64_t length,
+  ArrayData(std::shared_ptr<DataType> type, int64_t length,
             std::vector<std::shared_ptr<Buffer>> buffers,
             int64_t null_count = kUnknownNullCount, int64_t offset = 0)
-      : ArrayData(type, length, null_count, offset) {
+      : ArrayData(std::move(type), length, null_count, offset) {
     this->buffers = std::move(buffers);
   }
 
-  ArrayData(const std::shared_ptr<DataType>& type, int64_t length,
+  ArrayData(std::shared_ptr<DataType> type, int64_t length,
             std::vector<std::shared_ptr<Buffer>> buffers,
             std::vector<std::shared_ptr<ArrayData>> child_data,
             int64_t null_count = kUnknownNullCount, int64_t offset = 0)
-      : ArrayData(type, length, null_count, offset) {
+      : ArrayData(std::move(type), length, null_count, offset) {
     this->buffers = std::move(buffers);
     this->child_data = std::move(child_data);
   }
 
-  static std::shared_ptr<ArrayData> Make(const std::shared_ptr<DataType>& type,
-                                         int64_t length,
+  static std::shared_ptr<ArrayData> Make(std::shared_ptr<DataType> type, 
int64_t length,
                                          std::vector<std::shared_ptr<Buffer>> 
buffers,
                                          int64_t null_count = 
kUnknownNullCount,
                                          int64_t offset = 0);
 
   static std::shared_ptr<ArrayData> Make(
-      const std::shared_ptr<DataType>& type, int64_t length,
+      std::shared_ptr<DataType> type, int64_t length,
       std::vector<std::shared_ptr<Buffer>> buffers,
       std::vector<std::shared_ptr<ArrayData>> child_data,
       int64_t null_count = kUnknownNullCount, int64_t offset = 0);
 
   static std::shared_ptr<ArrayData> Make(
-      const std::shared_ptr<DataType>& type, int64_t length,
+      std::shared_ptr<DataType> type, int64_t length,
       std::vector<std::shared_ptr<Buffer>> buffers,
       std::vector<std::shared_ptr<ArrayData>> child_data,
       std::shared_ptr<ArrayData> dictionary, int64_t null_count = 
kUnknownNullCount,
       int64_t offset = 0);
 
-  static std::shared_ptr<ArrayData> Make(const std::shared_ptr<DataType>& type,
-                                         int64_t length,
+  static std::shared_ptr<ArrayData> Make(std::shared_ptr<DataType> type, 
int64_t length,
                                          int64_t null_count = 
kUnknownNullCount,
                                          int64_t offset = 0);
 
@@ -232,11 +230,11 @@ struct ARROW_EXPORT ArrayData {
   }
 
   std::shared_ptr<DataType> type;
-  int64_t length;
-  mutable std::atomic<int64_t> null_count;
+  int64_t length = 0;
+  mutable std::atomic<int64_t> null_count{0};
   // The logical start point into the physical buffers (in values, not bytes).
   // Note that, for child data, this must be *added* to the child data's own 
offset.
-  int64_t offset;
+  int64_t offset = 0;
   std::vector<std::shared_ptr<Buffer>> buffers;
   std::vector<std::shared_ptr<ArrayData>> child_data;
 
diff --git a/cpp/src/arrow/compute/exec/key_encode.cc 
b/cpp/src/arrow/compute/exec/key_encode.cc
index 0c5f27c..de79558 100644
--- a/cpp/src/arrow/compute/exec/key_encode.cc
+++ b/cpp/src/arrow/compute/exec/key_encode.cc
@@ -35,7 +35,7 @@ Status KeyEncoder::KeyRowArray::Init(MemoryPool* pool, const 
KeyRowMetadata& met
   pool_ = pool;
   metadata_ = metadata;
 
-  ARROW_DCHECK(!null_masks_ && !offsets_ && !rows_);
+  DCHECK(!null_masks_ && !offsets_ && !rows_);
 
   constexpr int64_t rows_capacity = 8;
   constexpr int64_t bytes_capacity = 1024;
@@ -178,15 +178,14 @@ Status 
KeyEncoder::KeyRowArray::ResizeOptionalVaryingLengthBuffer(
 Status KeyEncoder::KeyRowArray::AppendSelectionFrom(const KeyRowArray& from,
                                                     uint32_t 
num_rows_to_append,
                                                     const uint16_t* 
source_row_ids) {
-  ARROW_DCHECK(metadata_.is_compatible(from.metadata()));
+  DCHECK(metadata_.is_compatible(from.metadata()));
 
   RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
 
   if (!metadata_.is_fixed_length) {
     // Varying-length rows
-    const uint32_t* from_offsets =
-        reinterpret_cast<const uint32_t*>(from.offsets_->data());
-    uint32_t* to_offsets = 
reinterpret_cast<uint32_t*>(offsets_->mutable_data());
+    auto from_offsets = reinterpret_cast<const 
uint32_t*>(from.offsets_->data());
+    auto to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
     uint32_t total_length = to_offsets[num_rows_];
     uint32_t total_length_to_append = 0;
     for (uint32_t i = 0; i < num_rows_to_append; ++i) {
@@ -203,9 +202,8 @@ Status KeyEncoder::KeyRowArray::AppendSelectionFrom(const 
KeyRowArray& from,
     for (uint32_t i = 0; i < num_rows_to_append; ++i) {
       uint16_t row_id = source_row_ids[i];
       uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
-      const uint64_t* src64 =
-          reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
-      uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
+      auto src64 = reinterpret_cast<const uint64_t*>(src + 
from_offsets[row_id]);
+      auto dst64 = reinterpret_cast<uint64_t*>(dst);
       for (uint32_t j = 0; j < (length + 7) / 8; ++j) {
         dst64[j] = src64[j];
       }
@@ -218,8 +216,8 @@ Status KeyEncoder::KeyRowArray::AppendSelectionFrom(const 
KeyRowArray& from,
     for (uint32_t i = 0; i < num_rows_to_append; ++i) {
       uint16_t row_id = source_row_ids[i];
       uint32_t length = metadata_.fixed_length;
-      const uint64_t* src64 = reinterpret_cast<const uint64_t*>(src + length * 
row_id);
-      uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
+      auto src64 = reinterpret_cast<const uint64_t*>(src + length * row_id);
+      auto dst64 = reinterpret_cast<uint64_t*>(dst);
       for (uint32_t j = 0; j < (length + 7) / 8; ++j) {
         dst64[j] = src64[j];
       }
@@ -285,51 +283,67 @@ KeyEncoder::KeyColumnArray::KeyColumnArray(const 
KeyColumnMetadata& metadata,
   }
   buffers_[buffer_id_to_replace] = right.buffers_[buffer_id_to_replace];
   mutable_buffers_[buffer_id_to_replace] = 
right.mutable_buffers_[buffer_id_to_replace];
+  bit_offset_[0] = left.bit_offset_[0];
+  bit_offset_[1] = left.bit_offset_[1];
+  if (buffer_id_to_replace < max_buffers_ - 1) {
+    bit_offset_[buffer_id_to_replace] = 
right.bit_offset_[buffer_id_to_replace];
+  }
 }
 
 KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
                                            int64_t length, const uint8_t* 
buffer0,
-                                           const uint8_t* buffer1,
-                                           const uint8_t* buffer2) {
+                                           const uint8_t* buffer1, const 
uint8_t* buffer2,
+                                           int bit_offset0, int bit_offset1) {
   metadata_ = metadata;
   length_ = length;
   buffers_[0] = buffer0;
   buffers_[1] = buffer1;
   buffers_[2] = buffer2;
   mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
 }
 
 KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
                                            int64_t length, uint8_t* buffer0,
-                                           uint8_t* buffer1, uint8_t* buffer2) 
{
+                                           uint8_t* buffer1, uint8_t* buffer2,
+                                           int bit_offset0, int bit_offset1) {
   metadata_ = metadata;
   length_ = length;
   buffers_[0] = mutable_buffers_[0] = buffer0;
   buffers_[1] = mutable_buffers_[1] = buffer1;
   buffers_[2] = mutable_buffers_[2] = buffer2;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
 }
 
 KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnArray& from, int64_t 
start,
                                            int64_t length) {
-  ARROW_DCHECK((start % 8) == 0);
   metadata_ = from.metadata_;
   length_ = length;
   uint32_t fixed_size =
       !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
 
-  buffers_[0] = from.buffers_[0] ? from.buffers_[0] + start / 8 : nullptr;
-  mutable_buffers_[0] =
-      from.mutable_buffers_[0] ? from.mutable_buffers_[0] + start / 8 : 
nullptr;
+  buffers_[0] =
+      from.buffers_[0] ? from.buffers_[0] + (from.bit_offset_[0] + start) / 8 
: nullptr;
+  mutable_buffers_[0] = from.mutable_buffers_[0]
+                            ? from.mutable_buffers_[0] + (from.bit_offset_[0] 
+ start) / 8
+                            : nullptr;
+  bit_offset_[0] = (from.bit_offset_[0] + start) % 8;
 
   if (fixed_size == 0) {
-    buffers_[1] = from.buffers_[1] ? from.buffers_[1] + start / 8 : nullptr;
-    mutable_buffers_[1] =
-        from.mutable_buffers_[1] ? from.mutable_buffers_[1] + start / 8 : 
nullptr;
+    buffers_[1] =
+        from.buffers_[1] ? from.buffers_[1] + (from.bit_offset_[1] + start) / 
8 : nullptr;
+    mutable_buffers_[1] = from.mutable_buffers_[1] ? from.mutable_buffers_[1] +
+                                                         (from.bit_offset_[1] 
+ start) / 8
+                                                   : nullptr;
+    bit_offset_[1] = (from.bit_offset_[1] + start) % 8;
   } else {
     buffers_[1] = from.buffers_[1] ? from.buffers_[1] + start * fixed_size : 
nullptr;
     mutable_buffers_[1] = from.mutable_buffers_[1]
                               ? from.mutable_buffers_[1] + start * fixed_size
                               : nullptr;
+    bit_offset_[1] = 0;
   }
 
   buffers_[2] = from.buffers_[2];
@@ -339,8 +353,8 @@ KeyEncoder::KeyColumnArray::KeyColumnArray(const 
KeyColumnArray& from, int64_t s
 KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(
     const KeyColumnArray& column, const KeyColumnArray& temp) {
   // Make sure that the temp buffer is large enough
-  ARROW_DCHECK(temp.length() >= column.length() && 
temp.metadata().is_fixed_length &&
-               temp.metadata().fixed_length >= sizeof(uint8_t));
+  DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length &&
+         temp.metadata().fixed_length >= sizeof(uint8_t));
   KeyColumnMetadata metadata;
   metadata.is_fixed_length = true;
   metadata.fixed_length = sizeof(uint8_t);
@@ -353,33 +367,31 @@ void KeyEncoder::TransformBoolean::PreEncode(const 
KeyColumnArray& input,
                                              KeyColumnArray* output,
                                              KeyEncoderContext* ctx) {
   // Make sure that metadata and lengths are compatible.
-  ARROW_DCHECK(output->metadata().is_fixed_length == 
input.metadata().is_fixed_length);
-  ARROW_DCHECK(output->metadata().fixed_length == 1 &&
-               input.metadata().fixed_length == 0);
-  ARROW_DCHECK(output->length() == input.length());
+  DCHECK(output->metadata().is_fixed_length == 
input.metadata().is_fixed_length);
+  DCHECK(output->metadata().fixed_length == 1 && input.metadata().fixed_length 
== 0);
+  DCHECK(output->length() == input.length());
   constexpr int buffer_index = 1;
-  ARROW_DCHECK(input.data(buffer_index) != nullptr);
-  ARROW_DCHECK(output->mutable_data(buffer_index) != nullptr);
-  util::BitUtil::bits_to_bytes(ctx->hardware_flags, 
static_cast<int>(input.length()),
-                               input.data(buffer_index),
-                               output->mutable_data(buffer_index));
+  DCHECK(input.data(buffer_index) != nullptr);
+  DCHECK(output->mutable_data(buffer_index) != nullptr);
+  util::BitUtil::bits_to_bytes(
+      ctx->hardware_flags, static_cast<int>(input.length()), 
input.data(buffer_index),
+      output->mutable_data(buffer_index), input.bit_offset(buffer_index));
 }
 
 void KeyEncoder::TransformBoolean::PostDecode(const KeyColumnArray& input,
                                               KeyColumnArray* output,
                                               KeyEncoderContext* ctx) {
   // Make sure that metadata and lengths are compatible.
-  ARROW_DCHECK(output->metadata().is_fixed_length == 
input.metadata().is_fixed_length);
-  ARROW_DCHECK(output->metadata().fixed_length == 0 &&
-               input.metadata().fixed_length == 1);
-  ARROW_DCHECK(output->length() == input.length());
+  DCHECK(output->metadata().is_fixed_length == 
input.metadata().is_fixed_length);
+  DCHECK(output->metadata().fixed_length == 0 && input.metadata().fixed_length 
== 1);
+  DCHECK(output->length() == input.length());
   constexpr int buffer_index = 1;
-  ARROW_DCHECK(input.data(buffer_index) != nullptr);
-  ARROW_DCHECK(output->mutable_data(buffer_index) != nullptr);
+  DCHECK(input.data(buffer_index) != nullptr);
+  DCHECK(output->mutable_data(buffer_index) != nullptr);
 
-  util::BitUtil::bytes_to_bits(ctx->hardware_flags, 
static_cast<int>(input.length()),
-                               input.data(buffer_index),
-                               output->mutable_data(buffer_index));
+  util::BitUtil::bytes_to_bits(
+      ctx->hardware_flags, static_cast<int>(input.length()), 
input.data(buffer_index),
+      output->mutable_data(buffer_index), output->bit_offset(buffer_index));
 }
 
 bool KeyEncoder::EncoderInteger::IsBoolean(const KeyColumnMetadata& metadata) {
@@ -425,12 +437,12 @@ void KeyEncoder::EncoderInteger::Encode(uint32_t 
offset_within_row, KeyRowArray*
     col_prep = col;
   }
 
-  uint32_t num_rows = static_cast<uint32_t>(col.length());
+  const auto num_rows = static_cast<uint32_t>(col.length());
 
   // When we have a single fixed length column we can just do memcpy
   if (rows->metadata().is_fixed_length &&
       rows->metadata().fixed_length == col.metadata().fixed_length) {
-    ARROW_DCHECK(offset_within_row == 0);
+    DCHECK_EQ(offset_within_row, 0);
     uint32_t row_size = col.metadata().fixed_length;
     memcpy(rows->mutable_data(1), col.data(1), num_rows * row_size);
   } else if (rows->metadata().is_fixed_length) {
@@ -462,7 +474,7 @@ void KeyEncoder::EncoderInteger::Encode(uint32_t 
offset_within_row, KeyRowArray*
         }
         break;
       default:
-        ARROW_DCHECK(false);
+        DCHECK(false);
     }
   } else {
     const uint32_t* row_offsets = rows->offsets();
@@ -493,7 +505,7 @@ void KeyEncoder::EncoderInteger::Encode(uint32_t 
offset_within_row, KeyRowArray*
         }
         break;
       default:
-        ARROW_DCHECK(false);
+        DCHECK(false);
     }
   }
 }
@@ -512,7 +524,7 @@ void KeyEncoder::EncoderInteger::Decode(uint32_t start_row, 
uint32_t num_rows,
   // When we have a single fixed length column we can just do memcpy
   if (rows.metadata().is_fixed_length &&
       col_prep.metadata().fixed_length == rows.metadata().fixed_length) {
-    ARROW_DCHECK(offset_within_row == 0);
+    DCHECK_EQ(offset_within_row, 0);
     uint32_t row_size = rows.metadata().fixed_length;
     memcpy(col_prep.mutable_data(1), rows.data(1) + start_row * row_size,
            num_rows * row_size);
@@ -546,7 +558,7 @@ void KeyEncoder::EncoderInteger::Decode(uint32_t start_row, 
uint32_t num_rows,
         }
         break;
       default:
-        ARROW_DCHECK(false);
+        DCHECK(false);
     }
   } else {
     const uint32_t* row_offsets = rows.offsets() + start_row;
@@ -578,7 +590,7 @@ void KeyEncoder::EncoderInteger::Decode(uint32_t start_row, 
uint32_t num_rows,
         }
         break;
       default:
-        ARROW_DCHECK(false);
+        DCHECK(false);
     }
   }
 
@@ -625,9 +637,9 @@ void KeyEncoder::EncoderBinary::Encode(uint32_t 
offset_within_row, KeyRowArray*
 #endif
   }
 
-  ARROW_DCHECK(temp->metadata().is_fixed_length);
-  ARROW_DCHECK(temp->length() * temp->metadata().fixed_length >=
-               col.length() * static_cast<int64_t>(sizeof(uint16_t)));
+  DCHECK(temp->metadata().is_fixed_length);
+  DCHECK(temp->length() * temp->metadata().fixed_length >=
+         col.length() * static_cast<int64_t>(sizeof(uint16_t)));
 
   KeyColumnArray temp16bit(KeyColumnMetadata(true, sizeof(uint16_t)), 
col.length(),
                            nullptr, temp->mutable_data(1), nullptr);
@@ -677,8 +689,8 @@ void KeyEncoder::EncoderBinary::EncodeImp(uint32_t 
offset_within_row, KeyRowArra
   EncodeDecodeHelper<is_row_fixed_length, true>(
       0, static_cast<uint32_t>(col.length()), offset_within_row, rows, rows, 
&col,
       nullptr, [](uint8_t* dst, const uint8_t* src, int64_t length) {
-        uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
-        const uint64_t* src64 = reinterpret_cast<const uint64_t*>(src);
+        auto dst64 = reinterpret_cast<uint64_t*>(dst);
+        auto src64 = reinterpret_cast<const uint64_t*>(src);
         uint32_t istripe;
         for (istripe = 0; istripe < length / 8; ++istripe) {
           dst64[istripe] = util::SafeLoad(src64 + istripe);
@@ -699,8 +711,8 @@ void KeyEncoder::EncoderBinary::DecodeImp(uint32_t 
start_row, uint32_t num_rows,
       start_row, num_rows, offset_within_row, &rows, nullptr, col, col,
       [](uint8_t* dst, const uint8_t* src, int64_t length) {
         for (uint32_t istripe = 0; istripe < (length + 7) / 8; ++istripe) {
-          uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
-          const uint64_t* src64 = reinterpret_cast<const uint64_t*>(src);
+          auto dst64 = reinterpret_cast<uint64_t*>(dst);
+          auto src64 = reinterpret_cast<const uint64_t*>(src);
           util::SafeStore(dst64 + istripe, src64[istripe]);
         }
       });
@@ -709,8 +721,8 @@ void KeyEncoder::EncoderBinary::DecodeImp(uint32_t 
start_row, uint32_t num_rows,
 void KeyEncoder::EncoderBinary::ColumnMemsetNulls(
     uint32_t offset_within_row, KeyRowArray* rows, const KeyColumnArray& col,
     KeyEncoderContext* ctx, KeyColumnArray* temp_vector_16bit, uint8_t 
byte_value) {
-  typedef void (*ColumnMemsetNullsImp_t)(uint32_t, KeyRowArray*, const 
KeyColumnArray&,
-                                         KeyEncoderContext*, KeyColumnArray*, 
uint8_t);
+  using ColumnMemsetNullsImp_t = void (*)(uint32_t, KeyRowArray*, const 
KeyColumnArray&,
+                                          KeyEncoderContext*, KeyColumnArray*, 
uint8_t);
   static const ColumnMemsetNullsImp_t ColumnMemsetNullsImp_fn[] = {
       ColumnMemsetNullsImp<false, 1>,  ColumnMemsetNullsImp<false, 2>,
       ColumnMemsetNullsImp<false, 4>,  ColumnMemsetNullsImp<false, 8>,
@@ -735,18 +747,19 @@ void KeyEncoder::EncoderBinary::ColumnMemsetNullsImp(
     return;
   }
 
-  uint32_t num_rows = static_cast<uint32_t>(col.length());
+  const auto num_rows = static_cast<uint32_t>(col.length());
 
   // Temp vector needs space for the required number of rows
-  ARROW_DCHECK(temp_vector_16bit->length() >= num_rows);
-  ARROW_DCHECK(temp_vector_16bit->metadata().is_fixed_length &&
-               temp_vector_16bit->metadata().fixed_length == sizeof(uint16_t));
-  uint16_t* temp_vector = 
reinterpret_cast<uint16_t*>(temp_vector_16bit->mutable_data(1));
+  DCHECK(temp_vector_16bit->length() >= num_rows);
+  DCHECK(temp_vector_16bit->metadata().is_fixed_length &&
+         temp_vector_16bit->metadata().fixed_length == sizeof(uint16_t));
+  auto temp_vector = 
reinterpret_cast<uint16_t*>(temp_vector_16bit->mutable_data(1));
 
   // Bit vector to index vector of null positions
   int num_selected;
   util::BitUtil::bits_to_indexes(0, ctx->hardware_flags, 
static_cast<int>(col.length()),
-                                 col.data(0), &num_selected, temp_vector);
+                                 col.data(0), &num_selected, temp_vector,
+                                 col.bit_offset(0));
 
   for (int i = 0; i < num_selected; ++i) {
     uint32_t row_id = temp_vector[i];
@@ -793,7 +806,7 @@ void KeyEncoder::EncoderBinaryPair::Encode(uint32_t 
offset_within_row, KeyRowArr
                                            const KeyColumnArray& col2,
                                            KeyEncoderContext* ctx, 
KeyColumnArray* temp1,
                                            KeyColumnArray* temp2) {
-  ARROW_DCHECK(CanProcessPair(col1.metadata(), col2.metadata()));
+  DCHECK(CanProcessPair(col1.metadata(), col2.metadata()));
 
   KeyColumnArray col_prep[2];
   if (EncoderInteger::UsesTransform(col1)) {
@@ -818,7 +831,7 @@ void KeyEncoder::EncoderBinaryPair::Encode(uint32_t 
offset_within_row, KeyRowArr
 
   bool is_row_fixed_length = rows->metadata().is_fixed_length;
 
-  uint32_t num_rows = static_cast<uint32_t>(col1.length());
+  const auto num_rows = static_cast<uint32_t>(col1.length());
   uint32_t num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
   if (ctx->has_avx2() && col_width1 == col_width2) {
@@ -862,7 +875,7 @@ void KeyEncoder::EncoderBinaryPair::EncodeImp(uint32_t 
num_rows_to_skip,
   const uint8_t* src_A = col1.data(1);
   const uint8_t* src_B = col2.data(1);
 
-  uint32_t num_rows = static_cast<uint32_t>(col1.length());
+  const auto num_rows = static_cast<uint32_t>(col1.length());
 
   uint32_t fixed_length = rows->metadata().fixed_length;
   const uint32_t* offsets;
@@ -901,7 +914,7 @@ void KeyEncoder::EncoderBinaryPair::Decode(uint32_t 
start_row, uint32_t num_rows
                                            const KeyRowArray& rows, 
KeyColumnArray* col1,
                                            KeyColumnArray* col2, 
KeyEncoderContext* ctx,
                                            KeyColumnArray* temp1, 
KeyColumnArray* temp2) {
-  ARROW_DCHECK(CanProcessPair(col1->metadata(), col2->metadata()));
+  DCHECK(CanProcessPair(col1->metadata(), col2->metadata()));
 
   KeyColumnArray col_prep[2];
   if (EncoderInteger::UsesTransform(*col1)) {
@@ -933,8 +946,8 @@ void KeyEncoder::EncoderBinaryPair::Decode(uint32_t 
start_row, uint32_t num_rows
   }
 #endif
   if (num_processed < num_rows) {
-    typedef void (*DecodeImp_t)(uint32_t, uint32_t, uint32_t, uint32_t,
-                                const KeyRowArray&, KeyColumnArray*, 
KeyColumnArray*);
+    using DecodeImp_t = void (*)(uint32_t, uint32_t, uint32_t, uint32_t,
+                                 const KeyRowArray&, KeyColumnArray*, 
KeyColumnArray*);
     static const DecodeImp_t DecodeImp_fn[] = {
         DecodeImp<false, uint8_t, uint8_t>,   DecodeImp<false, uint16_t, 
uint8_t>,
         DecodeImp<false, uint32_t, uint8_t>,  DecodeImp<false, uint64_t, 
uint8_t>,
@@ -973,8 +986,8 @@ void KeyEncoder::EncoderBinaryPair::DecodeImp(uint32_t 
num_rows_to_skip,
                                               const KeyRowArray& rows,
                                               KeyColumnArray* col1,
                                               KeyColumnArray* col2) {
-  ARROW_DCHECK(rows.length() >= start_row + num_rows);
-  ARROW_DCHECK(col1->length() == num_rows && col2->length() == num_rows);
+  DCHECK(rows.length() >= start_row + num_rows);
+  DCHECK(col1->length() == num_rows && col2->length() == num_rows);
 
   uint8_t* dst_A = col1->mutable_data(1);
   uint8_t* dst_B = col2->mutable_data(1);
@@ -1014,25 +1027,31 @@ void KeyEncoder::EncoderBinaryPair::DecodeImp(uint32_t 
num_rows_to_skip,
 void KeyEncoder::EncoderOffsets::Encode(KeyRowArray* rows,
                                         const std::vector<KeyColumnArray>& 
varbinary_cols,
                                         KeyEncoderContext* ctx) {
-  ARROW_DCHECK(!varbinary_cols.empty());
+  DCHECK(!varbinary_cols.empty());
 
   // Rows and columns must all be varying-length
-  ARROW_DCHECK(!rows->metadata().is_fixed_length);
-  for (size_t col = 0; col < varbinary_cols.size(); ++col) {
-    ARROW_DCHECK(!varbinary_cols[col].metadata().is_fixed_length);
+  DCHECK(!rows->metadata().is_fixed_length);
+  for (const auto& col : varbinary_cols) {
+    DCHECK(!col.metadata().is_fixed_length);
   }
 
-  uint32_t num_rows = static_cast<uint32_t>(varbinary_cols[0].length());
+  const auto num_rows = static_cast<uint32_t>(varbinary_cols[0].length());
+
+  uint32_t num_processed = 0;
+#if defined(ARROW_HAVE_AVX2)
+  // Whether any of the columns has non-zero starting bit offset for non-nulls 
bit vector
+  bool has_bit_offset = false;
 
   // The space in columns must be exactly equal to a space for offsets in rows
-  ARROW_DCHECK(rows->length() == num_rows);
-  for (size_t col = 0; col < varbinary_cols.size(); ++col) {
-    ARROW_DCHECK(varbinary_cols[col].length() == num_rows);
+  DCHECK(rows->length() == num_rows);
+  for (const auto& col : varbinary_cols) {
+    DCHECK(col.length() == num_rows);
+    if (col.bit_offset(0) != 0) {
+      has_bit_offset = true;
+    }
   }
 
-  uint32_t num_processed = 0;
-#if defined(ARROW_HAVE_AVX2)
-  if (ctx->has_avx2()) {
+  if (ctx->has_avx2() && !has_bit_offset) {
     // Create a temp vector sized based on the number of columns
     auto temp_buffer_holder = util::TempVectorHolder<uint32_t>(
         ctx->stack, static_cast<uint32_t>(varbinary_cols.size()) * 8);
@@ -1051,14 +1070,14 @@ void KeyEncoder::EncoderOffsets::Encode(KeyRowArray* 
rows,
 void KeyEncoder::EncoderOffsets::EncodeImp(
     uint32_t num_rows_already_processed, KeyRowArray* rows,
     const std::vector<KeyColumnArray>& varbinary_cols) {
-  ARROW_DCHECK(varbinary_cols.size() > 0);
+  DCHECK_GT(varbinary_cols.size(), 0);
 
   int row_alignment = rows->metadata().row_alignment;
   int string_alignment = rows->metadata().string_alignment;
 
   uint32_t* row_offsets = rows->mutable_offsets();
   uint8_t* row_values = rows->mutable_data(2);
-  uint32_t num_rows = static_cast<uint32_t>(varbinary_cols[0].length());
+  const auto num_rows = static_cast<uint32_t>(varbinary_cols[0].length());
 
   if (num_rows_already_processed == 0) {
     row_offsets[0] = 0;
@@ -1079,8 +1098,10 @@ void KeyEncoder::EncoderOffsets::EncodeImp(
       const uint32_t* col_offsets = varbinary_cols[col].offsets();
       uint32_t col_length = col_offsets[i + 1] - col_offsets[i];
 
+      const int bit_offset = varbinary_cols[col].bit_offset(0);
+
       const uint8_t* non_nulls = varbinary_cols[col].data(0);
-      if (non_nulls && BitUtil::GetBit(non_nulls, i) == 0) {
+      if (non_nulls && BitUtil::GetBit(non_nulls, bit_offset + i) == 0) {
         col_length = 0;
       }
 
@@ -1102,19 +1123,16 @@ void KeyEncoder::EncoderOffsets::Decode(
     uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
     std::vector<KeyColumnArray>* varbinary_cols,
     const std::vector<uint32_t>& varbinary_cols_base_offset, 
KeyEncoderContext* ctx) {
-  ARROW_DCHECK(!varbinary_cols->empty());
-  ARROW_DCHECK(varbinary_cols->size() == varbinary_cols_base_offset.size());
+  DCHECK(!varbinary_cols->empty());
+  DCHECK(varbinary_cols->size() == varbinary_cols_base_offset.size());
 
-  // Rows and columns must all be varying-length
-  ARROW_DCHECK(!rows.metadata().is_fixed_length);
-  for (size_t col = 0; col < varbinary_cols->size(); ++col) {
-    ARROW_DCHECK(!(*varbinary_cols)[col].metadata().is_fixed_length);
-  }
-
-  // The space in columns must be exactly equal to a subset of rows selected
-  ARROW_DCHECK(rows.length() >= start_row + num_rows);
-  for (size_t col = 0; col < varbinary_cols->size(); ++col) {
-    ARROW_DCHECK((*varbinary_cols)[col].length() == num_rows);
+  DCHECK(!rows.metadata().is_fixed_length);
+  DCHECK(rows.length() >= start_row + num_rows);
+  for (const auto& col : *varbinary_cols) {
+    // Rows and columns must all be varying-length
+    DCHECK(!col.metadata().is_fixed_length);
+    // The space in columns must be exactly equal to a subset of rows selected
+    DCHECK(col.length() == num_rows);
   }
 
   // Offsets of varbinary columns data within each encoded row are stored
@@ -1197,8 +1215,8 @@ void KeyEncoder::EncoderVarBinary::EncodeImp(uint32_t 
varbinary_col_id, KeyRowAr
   EncodeDecodeHelper<first_varbinary_col, true>(
       0, static_cast<uint32_t>(col.length()), varbinary_col_id, rows, rows, 
&col, nullptr,
       [](uint8_t* dst, const uint8_t* src, int64_t length) {
-        uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
-        const uint64_t* src64 = reinterpret_cast<const uint64_t*>(src);
+        auto dst64 = reinterpret_cast<uint64_t*>(dst);
+        auto src64 = reinterpret_cast<const uint64_t*>(src);
         uint32_t istripe;
         for (istripe = 0; istripe < length / 8; ++istripe) {
           dst64[istripe] = util::SafeLoad(src64 + istripe);
@@ -1220,8 +1238,8 @@ void KeyEncoder::EncoderVarBinary::DecodeImp(uint32_t 
start_row, uint32_t num_ro
       start_row, num_rows, varbinary_col_id, &rows, nullptr, col, col,
       [](uint8_t* dst, const uint8_t* src, int64_t length) {
         for (uint32_t istripe = 0; istripe < (length + 7) / 8; ++istripe) {
-          uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
-          const uint64_t* src64 = reinterpret_cast<const uint64_t*>(src);
+          auto dst64 = reinterpret_cast<uint64_t*>(dst);
+          auto src64 = reinterpret_cast<const uint64_t*>(src);
           util::SafeStore(dst64 + istripe, src64[istripe]);
         }
       });
@@ -1231,19 +1249,19 @@ void KeyEncoder::EncoderNulls::Encode(KeyRowArray* rows,
                                       const std::vector<KeyColumnArray>& cols,
                                       KeyEncoderContext* ctx,
                                       KeyColumnArray* temp_vector_16bit) {
-  ARROW_DCHECK(cols.size() > 0);
-  uint32_t num_rows = static_cast<uint32_t>(rows->length());
+  DCHECK_GT(cols.size(), 0);
+  const auto num_rows = static_cast<uint32_t>(rows->length());
 
   // All input columns should have the same number of rows.
   // They may or may not have non-nulls bit-vectors allocated.
-  for (size_t col = 0; col < cols.size(); ++col) {
-    ARROW_DCHECK(cols[col].length() == num_rows);
+  for (const auto& col : cols) {
+    DCHECK(col.length() == num_rows);
   }
 
   // Temp vector needs space for the required number of rows
-  ARROW_DCHECK(temp_vector_16bit->length() >= num_rows);
-  ARROW_DCHECK(temp_vector_16bit->metadata().is_fixed_length &&
-               temp_vector_16bit->metadata().fixed_length == sizeof(uint16_t));
+  DCHECK(temp_vector_16bit->length() >= num_rows);
+  DCHECK(temp_vector_16bit->metadata().is_fixed_length &&
+         temp_vector_16bit->metadata().fixed_length == sizeof(uint16_t));
 
   uint8_t* null_masks = rows->null_masks();
   uint32_t null_masks_bytes_per_row = 
rows->metadata().null_masks_bytes_per_row;
@@ -1253,10 +1271,12 @@ void KeyEncoder::EncoderNulls::Encode(KeyRowArray* rows,
     if (!non_nulls) {
       continue;
     }
+    int bit_offset = cols[col].bit_offset(0);
+    DCHECK_LT(bit_offset, 8);
     int num_selected;
     util::BitUtil::bits_to_indexes(
         0, ctx->hardware_flags, num_rows, non_nulls, &num_selected,
-        reinterpret_cast<uint16_t*>(temp_vector_16bit->mutable_data(1)));
+        reinterpret_cast<uint16_t*>(temp_vector_16bit->mutable_data(1)), 
bit_offset);
     for (int i = 0; i < num_selected; ++i) {
       uint16_t row_id = reinterpret_cast<const 
uint16_t*>(temp_vector_16bit->data(1))[i];
       int64_t null_masks_bit_id = row_id * null_masks_bytes_per_row * 8 + col;
@@ -1270,23 +1290,29 @@ void KeyEncoder::EncoderNulls::Decode(uint32_t 
start_row, uint32_t num_rows,
                                       std::vector<KeyColumnArray>* cols) {
   // Every output column needs to have a space for exactly the required number
   // of rows. It also needs to have non-nulls bit-vector allocated and mutable.
-  ARROW_DCHECK(cols->size() > 0);
-  for (size_t col = 0; col < cols->size(); ++col) {
-    ARROW_DCHECK((*cols)[col].length() == num_rows);
-    ARROW_DCHECK((*cols)[col].mutable_data(0));
+  DCHECK_GT(cols->size(), 0);
+  for (auto& col : *cols) {
+    DCHECK(col.length() == num_rows);
+    DCHECK(col.mutable_data(0));
   }
 
   const uint8_t* null_masks = rows.null_masks();
   uint32_t null_masks_bytes_per_row = rows.metadata().null_masks_bytes_per_row;
   for (size_t col = 0; col < cols->size(); ++col) {
     uint8_t* non_nulls = (*cols)[col].mutable_data(0);
-    memset(non_nulls, 0xff, BitUtil::BytesForBits(num_rows));
+    const int bit_offset = (*cols)[col].bit_offset(0);
+    DCHECK_LT(bit_offset, 8);
+    non_nulls[0] |= 0xff << (bit_offset);
+    if (bit_offset + num_rows > 8) {
+      int bits_in_first_byte = 8 - bit_offset;
+      memset(non_nulls + 1, 0xff, BitUtil::BytesForBits(num_rows - 
bits_in_first_byte));
+    }
     for (uint32_t row = 0; row < num_rows; ++row) {
       uint32_t null_masks_bit_id =
           (start_row + row) * null_masks_bytes_per_row * 8 + 
static_cast<uint32_t>(col);
       bool is_set = BitUtil::GetBit(null_masks, null_masks_bit_id);
       if (is_set) {
-        BitUtil::ClearBit(non_nulls, row);
+        BitUtil::ClearBit(non_nulls, bit_offset + row);
       }
     }
   }
@@ -1294,8 +1320,8 @@ void KeyEncoder::EncoderNulls::Decode(uint32_t start_row, 
uint32_t num_rows,
 
 uint32_t KeyEncoder::KeyRowMetadata::num_varbinary_cols() const {
   uint32_t result = 0;
-  for (size_t i = 0; i < column_metadatas.size(); ++i) {
-    if (!column_metadatas[i].is_fixed_length) {
+  for (auto column_metadata : column_metadatas) {
+    if (!column_metadata.is_fixed_length) {
       ++result;
     }
   }
@@ -1330,7 +1356,7 @@ void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
     column_metadatas[i] = cols[i];
   }
 
-  uint32_t num_cols = static_cast<uint32_t>(cols.size());
+  const auto num_cols = static_cast<uint32_t>(cols.size());
 
   // Sort columns.
   // Columns are sorted based on the size in bytes of their fixed-length part.
@@ -1389,8 +1415,8 @@ void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
       if (num_varbinary_cols == 0) {
         varbinary_end_array_offset = offset_within_row;
       }
-      ARROW_DCHECK(column_offsets[i] - varbinary_end_array_offset ==
-                   num_varbinary_cols * sizeof(uint32_t));
+      DCHECK(column_offsets[i] - varbinary_end_array_offset ==
+             num_varbinary_cols * sizeof(uint32_t));
       ++num_varbinary_cols;
       offset_within_row += sizeof(uint32_t);
     } else {
@@ -1433,8 +1459,8 @@ void KeyEncoder::Init(const 
std::vector<KeyColumnMetadata>& cols, KeyEncoderCont
 
 void KeyEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
                                         const std::vector<KeyColumnArray>& 
cols_in) {
-  uint32_t num_cols = static_cast<uint32_t>(cols_in.size());
-  ARROW_DCHECK(batch_all_cols_.size() == num_cols);
+  const auto num_cols = static_cast<uint32_t>(cols_in.size());
+  DCHECK(batch_all_cols_.size() == num_cols);
 
   uint32_t num_varbinary_visited = 0;
   for (uint32_t i = 0; i < num_cols; ++i) {
@@ -1442,7 +1468,7 @@ void KeyEncoder::PrepareKeyColumnArrays(int64_t 
start_row, int64_t num_rows,
     KeyColumnArray col_window(col, start_row, num_rows);
     batch_all_cols_[i] = col_window;
     if (!col.metadata().is_fixed_length) {
-      ARROW_DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
+      DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
       // If start row is zero, then base offset of varbinary column is also 
zero.
       if (start_row == 0) {
         batch_varbinary_cols_base_offsets_[num_varbinary_visited] = 0;
@@ -1462,10 +1488,9 @@ Status KeyEncoder::PrepareOutputForEncode(int64_t 
start_row, int64_t num_rows,
 
   int64_t fixed_part = row_metadata_.fixed_length * num_rows;
   int64_t var_part = 0;
-  for (size_t i = 0; i < all_cols.size(); ++i) {
-    const KeyColumnArray& col = all_cols[i];
+  for (const auto& col : all_cols) {
     if (!col.metadata().is_fixed_length) {
-      ARROW_DCHECK(col.length() >= start_row + num_rows);
+      DCHECK(col.length() >= start_row + num_rows);
       const uint32_t* offsets = col.offsets();
       var_part += offsets[start_row + num_rows] - offsets[start_row];
       // Include maximum padding that can be added to align the start of 
varbinary fields.
@@ -1509,16 +1534,16 @@ void KeyEncoder::Encode(int64_t start_row, int64_t 
num_rows, KeyRowArray* rows,
     // - offsets for individual varbinary fields within each row
     EncoderOffsets::Encode(rows, batch_varbinary_cols_, ctx_);
 
-    uint32_t num_varbinary_cols = 
static_cast<uint32_t>(batch_varbinary_cols_.size());
-    for (uint32_t i = 0; i < num_varbinary_cols; ++i) {
+    for (size_t i = 0; i < batch_varbinary_cols_.size(); ++i) {
       // Memcpy varbinary fields into precomputed in the previous step
       // positions in the output row buffer.
-      EncoderVarBinary::Encode(i, rows, batch_varbinary_cols_[i], ctx_);
+      EncoderVarBinary::Encode(static_cast<uint32_t>(i), rows, 
batch_varbinary_cols_[i],
+                               ctx_);
     }
   }
 
   // Process fixed length columns
-  uint32_t num_cols = static_cast<uint32_t>(batch_all_cols_.size());
+  const auto num_cols = static_cast<uint32_t>(batch_all_cols_.size());
   for (uint32_t i = 0; i < num_cols;) {
     if (!batch_all_cols_[i].metadata().is_fixed_length) {
       i += 1;
@@ -1571,7 +1596,7 @@ void KeyEncoder::DecodeFixedLengthBuffers(int64_t 
start_row_input,
   }
 
   // Process fixed length columns
-  uint32_t num_cols = static_cast<uint32_t>(batch_all_cols_.size());
+  const auto num_cols = static_cast<uint32_t>(batch_all_cols_.size());
   for (uint32_t i = 0; i < num_cols;) {
     if (!batch_all_cols_[i].metadata().is_fixed_length) {
       i += 1;
@@ -1610,13 +1635,12 @@ void KeyEncoder::DecodeVaryingLengthBuffers(int64_t 
start_row_input,
 
   bool is_row_fixed_length = row_metadata_.is_fixed_length;
   if (!is_row_fixed_length) {
-    uint32_t num_varbinary_cols = 
static_cast<uint32_t>(batch_varbinary_cols_.size());
-    for (uint32_t i = 0; i < num_varbinary_cols; ++i) {
+    for (size_t i = 0; i < batch_varbinary_cols_.size(); ++i) {
       // Memcpy varbinary fields into precomputed in the previous step
       // positions in the output row buffer.
       EncoderVarBinary::Decode(static_cast<uint32_t>(start_row_input),
-                               static_cast<uint32_t>(num_rows), i, rows,
-                               &batch_varbinary_cols_[i], ctx_);
+                               static_cast<uint32_t>(num_rows), 
static_cast<uint32_t>(i),
+                               rows, &batch_varbinary_cols_[i], ctx_);
     }
   }
 }
diff --git a/cpp/src/arrow/compute/exec/key_encode.h 
b/cpp/src/arrow/compute/exec/key_encode.h
index 3f5ef36..e5397b9 100644
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ b/cpp/src/arrow/compute/exec/key_encode.h
@@ -247,11 +247,12 @@ class KeyEncoder {
                    const KeyColumnArray& right, int buffer_id_to_replace);
     /// Create for reading
     KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
-                   const uint8_t* buffer0, const uint8_t* buffer1,
-                   const uint8_t* buffer2);
+                   const uint8_t* buffer0, const uint8_t* buffer1, const 
uint8_t* buffer2,
+                   int bit_offset0 = 0, int bit_offset1 = 0);
     /// Create for writing
     KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* 
buffer0,
-                   uint8_t* buffer1, uint8_t* buffer2);
+                   uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                   int bit_offset1 = 0);
     /// Create as a window view of original description that is offset
     /// by a given number of rows.
     /// The number of rows used in offset must be divisible by 8
@@ -269,6 +270,10 @@ class KeyEncoder {
     const uint32_t* offsets() const { return reinterpret_cast<const 
uint32_t*>(data(1)); }
     const KeyColumnMetadata& metadata() const { return metadata_; }
     int64_t length() const { return length_; }
+    int bit_offset(int i) const {
+      ARROW_DCHECK(i >= 0 && i < max_buffers_);
+      return bit_offset_[i];
+    }
 
    private:
     static constexpr int max_buffers_ = 3;
@@ -276,6 +281,9 @@ class KeyEncoder {
     uint8_t* mutable_buffers_[max_buffers_];
     KeyColumnMetadata metadata_;
     int64_t length_;
+    // Starting bit offset within the first byte (between 0 and 7)
+    // to be used when accessing buffers that store bit vectors.
+    int bit_offset_[max_buffers_ - 1];
   };
 
   void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
diff --git a/cpp/src/arrow/compute/exec/util.cc 
b/cpp/src/arrow/compute/exec/util.cc
index 8830334..a44676c 100644
--- a/cpp/src/arrow/compute/exec/util.cc
+++ b/cpp/src/arrow/compute/exec/util.cc
@@ -99,7 +99,26 @@ void BitUtil::bits_to_indexes_internal(int64_t 
hardware_flags, const int num_bit
 
 void BitUtil::bits_to_indexes(int bit_to_search, int64_t hardware_flags,
                               const int num_bits, const uint8_t* bits, int* 
num_indexes,
-                              uint16_t* indexes) {
+                              uint16_t* indexes, int bit_offset) {
+  bits += bit_offset / 8;
+  bit_offset %= 8;
+  if (bit_offset != 0) {
+    int num_indexes_head = 0;
+    uint64_t bits_head =
+        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
+    bits_to_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
+                    reinterpret_cast<const uint8_t*>(&bits_head), 
&num_indexes_head,
+                    indexes);
+    int num_indexes_tail = 0;
+    if (num_bits > bits_in_first_byte) {
+      bits_to_indexes(bit_to_search, hardware_flags, num_bits - 
bits_in_first_byte,
+                      bits + 1, &num_indexes_tail, indexes + num_indexes_head);
+    }
+    *num_indexes = num_indexes_head + num_indexes_tail;
+    return;
+  }
+
   if (bit_to_search == 0) {
     bits_to_indexes_internal<0, false>(hardware_flags, num_bits, bits, nullptr,
                                        num_indexes, indexes);
@@ -113,7 +132,27 @@ void BitUtil::bits_to_indexes(int bit_to_search, int64_t 
hardware_flags,
 void BitUtil::bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
                                   const int num_bits, const uint8_t* bits,
                                   const uint16_t* input_indexes, int* 
num_indexes,
-                                  uint16_t* indexes) {
+                                  uint16_t* indexes, int bit_offset) {
+  bits += bit_offset / 8;
+  bit_offset %= 8;
+  if (bit_offset != 0) {
+    int num_indexes_head = 0;
+    uint64_t bits_head =
+        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
+    bits_filter_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
+                        reinterpret_cast<const uint8_t*>(&bits_head), 
input_indexes,
+                        &num_indexes_head, indexes);
+    int num_indexes_tail = 0;
+    if (num_bits > bits_in_first_byte) {
+      bits_filter_indexes(bit_to_search, hardware_flags, num_bits - 
bits_in_first_byte,
+                          bits + 1, input_indexes + bits_in_first_byte, 
&num_indexes_tail,
+                          indexes + num_indexes_head);
+    }
+    *num_indexes = num_indexes_head + num_indexes_tail;
+    return;
+  }
+
   if (bit_to_search == 0) {
     bits_to_indexes_internal<0, true>(hardware_flags, num_bits, bits, 
input_indexes,
                                       num_indexes, indexes);
@@ -126,46 +165,32 @@ void BitUtil::bits_filter_indexes(int bit_to_search, 
int64_t hardware_flags,
 
 void BitUtil::bits_split_indexes(int64_t hardware_flags, const int num_bits,
                                  const uint8_t* bits, int* num_indexes_bit0,
-                                 uint16_t* indexes_bit0, uint16_t* 
indexes_bit1) {
-  bits_to_indexes(0, hardware_flags, num_bits, bits, num_indexes_bit0, 
indexes_bit0);
+                                 uint16_t* indexes_bit0, uint16_t* 
indexes_bit1,
+                                 int bit_offset) {
+  bits_to_indexes(0, hardware_flags, num_bits, bits, num_indexes_bit0, 
indexes_bit0,
+                  bit_offset);
   int num_indexes_bit1;
-  bits_to_indexes(1, hardware_flags, num_bits, bits, &num_indexes_bit1, 
indexes_bit1);
+  bits_to_indexes(1, hardware_flags, num_bits, bits, &num_indexes_bit1, 
indexes_bit1,
+                  bit_offset);
 }
 
-void BitUtil::bits_to_bytes_internal(const int num_bits, const uint8_t* bits,
-                                     uint8_t* bytes) {
-  constexpr int unroll = 8;
-  // Processing 8 bits at a time
-  for (int i = 0; i < (num_bits + unroll - 1) / unroll; ++i) {
-    uint8_t bits_next = bits[i];
-    // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 
bits apart
-    // from the previous.
-    uint64_t unpacked = static_cast<uint64_t>(bits_next & 0xfe) *
-                        ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 
28) |
-                         (1ULL << 35) | (1ULL << 42) | (1ULL << 49));
-    unpacked |= (bits_next & 1);
-    unpacked &= 0x0101010101010101ULL;
-    unpacked *= 255;
-    util::SafeStore(&reinterpret_cast<uint64_t*>(bytes)[i], unpacked);
-  }
-}
-
-void BitUtil::bytes_to_bits_internal(const int num_bits, const uint8_t* bytes,
-                                     uint8_t* bits) {
-  constexpr int unroll = 8;
-  // Process 8 bits at a time
-  for (int i = 0; i < (num_bits + unroll - 1) / unroll; ++i) {
-    uint64_t bytes_next = util::SafeLoad(&reinterpret_cast<const 
uint64_t*>(bytes)[i]);
-    bytes_next &= 0x0101010101010101ULL;
-    bytes_next |= (bytes_next >> 7);  // Pairs of adjacent output bits in 
individual bytes
-    bytes_next |= (bytes_next >> 14);  // 4 adjacent output bits in individual 
bytes
-    bytes_next |= (bytes_next >> 28);  // All 8 output bits in the lowest byte
-    bits[i] = static_cast<uint8_t>(bytes_next & 0xff);
+void BitUtil::bits_to_bytes(int64_t hardware_flags, const int num_bits,
+                            const uint8_t* bits, uint8_t* bytes, int 
bit_offset) {
+  bits += bit_offset / 8;
+  bit_offset %= 8;
+  if (bit_offset != 0) {
+    uint64_t bits_head =
+        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
+    bits_to_bytes(hardware_flags, bits_in_first_byte,
+                  reinterpret_cast<const uint8_t*>(&bits_head), bytes);
+    if (num_bits > bits_in_first_byte) {
+      bits_to_bytes(hardware_flags, num_bits - bits_in_first_byte, bits + 1,
+                    bytes + bits_in_first_byte);
+    }
+    return;
   }
-}
 
-void BitUtil::bits_to_bytes(int64_t hardware_flags, const int num_bits,
-                            const uint8_t* bits, uint8_t* bytes) {
   int num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
   if (hardware_flags & arrow::internal::CpuInfo::AVX2) {
@@ -191,7 +216,24 @@ void BitUtil::bits_to_bytes(int64_t hardware_flags, const 
int num_bits,
 }
 
 void BitUtil::bytes_to_bits(int64_t hardware_flags, const int num_bits,
-                            const uint8_t* bytes, uint8_t* bits) {
+                            const uint8_t* bytes, uint8_t* bits, int 
bit_offset) {
+  bits += bit_offset / 8;
+  bit_offset %= 8;
+  if (bit_offset != 0) {
+    uint64_t bits_head;
+    int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
+    bytes_to_bits(hardware_flags, bits_in_first_byte, bytes,
+                  reinterpret_cast<uint8_t*>(&bits_head));
+    uint8_t mask = (1 << bit_offset) - 1;
+    *bits = static_cast<uint8_t>((*bits & mask) | (bits_head << bit_offset));
+
+    if (num_bits > bits_in_first_byte) {
+      bytes_to_bits(hardware_flags, num_bits - bits_in_first_byte,
+                    bytes + bits_in_first_byte, bits + 1);
+    }
+    return;
+  }
+
   int num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
   if (hardware_flags & arrow::internal::CpuInfo::AVX2) {
diff --git a/cpp/src/arrow/compute/exec/util.h 
b/cpp/src/arrow/compute/exec/util.h
index d345bd3..471cc33 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -112,24 +112,26 @@ class BitUtil {
  public:
   static void bits_to_indexes(int bit_to_search, int64_t hardware_flags,
                               const int num_bits, const uint8_t* bits, int* 
num_indexes,
-                              uint16_t* indexes);
+                              uint16_t* indexes, int bit_offset = 0);
 
   static void bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
                                   const int num_bits, const uint8_t* bits,
                                   const uint16_t* input_indexes, int* 
num_indexes,
-                                  uint16_t* indexes);
+                                  uint16_t* indexes, int bit_offset = 0);
 
   // Input and output indexes may be pointing to the same data (in-place 
filtering).
   static void bits_split_indexes(int64_t hardware_flags, const int num_bits,
                                  const uint8_t* bits, int* num_indexes_bit0,
-                                 uint16_t* indexes_bit0, uint16_t* 
indexes_bit1);
+                                 uint16_t* indexes_bit0, uint16_t* 
indexes_bit1,
+                                 int bit_offset = 0);
 
   // Bit 1 is replaced with byte 0xFF.
   static void bits_to_bytes(int64_t hardware_flags, const int num_bits,
-                            const uint8_t* bits, uint8_t* bytes);
+                            const uint8_t* bits, uint8_t* bytes, int 
bit_offset = 0);
+
   // Return highest bit of each byte.
   static void bytes_to_bits(int64_t hardware_flags, const int num_bits,
-                            const uint8_t* bytes, uint8_t* bits);
+                            const uint8_t* bytes, uint8_t* bits, int 
bit_offset = 0);
 
   static bool are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
                                  uint32_t num_bytes);
@@ -144,10 +146,6 @@ class BitUtil {
   static void bits_to_indexes_internal(int64_t hardware_flags, const int 
num_bits,
                                        const uint8_t* bits, const uint16_t* 
input_indexes,
                                        int* num_indexes, uint16_t* indexes);
-  static void bits_to_bytes_internal(const int num_bits, const uint8_t* bits,
-                                     uint8_t* bytes);
-  static void bytes_to_bits_internal(const int num_bits, const uint8_t* bytes,
-                                     uint8_t* bits);
 
 #if defined(ARROW_HAVE_AVX2)
   static void bits_to_indexes_avx2(int bit_to_search, const int num_bits,
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index e282035..ed40a6b 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -564,8 +564,13 @@ struct GrouperFastImpl : Grouper {
         varlen = batch[icol].array()->buffers[2]->data();
       }
 
-      cols_[icol] = arrow::compute::KeyEncoder::KeyColumnArray(
-          col_metadata_[icol], num_rows, non_nulls, fixedlen, varlen);
+      int64_t offset = batch[icol].array()->offset;
+
+      auto col_base = arrow::compute::KeyEncoder::KeyColumnArray(
+          col_metadata_[icol], offset + num_rows, non_nulls, fixedlen, varlen);
+
+      cols_[icol] =
+          arrow::compute::KeyEncoder::KeyColumnArray(col_base, offset, 
num_rows);
     }
 
     // Split into smaller mini-batches
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index a8f8c64..8c8a4b2 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -844,5 +844,47 @@ TEST(GroupBy, MinMaxWithNewGroupsInChunkedArray) {
                     aggregated_and_grouped,
                     /*verbose=*/true);
 }
+
+ExecContext* small_chunksize_context() {
+  static ExecContext ctx;
+  ctx.set_exec_chunksize(2);
+  return &ctx;
+}
+
+TEST(GroupBy, SmallChunkSizeSumOnly) {
+  auto batch = RecordBatchFromJSON(
+      schema({field("argument", float64()), field("key", int64())}), R"([
+    [1.0,   1],
+    [null,  1],
+    [0.0,   2],
+    [null,  3],
+    [4.0,   null],
+    [3.25,  1],
+    [0.125, 2],
+    [-0.25, 2],
+    [0.75,  null],
+    [null,  3]
+  ])");
+  ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+                       internal::GroupBy({batch->GetColumnByName("argument")},
+                                         {batch->GetColumnByName("key")},
+                                         {
+                                             {"hash_sum", nullptr},
+                                         },
+                                         small_chunksize_context()));
+  AssertDatumsEqual(ArrayFromJSON(struct_({
+                                      field("hash_sum", float64()),
+                                      field("key_0", int64()),
+                                  }),
+                                  R"([
+    [4.25,   1],
+    [-0.125, 2],
+    [null,   3],
+    [4.75,   null]
+  ])"),
+                    aggregated_and_grouped,
+                    /*verbose=*/true);
+}
+
 }  // namespace compute
 }  // namespace arrow

Reply via email to