This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b22bae7  ARROW-10696: [C++] Add SetBitRunReader
b22bae7 is described below

commit b22bae7996b964a792786d8a3697a533c2e47b52
Author: Antoine Pitrou <[email protected]>
AuthorDate: Wed Dec 2 14:51:56 2020 +0100

    ARROW-10696: [C++] Add SetBitRunReader
    
    A specialized bitmap reader that yields runs of set bits, for use cases
    where reset bits (e.g. null bits) don't need any handling.
    
    In some use cases it can be significantly faster than the alternatives.
    
    Closes #8770 from pitrou/ARROW-10696-set-bit-run-reader
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/compare.cc                           |  51 ++-
 cpp/src/arrow/compute/kernels/aggregate_mode.cc    |  86 +++--
 cpp/src/arrow/compute/kernels/aggregate_var_std.cc |  44 ++-
 cpp/src/arrow/compute/kernels/vector_selection.cc  |  85 ++---
 cpp/src/arrow/util/bit_run_reader.h                | 349 ++++++++++++++++++++-
 cpp/src/arrow/util/bit_util_benchmark.cc           |  97 +++---
 cpp/src/arrow/util/bit_util_test.cc                | 220 ++++++++++++-
 cpp/src/arrow/util/spaced.h                        | 155 ++-------
 cpp/src/parquet/column_io_benchmark.cc             |  36 ++-
 cpp/src/parquet/encoding.cc                        |  37 +--
 cpp/src/parquet/statistics.cc                      |  20 +-
 11 files changed, 826 insertions(+), 354 deletions(-)

diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index d9cbe21..ab0e90c 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -38,7 +38,7 @@
 #include "arrow/tensor.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
-#include "arrow/util/bit_block_counter.h"
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap_ops.h"
 #include "arrow/util/bitmap_reader.h"
@@ -54,7 +54,6 @@ using internal::BitmapEquals;
 using internal::BitmapReader;
 using internal::BitmapUInt64Reader;
 using internal::checked_cast;
-using internal::OptionalBitBlockCounter;
 using internal::OptionalBitmapEquals;
 
 // ----------------------------------------------------------------------
@@ -412,41 +411,33 @@ class RangeDataEqualsImpl {
 
   template <typename CompareValues>
   void VisitValues(CompareValues&& compare_values) {
-    internal::VisitBitBlocksVoid(
-        left_.buffers[0], left_.offset + left_start_idx_, range_length_,
-        [&](int64_t position) { result_ &= compare_values(position); }, []() 
{});
+    internal::VisitSetBitRunsVoid(left_.buffers[0], left_.offset + 
left_start_idx_,
+                                  range_length_, [&](int64_t position, int64_t 
length) {
+                                    for (int64_t i = 0; i < length; ++i) {
+                                      result_ &= compare_values(position + i);
+                                    }
+                                  });
   }
 
   // Visit and compare runs of non-null values
   template <typename CompareRuns>
   void VisitValidRuns(CompareRuns&& compare_runs) {
-    int64_t i = 0;
     const uint8_t* left_null_bitmap = left_.GetValues<uint8_t>(0, 0);
-    // TODO may use BitRunReader or equivalent?
-    OptionalBitBlockCounter bit_blocks(left_null_bitmap, left_.offset + 
left_start_idx_,
-                                       range_length_);
-
-    while (result_ && i < range_length_) {
-      const auto block = bit_blocks.NextBlock();
-      if (block.AllSet()) {
-        if (!compare_runs(i, block.length)) {
-          result_ = false;
-          return;
-        }
-      } else if (block.NoneSet()) {
-        // Nothing to do
-      } else {
-        // Compare non-null values one at a time
-        for (int64_t j = i; j < i + block.length; ++j) {
-          if (BitUtil::GetBit(left_null_bitmap, left_.offset + left_start_idx_ 
+ j)) {
-            if (!compare_runs(j, 1)) {
-              result_ = false;
-              return;
-            }
-          }
-        }
+    if (left_null_bitmap == nullptr) {
+      result_ = compare_runs(0, range_length_);
+      return;
+    }
+    internal::SetBitRunReader reader(left_null_bitmap, left_.offset + 
left_start_idx_,
+                                     range_length_);
+    while (true) {
+      const auto run = reader.NextRun();
+      if (run.length == 0) {
+        return;
+      }
+      if (!compare_runs(run.position, run.length)) {
+        result_ = false;
+        return;
       }
-      i += block.length;
     }
   }
 
diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc 
b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
index d5affce..3a60cab 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
@@ -22,6 +22,8 @@
 #include "arrow/compute/api_aggregate.h"
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/common.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_run_reader.h"
 
 namespace arrow {
 namespace compute {
@@ -41,19 +43,22 @@ template <typename ArrayType, typename CType = typename 
ArrayType::TypeClass::c_
 enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> 
CountValuesByMap(
     const ArrayType& array, int64_t& nan_count) {
   CounterMap<CType> value_counts_map;
+  const ArrayData& data = *array.data();
+  const CType* values = data.GetValues<CType>(1);
 
   nan_count = 0;
   if (array.length() > array.null_count()) {
-    VisitArrayDataInline<typename ArrayType::TypeClass>(
-        *array.data(),
-        [&](CType value) {
-          if (std::isnan(value)) {
-            ++nan_count;
-          } else {
-            ++value_counts_map[value];
-          }
-        },
-        []() {});
+    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, 
data.length,
+                                         [&](int64_t pos, int64_t len) {
+                                           for (int64_t i = 0; i < len; ++i) {
+                                             const auto value = values[pos + 
i];
+                                             if (std::isnan(value)) {
+                                               ++nan_count;
+                                             } else {
+                                               ++value_counts_map[value];
+                                             }
+                                           }
+                                         });
   }
 
   return value_counts_map;
@@ -64,25 +69,37 @@ template <typename ArrayType, typename CType = typename 
ArrayType::TypeClass::c_
 enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> 
CountValuesByMap(
     const ArrayType& array) {
   CounterMap<CType> value_counts_map;
+  const ArrayData& data = *array.data();
+  const CType* values = data.GetValues<CType>(1);
 
   if (array.length() > array.null_count()) {
-    VisitArrayDataInline<typename ArrayType::TypeClass>(
-        *array.data(), [&](CType value) { ++value_counts_map[value]; }, []() 
{});
+    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, 
data.length,
+                                         [&](int64_t pos, int64_t len) {
+                                           for (int64_t i = 0; i < len; ++i) {
+                                             ++value_counts_map[values[pos + 
i]];
+                                           }
+                                         });
   }
 
   return value_counts_map;
 }
 
-// vector based counter for bool/int8 or integers with small value range
+// vector based counter for int8 or integers with small value range
 template <typename ArrayType, typename CType = typename 
ArrayType::TypeClass::c_type>
 CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType 
max) {
   const int range = static_cast<int>(max - min);
   DCHECK(range >= 0 && range < 64 * 1024 * 1024);
+  const ArrayData& data = *array.data();
+  const CType* values = data.GetValues<CType>(1);
 
   std::vector<int64_t> value_counts_vector(range + 1);
   if (array.length() > array.null_count()) {
-    VisitArrayDataInline<typename ArrayType::TypeClass>(
-        *array.data(), [&](CType value) { ++value_counts_vector[value - min]; 
}, []() {});
+    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, 
data.length,
+                                         [&](int64_t pos, int64_t len) {
+                                           for (int64_t i = 0; i < len; ++i) {
+                                             ++value_counts_vector[values[pos 
+ i] - min];
+                                           }
+                                         });
   }
 
   // Transfer value counts to a map to be consistent with other chunks
@@ -104,18 +121,21 @@ CounterMap<CType> CountValuesByMapOrVector(const 
ArrayType& array) {
   // see https://issues.apache.org/jira/browse/ARROW-9873
   static constexpr int kMinArraySize = 8192 / sizeof(CType);
   static constexpr int kMaxValueRange = 16384;
+  const ArrayData& data = *array.data();
+  const CType* values = data.GetValues<CType>(1);
 
   if ((array.length() - array.null_count()) >= kMinArraySize) {
     CType min = std::numeric_limits<CType>::max();
     CType max = std::numeric_limits<CType>::min();
 
-    VisitArrayDataInline<typename ArrayType::TypeClass>(
-        *array.data(),
-        [&](CType value) {
-          min = std::min(min, value);
-          max = std::max(max, value);
-        },
-        []() {});
+    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, 
data.length,
+                                         [&](int64_t pos, int64_t len) {
+                                           for (int64_t i = 0; i < len; ++i) {
+                                             const auto value = values[pos + 
i];
+                                             min = std::min(min, value);
+                                             max = std::max(max, value);
+                                           }
+                                         });
 
     if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= 
kMaxValueRange) {
       return CountValuesByVector(array, min, max);
@@ -124,9 +144,24 @@ CounterMap<CType> CountValuesByMapOrVector(const 
ArrayType& array) {
   return CountValuesByMap(array);
 }
 
-// bool, int8
+// bool
 template <typename ArrayType, typename CType = typename 
ArrayType::TypeClass::c_type>
-enable_if_t<std::is_integral<CType>::value && sizeof(CType) == 1, 
CounterMap<CType>>
+enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, 
CounterMap<CType>>
+CountValues(const ArrayType& array, int64_t& nan_count) {
+  // we just need to count ones and zeros
+  CounterMap<CType> map;
+  if (array.length() > array.null_count()) {
+    map[true] = array.true_count();
+    map[false] = array.length() - array.null_count() - map[true];
+  }
+  nan_count = 0;
+  return map;
+}
+
+// int8
+template <typename ArrayType, typename CType = typename 
ArrayType::TypeClass::c_type>
+enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && 
sizeof(CType) == 1,
+            CounterMap<CType>>
 CountValues(const ArrayType& array, int64_t& nan_count) {
   using Limits = std::numeric_limits<CType>;
   nan_count = 0;
@@ -135,7 +170,8 @@ CountValues(const ArrayType& array, int64_t& nan_count) {
 
 // int16/32/64
 template <typename ArrayType, typename CType = typename 
ArrayType::TypeClass::c_type>
-enable_if_t<std::is_integral<CType>::value && (sizeof(CType) > 1), 
CounterMap<CType>>
+enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && 
(sizeof(CType) > 1),
+            CounterMap<CType>>
 CountValues(const ArrayType& array, int64_t& nan_count) {
   nan_count = 0;
   return CountValuesByMapOrVector(array);
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc 
b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index 4dac0a3..6e1d930 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -20,6 +20,7 @@
 #include "arrow/compute/api_aggregate.h"
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/int128_internal.h"
 
 namespace arrow {
@@ -49,18 +50,24 @@ struct VarStdState {
     using SumType =
         typename std::conditional<is_floating_type<T>::value, double, 
int128_t>::type;
     SumType sum = 0;
-    VisitArrayDataInline<ArrowType>(
-        *array.data(), [&sum](CType value) { sum += 
static_cast<SumType>(value); },
-        []() {});
+
+    const ArrayData& data = *array.data();
+    const CType* values = data.GetValues<CType>(1);
+    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, 
data.length,
+                                         [&](int64_t pos, int64_t len) {
+                                           for (int64_t i = 0; i < len; ++i) {
+                                             sum += 
static_cast<SumType>(values[pos + i]);
+                                           }
+                                         });
 
     double mean = static_cast<double>(sum) / count, m2 = 0;
-    VisitArrayDataInline<ArrowType>(
-        *array.data(),
-        [mean, &m2](CType value) {
-          double v = static_cast<double>(value);
-          m2 += (v - mean) * (v - mean);
-        },
-        []() {});
+    arrow::internal::VisitSetBitRunsVoid(
+        data.buffers[0], data.offset, data.length, [&](int64_t pos, int64_t 
len) {
+          for (int64_t i = 0; i < len; ++i) {
+            const double v = static_cast<double>(values[pos + i]);
+            m2 += (v - mean) * (v - mean);
+          }
+        });
 
     this->count = count;
     this->mean = mean;
@@ -89,13 +96,16 @@ struct VarStdState {
       if (count > 0) {
         int64_t sum = 0;
         int128_t square_sum = 0;
-        VisitArrayDataInline<ArrowType>(
-            *slice->data(),
-            [&sum, &square_sum](CType value) {
-              sum += value;
-              square_sum += static_cast<uint64_t>(value) * value;
-            },
-            []() {});
+        const ArrayData& data = *slice->data();
+        const CType* values = data.GetValues<CType>(1);
+        arrow::internal::VisitSetBitRunsVoid(
+            data.buffers[0], data.offset, data.length, [&](int64_t pos, 
int64_t len) {
+              for (int64_t i = 0; i < len; ++i) {
+                const auto value = values[pos + i];
+                sum += value;
+                square_sum += static_cast<uint64_t>(value) * value;
+              }
+            });
 
         const double mean = static_cast<double>(sum) / count;
         // calculate m2 = square_sum - sum * sum / count
diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc 
b/cpp/src/arrow/compute/kernels/vector_selection.cc
index ccca0e5..2c383d2 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -36,6 +36,7 @@
 #include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/util/bit_block_counter.h"
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap_ops.h"
 #include "arrow/util/bitmap_reader.h"
@@ -588,37 +589,9 @@ class PrimitiveFilterImpl {
 
   void ExecNonNull() {
     // Fast filter when values and filter are not null
-    // Bit counters used for both null_selection behaviors
-    BitBlockCounter filter_counter(filter_data_, filter_offset_, 
values_length_);
-
-    int64_t in_position = 0;
-    BitBlockCount current_block = filter_counter.NextWord();
-    while (in_position < values_length_) {
-      if (current_block.AllSet()) {
-        int64_t run_length = 0;
-        // If we've found a all-true block, then we scan forward until we find
-        // a block that has some false values (or we reach the end
-        while (current_block.length > 0 && current_block.AllSet()) {
-          run_length += current_block.length;
-          current_block = filter_counter.NextWord();
-        }
-        WriteValueSegment(in_position, run_length);
-        in_position += run_length;
-      } else if (current_block.NoneSet()) {
-        // Nothing selected
-        in_position += current_block.length;
-        current_block = filter_counter.NextWord();
-      } else {
-        // Some values selected
-        for (int64_t i = 0; i < current_block.length; ++i) {
-          if (BitUtil::GetBit(filter_data_, filter_offset_ + in_position)) {
-            WriteValue(in_position);
-          }
-          ++in_position;
-        }
-        current_block = filter_counter.NextWord();
-      }
-    }
+    ::arrow::internal::VisitSetBitRunsVoid(
+        filter_data_, filter_offset_, values_length_,
+        [&](int64_t position, int64_t length) { WriteValueSegment(position, 
length); });
   }
 
   void Exec() {
@@ -872,8 +845,6 @@ void PrimitiveFilter(KernelContext* ctx, const ExecBatch& 
batch, Datum* out) {
         data_builder.Reserve(static_cast<int64_t>(mean_value_length * 
output_length))); \
   }                                                                            
         \
   int64_t space_available = data_builder.capacity();                           
         \
-  int64_t in_position = 0;                                                     
         \
-  int64_t out_position = 0;                                                    
         \
   offset_type offset = 0;
 
 #define APPEND_RAW_DATA(DATA, NBYTES)                                  \
@@ -900,45 +871,25 @@ Status BinaryFilterNonNullImpl(KernelContext* ctx, const 
ArrayData& values,
                                ArrayData* out) {
   using offset_type = typename Type::offset_type;
   const auto filter_data = filter.buffers[1]->data();
-  const int64_t filter_offset = filter.offset;
-  BitBlockCounter filter_counter(filter_data, filter.offset, filter.length);
+
   BINARY_FILTER_SETUP_COMMON();
 
-  while (in_position < filter.length) {
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet()) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
+  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t 
length) {
         // Bulk-append raw data
-        offset_type block_data_bytes =
-            (raw_offsets[in_position + filter_block.length] - 
raw_offsets[in_position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
+        const offset_type run_data_bytes =
+            (raw_offsets[position + length] - raw_offsets[position]);
+        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
         // Append offsets
-        offset_type cur_offset = raw_offsets[in_position];
-        for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
+        offset_type cur_offset = raw_offsets[position];
+        for (int64_t i = 0; i < length; ++i) {
           offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[in_position + 1] - cur_offset;
-          cur_offset = raw_offsets[in_position + 1];
+          offset += raw_offsets[i + position + 1] - cur_offset;
+          cur_offset = raw_offsets[i + position + 1];
         }
-        out_position += filter_block.length;
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        // All the values are not-null, so we can skip null checking for
-        // them
-        for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-          if (BitUtil::GetBit(filter_data, filter_offset + in_position)) {
-            offset_builder.UnsafeAppend(offset);
-            APPEND_SINGLE_VALUE();
-          }
-        }
-      }
-    }
-  }
+        return Status::OK();
+      }));
+
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
@@ -976,6 +927,8 @@ Status BinaryFilterImpl(KernelContext* ctx, const 
ArrayData& values,
 
   BINARY_FILTER_SETUP_COMMON();
 
+  int64_t in_position = 0;
+  int64_t out_position = 0;
   while (in_position < filter.length) {
     BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
     BitBlockCount values_valid_block = values_valid_counter.NextWord();
diff --git a/cpp/src/arrow/util/bit_run_reader.h 
b/cpp/src/arrow/util/bit_run_reader.h
index 893f4f2..39ff049 100644
--- a/cpp/src/arrow/util/bit_run_reader.h
+++ b/cpp/src/arrow/util/bit_run_reader.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cassert>
 #include <cstdint>
 #include <cstring>
 #include <string>
@@ -166,7 +167,353 @@ class ARROW_EXPORT BitRunReader {
 using BitRunReader = BitRunReaderLinear;
 #endif
 
-// TODO SetBitRunReader?
+struct SetBitRun {
+  int64_t position;
+  int64_t length;
+
+  bool AtEnd() const { return length == 0; }
+
+  std::string ToString() const {
+    return std::string("{pos=") + std::to_string(position) +
+           ", len=" + std::to_string(length) + "}";
+  }
+
+  bool operator==(const SetBitRun& other) const {
+    return position == other.position && length == other.length;
+  }
+  bool operator!=(const SetBitRun& other) const {
+    return position != other.position || length != other.length;
+  }
+};
+
+template <bool Reverse>
+class BaseSetBitRunReader {
+ public:
+  /// \brief Constructs new SetBitRunReader.
+  ///
+  /// \param[in] bitmap source data
+  /// \param[in] start_offset bit offset into the source data
+  /// \param[in] length number of bits to scan
+  inline BaseSetBitRunReader(const uint8_t* bitmap, int64_t start_offset, 
int64_t length);
+
+  SetBitRun NextRun() {
+    int64_t pos = 0;
+    int64_t len = 0;
+    if (current_num_bits_) {
+      const auto run = FindCurrentRun();
+      assert(remaining_ >= 0);
+      if (run.length && current_num_bits_) {
+        // The run ends in current_word_
+        return AdjustRun(run);
+      }
+      pos = run.position;
+      len = run.length;
+    }
+    if (!len) {
+      // We didn't get any ones in current_word_, so we can skip any zeros
+      // in the following words
+      SkipNextZeros();
+      if (remaining_ == 0) {
+        return {0, 0};
+      }
+      assert(current_num_bits_);
+      pos = position();
+    } else if (!current_num_bits_) {
+      if (ARROW_PREDICT_TRUE(remaining_ >= 64)) {
+        current_word_ = LoadFullWord();
+        current_num_bits_ = 64;
+      } else if (remaining_ > 0) {
+        current_word_ = LoadPartialWord(/*bit_offset=*/0, remaining_);
+        current_num_bits_ = static_cast<int32_t>(remaining_);
+      } else {
+        // No bits remaining, perhaps we found a run?
+        return AdjustRun({pos, len});
+      }
+      // If current word starts with a zero, we got a full run
+      if (!(current_word_ & kFirstBit)) {
+        return AdjustRun({pos, len});
+      }
+    }
+    // Current word should now start with a set bit
+    len += CountNextOnes();
+    return AdjustRun({pos, len});
+  }
+
+ protected:
+  int64_t position() const {
+    if (Reverse) {
+      return remaining_;
+    } else {
+      return length_ - remaining_;
+    }
+  }
+
+  SetBitRun AdjustRun(SetBitRun run) {
+    if (Reverse) {
+      assert(run.position >= run.length);
+      run.position -= run.length;
+    }
+    return run;
+  }
+
+  uint64_t LoadFullWord() {
+    uint64_t word;
+    if (Reverse) {
+      bitmap_ -= 8;
+    }
+    memcpy(&word, bitmap_, 8);
+    if (!Reverse) {
+      bitmap_ += 8;
+    }
+    return BitUtil::ToLittleEndian(word);
+  }
+
+  uint64_t LoadPartialWord(int8_t bit_offset, int64_t num_bits) {
+    assert(num_bits > 0);
+    uint64_t word = 0;
+    const int64_t num_bytes = BitUtil::BytesForBits(num_bits);
+    if (Reverse) {
+      // Read in the most significant bytes of the word
+      bitmap_ -= num_bytes;
+      memcpy(reinterpret_cast<char*>(&word) + 8 - num_bytes, bitmap_, 
num_bytes);
+      // XXX i.e. a most-significant-bits mask: keep only the top num_bits bits
+      return (BitUtil::ToLittleEndian(word) << bit_offset) &
+             ~BitUtil::LeastSignificantBitMask(64 - num_bits);
+    } else {
+      memcpy(&word, bitmap_, num_bytes);
+      bitmap_ += num_bytes;
+      return (BitUtil::ToLittleEndian(word) >> bit_offset) &
+             BitUtil::LeastSignificantBitMask(num_bits);
+    }
+  }
+
+  void SkipNextZeros() {
+    assert(current_num_bits_ == 0);
+    while (ARROW_PREDICT_TRUE(remaining_ >= 64)) {
+      current_word_ = LoadFullWord();
+      const auto num_zeros = CountFirstZeros(current_word_);
+      if (num_zeros < 64) {
+        // Run of zeros ends here
+        current_word_ = ConsumeBits(current_word_, num_zeros);
+        current_num_bits_ = 64 - num_zeros;
+        remaining_ -= num_zeros;
+        assert(remaining_ >= 0);
+        assert(current_num_bits_ >= 0);
+        return;
+      }
+      remaining_ -= 64;
+    }
+    // Run of zeros continues in last bitmap word
+    if (remaining_ > 0) {
+      current_word_ = LoadPartialWord(/*bit_offset=*/0, remaining_);
+      current_num_bits_ = static_cast<int32_t>(remaining_);
+      const auto num_zeros =
+          std::min<int32_t>(current_num_bits_, CountFirstZeros(current_word_));
+      current_word_ = ConsumeBits(current_word_, num_zeros);
+      current_num_bits_ -= num_zeros;
+      remaining_ -= num_zeros;
+      assert(remaining_ >= 0);
+      assert(current_num_bits_ >= 0);
+    }
+  }
+
+  int64_t CountNextOnes() {
+    assert(current_word_ & kFirstBit);
+
+    int64_t len;
+    if (~current_word_) {
+      const auto num_ones = CountFirstZeros(~current_word_);
+      assert(num_ones <= current_num_bits_);
+      assert(num_ones <= remaining_);
+      remaining_ -= num_ones;
+      current_word_ = ConsumeBits(current_word_, num_ones);
+      current_num_bits_ -= num_ones;
+      if (current_num_bits_) {
+        // Run of ones ends here
+        return num_ones;
+      }
+      len = num_ones;
+    } else {
+      // current_word_ is all ones
+      remaining_ -= 64;
+      current_num_bits_ = 0;
+      len = 64;
+    }
+
+    while (ARROW_PREDICT_TRUE(remaining_ >= 64)) {
+      current_word_ = LoadFullWord();
+      const auto num_ones = CountFirstZeros(~current_word_);
+      len += num_ones;
+      remaining_ -= num_ones;
+      if (num_ones < 64) {
+        // Run of ones ends here
+        current_word_ = ConsumeBits(current_word_, num_ones);
+        current_num_bits_ = 64 - num_ones;
+        return len;
+      }
+    }
+    // Run of ones continues in last bitmap word
+    if (remaining_ > 0) {
+      current_word_ = LoadPartialWord(/*bit_offset=*/0, remaining_);
+      current_num_bits_ = static_cast<int32_t>(remaining_);
+      const auto num_ones = CountFirstZeros(~current_word_);
+      assert(num_ones <= current_num_bits_);
+      assert(num_ones <= remaining_);
+      current_word_ = ConsumeBits(current_word_, num_ones);
+      current_num_bits_ -= num_ones;
+      remaining_ -= num_ones;
+      len += num_ones;
+    }
+    return len;
+  }
+
+  SetBitRun FindCurrentRun() {
+    // Skip any pending zeros
+    const auto num_zeros = CountFirstZeros(current_word_);
+    if (num_zeros >= current_num_bits_) {
+      remaining_ -= current_num_bits_;
+      current_word_ = 0;
+      current_num_bits_ = 0;
+      return {0, 0};
+    }
+    assert(num_zeros <= remaining_);
+    current_word_ = ConsumeBits(current_word_, num_zeros);
+    current_num_bits_ -= num_zeros;
+    remaining_ -= num_zeros;
+    const int64_t pos = position();
+    // Count any ones
+    const auto num_ones = CountFirstZeros(~current_word_);
+    assert(num_ones <= current_num_bits_);
+    assert(num_ones <= remaining_);
+    current_word_ = ConsumeBits(current_word_, num_ones);
+    current_num_bits_ -= num_ones;
+    remaining_ -= num_ones;
+    return {pos, num_ones};
+  }
+
+  inline int CountFirstZeros(uint64_t word);
+  inline uint64_t ConsumeBits(uint64_t word, int32_t num_bits);
+
+  const uint8_t* bitmap_;
+  const int64_t length_;
+  int64_t remaining_;
+  uint64_t current_word_;
+  int32_t current_num_bits_;
+
+  static constexpr uint64_t kFirstBit = Reverse ? 0x8000000000000000ULL : 1;
+};
+
+template <>
+inline int BaseSetBitRunReader<false>::CountFirstZeros(uint64_t word) {
+  return BitUtil::CountTrailingZeros(word);
+}
+
+template <>
+inline int BaseSetBitRunReader<true>::CountFirstZeros(uint64_t word) {
+  return BitUtil::CountLeadingZeros(word);
+}
+
+template <>
+inline uint64_t BaseSetBitRunReader<false>::ConsumeBits(uint64_t word, int32_t 
num_bits) {
+  return word >> num_bits;
+}
+
+template <>
+inline uint64_t BaseSetBitRunReader<true>::ConsumeBits(uint64_t word, int32_t 
num_bits) {
+  return word << num_bits;
+}
+
+template <>
+inline BaseSetBitRunReader<false>::BaseSetBitRunReader(const uint8_t* bitmap,
+                                                       int64_t start_offset,
+                                                       int64_t length)
+    : bitmap_(bitmap + start_offset / 8), length_(length), remaining_(length_) 
{
+  const int8_t bit_offset = static_cast<int8_t>(start_offset % 8);
+  if (length > 0 && bit_offset) {
+    // Get MSBs from first byte
+    current_num_bits_ =
+        std::min(static_cast<int32_t>(length), static_cast<int32_t>(8 - 
bit_offset));
+    current_word_ = LoadPartialWord(bit_offset, current_num_bits_);
+  } else {
+    current_num_bits_ = 0;
+    current_word_ = 0;
+  }
+}
+
+template <>
+inline BaseSetBitRunReader<true>::BaseSetBitRunReader(const uint8_t* bitmap,
+                                                      int64_t start_offset,
+                                                      int64_t length)
+    : bitmap_(bitmap + (start_offset + length) / 8),
+      length_(length),
+      remaining_(length_) {
+  const int8_t end_bit_offset = static_cast<int8_t>((start_offset + length) % 
8);
+  if (length > 0 && end_bit_offset) {
+    // Get LSBs from last byte
+    ++bitmap_;
+    current_num_bits_ =
+        std::min(static_cast<int32_t>(length), 
static_cast<int32_t>(end_bit_offset));
+    current_word_ = LoadPartialWord(8 - end_bit_offset, current_num_bits_);
+  } else {
+    current_num_bits_ = 0;
+    current_word_ = 0;
+  }
+}
+
+using SetBitRunReader = BaseSetBitRunReader</*Reverse=*/false>;
+using ReverseSetBitRunReader = BaseSetBitRunReader</*Reverse=*/true>;
+
+// Functional-style bit run visitors.
+
+template <typename Visit>
+Status VisitSetBitRuns(const uint8_t* bitmap, int64_t offset, int64_t length,
+                       Visit&& visit) {
+  if (bitmap == NULLPTR) {
+    // No bitmap given: treat all bits as set and visit the whole range as one run
+    return visit(static_cast<int64_t>(0), static_cast<int64_t>(length));
+  }
+  SetBitRunReader reader(bitmap, offset, length);
+  while (true) {
+    const auto run = reader.NextRun();
+    if (run.length == 0) {
+      break;
+    }
+    ARROW_RETURN_NOT_OK(visit(run.position, run.length));
+  }
+  return Status::OK();
+}
+
+template <typename Visit>
+void VisitSetBitRunsVoid(const uint8_t* bitmap, int64_t offset, int64_t length,
+                         Visit&& visit) {
+  if (bitmap == NULLPTR) {
+    // No bitmap given: treat all bits as set and visit the whole range as one run
+    visit(static_cast<int64_t>(0), static_cast<int64_t>(length));
+    return;
+  }
+  SetBitRunReader reader(bitmap, offset, length);
+  while (true) {
+    const auto run = reader.NextRun();
+    if (run.length == 0) {
+      break;
+    }
+    visit(run.position, run.length);
+  }
+}
+
+template <typename Visit>
+Status VisitSetBitRuns(const std::shared_ptr<Buffer>& bitmap, int64_t offset,
+                       int64_t length, Visit&& visit) {
+  return VisitSetBitRuns(bitmap ? bitmap->data() : NULLPTR, offset, length,
+                         std::forward<Visit>(visit));
+}
+
+template <typename Visit>
+void VisitSetBitRunsVoid(const std::shared_ptr<Buffer>& bitmap, int64_t offset,
+                         int64_t length, Visit&& visit) {
+  VisitSetBitRunsVoid(bitmap ? bitmap->data() : NULLPTR, offset, length,
+                      std::forward<Visit>(visit));
+}
 
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/bit_util_benchmark.cc 
b/cpp/src/arrow/util/bit_util_benchmark.cc
index 434c6f6..b0c10dd 100644
--- a/cpp/src/arrow/util/bit_util_benchmark.cc
+++ b/cpp/src/arrow/util/bit_util_benchmark.cc
@@ -110,6 +110,29 @@ static std::shared_ptr<Buffer> CreateRandomBuffer(int64_t nbytes) {
   return std::move(buffer);
 }
 
+static std::shared_ptr<Buffer> CreateRandomBitsBuffer(int64_t nbits,
+                                                      int64_t set_percentage) {
+  ::arrow::random::RandomArrayGenerator rag(/*seed=*/23);
+  double set_probability =
+      static_cast<double>(set_percentage == -1 ? 0 : set_percentage) / 100.0;
+  std::shared_ptr<Buffer> buffer =
+      rag.Boolean(nbits, set_probability)->data()->buffers[1];
+
+  if (set_percentage == -1) {
+    internal::BitmapWriter writer(buffer->mutable_data(), /*start_offset=*/0,
+                                  /*length=*/nbits);
+    for (int x = 0; x < nbits; x++) {
+      if (x % 2 == 0) {
+        writer.Set();
+      } else {
+        writer.Clear();
+      }
+      writer.Next();
+    }
+  }
+  return buffer;
+}
+
 template <typename DoAnd>
 static void BenchmarkAndImpl(benchmark::State& state, DoAnd&& do_and) {
   int64_t nbytes = state.range(0);
@@ -202,35 +225,37 @@ static void BenchmarkBitmapReader(benchmark::State& state, int64_t nbytes) {
 
 template <typename BitRunReaderType>
 static void BenchmarkBitRunReader(benchmark::State& state, int64_t set_percentage) {
-  ::arrow::random::RandomArrayGenerator rag(/*seed=*/23);
   constexpr int64_t kNumBits = 4096;
-  double set_probability =
-      static_cast<double>(set_percentage == -1 ? 0 : set_percentage) / 100.0;
-  std::shared_ptr<Buffer> buffer =
-      rag.Boolean(kNumBits, set_probability)->data()->buffers[1];
+  auto buffer = CreateRandomBitsBuffer(kNumBits, set_percentage);
 
-  const uint8_t* bitmap = buffer->data();
-  if (set_percentage == -1) {
-    internal::BitmapWriter writer(buffer->mutable_data(), /*start_offset=*/0,
-                                  /*length=*/kNumBits);
-    for (int x = 0; x < kNumBits; x++) {
-      if (x % 2 == 0) {
-        writer.Set();
-      } else {
-        writer.Clear();
-      }
-      writer.Next();
+  for (auto _ : state) {
+    {
+      BitRunReaderType reader(buffer->data(), 0, kNumBits);
+      int64_t set_total = 0;
+      internal::BitRun br;
+      do {
+        br = reader.NextRun();
+        set_total += br.set ? br.length : 0;
+      } while (br.length != 0);
+      benchmark::DoNotOptimize(set_total);
     }
   }
+  state.SetBytesProcessed(state.iterations() * (kNumBits / 8));
+}
+
+template <typename SetBitRunReaderType>
+static void BenchmarkSetBitRunReader(benchmark::State& state, int64_t set_percentage) {
+  constexpr int64_t kNumBits = 4096;
+  auto buffer = CreateRandomBitsBuffer(kNumBits, set_percentage);
 
   for (auto _ : state) {
     {
-      BitRunReaderType reader(bitmap, 0, kNumBits);
+      SetBitRunReaderType reader(buffer->data(), 0, kNumBits);
       int64_t set_total = 0;
-      internal::BitRun br;
+      internal::SetBitRun br;
       do {
         br = reader.NextRun();
-        set_total += br.set ? br.length : 0;
+        set_total += br.length;
       } while (br.length != 0);
       benchmark::DoNotOptimize(set_total);
     }
@@ -348,6 +373,14 @@ static void BitRunReaderLinear(benchmark::State& state) {
   BenchmarkBitRunReader<internal::BitRunReaderLinear>(state, state.range(0));
 }
 
+static void SetBitRunReader(benchmark::State& state) {
+  BenchmarkSetBitRunReader<internal::SetBitRunReader>(state, state.range(0));
+}
+
+static void ReverseSetBitRunReader(benchmark::State& state) {
+  BenchmarkSetBitRunReader<internal::ReverseSetBitRunReader>(state, state.range(0));
+}
+
 static void BitmapWriter(benchmark::State& state) {
   BenchmarkBitmapWriter<internal::BitmapWriter>(state, state.range(0));
 }
@@ -477,27 +510,17 @@ static void ReferenceNaiveBitmapReader(benchmark::State& state) {
 BENCHMARK(ReferenceNaiveBitmapReader)->Arg(kBufferSize);
 #endif
 
+void SetBitRunReaderPercentageArg(benchmark::internal::Benchmark* bench) {
+  bench->Arg(-1)->Arg(0)->Arg(10)->Arg(25)->Arg(50)->Arg(60)->Arg(75)->Arg(99);
+}
+
 BENCHMARK(BitmapReader)->Arg(kBufferSize);
 BENCHMARK(BitmapUInt64Reader)->Arg(kBufferSize);
 
-BENCHMARK(BitRunReader)
-    ->Arg(-1)
-    ->Arg(0)
-    ->Arg(10)
-    ->Arg(25)
-    ->Arg(50)
-    ->Arg(60)
-    ->Arg(75)
-    ->Arg(99);
-BENCHMARK(BitRunReaderLinear)
-    ->Arg(-1)
-    ->Arg(0)
-    ->Arg(10)
-    ->Arg(25)
-    ->Arg(50)
-    ->Arg(60)
-    ->Arg(75)
-    ->Arg(99);
+BENCHMARK(BitRunReader)->Apply(SetBitRunReaderPercentageArg);
+BENCHMARK(BitRunReaderLinear)->Apply(SetBitRunReaderPercentageArg);
+BENCHMARK(SetBitRunReader)->Apply(SetBitRunReaderPercentageArg);
+BENCHMARK(ReverseSetBitRunReader)->Apply(SetBitRunReaderPercentageArg);
 
 BENCHMARK(VisitBits)->Arg(kBufferSize);
 BENCHMARK(VisitBitsUnrolled)->Arg(kBufferSize);
diff --git a/cpp/src/arrow/util/bit_util_test.cc b/cpp/src/arrow/util/bit_util_test.cc
index baf2af8..c71abde 100644
--- a/cpp/src/arrow/util/bit_util_test.cc
+++ b/cpp/src/arrow/util/bit_util_test.cc
@@ -66,6 +66,13 @@ using internal::InvertBitmap;
 
 using ::testing::ElementsAreArray;
 
+namespace internal {
+
+void PrintTo(const BitRun& run, std::ostream* os) { *os << run.ToString(); }
+void PrintTo(const SetBitRun& run, std::ostream* os) { *os << run.ToString(); }
+
+}  // namespace internal
+
 template <class BitmapWriter>
 void WriteVectorToWriter(BitmapWriter& writer, const std::vector<int> values) {
   for (const auto& value : values) {
@@ -343,11 +350,216 @@ TEST_F(TestBitmapUInt64Reader, Random) {
   CheckExtensive(*buffer);
 }
 
-namespace internal {
-void PrintTo(const internal::BitRun& run, std::ostream* os) {
-  *os << run.ToString();  // whatever needed to print bar to os
+class TestSetBitRunReader : public ::testing::Test {
+ public:
+  std::vector<internal::SetBitRun> ReferenceBitRuns(const uint8_t* data,
+                                                    int64_t start_offset,
+                                                    int64_t length) {
+    std::vector<internal::SetBitRun> runs;
+    internal::BitRunReaderLinear reader(data, start_offset, length);
+    int64_t position = 0;
+    while (position < length) {
+      const auto br = reader.NextRun();
+      if (br.set) {
+        runs.push_back({position, br.length});
+      }
+      position += br.length;
+    }
+    return runs;
+  }
+
+  template <typename SetBitRunReaderType>
+  std::vector<internal::SetBitRun> AllBitRuns(SetBitRunReaderType* reader) {
+    std::vector<internal::SetBitRun> runs;
+    auto run = reader->NextRun();
+    while (!run.AtEnd()) {
+      runs.push_back(run);
+      run = reader->NextRun();
+    }
+    return runs;
+  }
+
+  template <typename SetBitRunReaderType>
+  void AssertBitRuns(SetBitRunReaderType* reader,
+                     const std::vector<internal::SetBitRun>& expected) {
+    ASSERT_EQ(AllBitRuns(reader), expected);
+  }
+
+  void AssertBitRuns(const uint8_t* data, int64_t start_offset, int64_t length,
+                     const std::vector<internal::SetBitRun>& expected) {
+    {
+      internal::SetBitRunReader reader(data, start_offset, length);
+      AssertBitRuns(&reader, expected);
+    }
+    {
+      internal::ReverseSetBitRunReader reader(data, start_offset, length);
+      auto reversed_expected = expected;
+      std::reverse(reversed_expected.begin(), reversed_expected.end());
+      AssertBitRuns(&reader, reversed_expected);
+    }
+  }
+
+  void AssertBitRuns(const Buffer& buffer, int64_t start_offset, int64_t length,
+                     const std::vector<internal::SetBitRun>& expected) {
+    AssertBitRuns(buffer.data(), start_offset, length, expected);
+  }
+
+  void CheckAgainstReference(const Buffer& buffer, int64_t start_offset, int64_t length) {
+    const auto expected = ReferenceBitRuns(buffer.data(), start_offset, length);
+    AssertBitRuns(buffer.data(), start_offset, length, expected);
+  }
+
+  struct Range {
+    int64_t offset;
+    int64_t length;
+
+    int64_t end_offset() const { return offset + length; }
+  };
+
+  std::vector<Range> BufferTestRanges(const Buffer& buffer) {
+    const int64_t buffer_size = buffer.size() * 8;  // in bits
+    std::vector<Range> ranges;
+    for (const int64_t offset : kTestOffsets) {
+      for (const int64_t length_adjust : kTestOffsets) {
+        int64_t length = std::min(buffer_size - offset, length_adjust);
+        EXPECT_GE(length, 0);
+        ranges.push_back({offset, length});
+        length = std::min(buffer_size - offset, buffer_size - length_adjust);
+        EXPECT_GE(length, 0);
+        ranges.push_back({offset, length});
+      }
+    }
+    return ranges;
+  }
+
+ protected:
+  const std::vector<int64_t> kTestOffsets = {0, 1, 6, 7, 8, 33, 63, 64, 65, 71};
+};
+
+TEST_F(TestSetBitRunReader, Empty) {
+  for (const int64_t offset : kTestOffsets) {
+    // Does not access invalid memory
+    AssertBitRuns(nullptr, offset, 0, {});
+  }
+}
+
+TEST_F(TestSetBitRunReader, OneByte) {
+  auto buffer = BitmapFromString("01101101");
+  AssertBitRuns(*buffer, 0, 8, {{1, 2}, {4, 2}, {7, 1}});
+
+  for (const char* bitmap_string : {"01101101", "10110110", "00000000", "11111111"}) {
+    auto buffer = BitmapFromString(bitmap_string);
+    for (int64_t offset = 0; offset < 8; ++offset) {
+      for (int64_t length = 0; length <= 8 - offset; ++length) {
+        CheckAgainstReference(*buffer, offset, length);
+      }
+    }
+  }
+}
+
+TEST_F(TestSetBitRunReader, Tiny) {
+  auto buffer = BitmapFromString("11100011 10001110 00111000 11100011 10001110 00111000");
+
+  AssertBitRuns(*buffer, 0, 48,
+                {{0, 3}, {6, 3}, {12, 3}, {18, 3}, {24, 3}, {30, 3}, {36, 3}, {42, 3}});
+  AssertBitRuns(*buffer, 0, 46,
+                {{0, 3}, {6, 3}, {12, 3}, {18, 3}, {24, 3}, {30, 3}, {36, 3}, {42, 3}});
+  AssertBitRuns(*buffer, 0, 45,
+                {{0, 3}, {6, 3}, {12, 3}, {18, 3}, {24, 3}, {30, 3}, {36, 3}, {42, 3}});
+  AssertBitRuns(*buffer, 0, 42,
+                {{0, 3}, {6, 3}, {12, 3}, {18, 3}, {24, 3}, {30, 3}, {36, 3}});
+  AssertBitRuns(*buffer, 3, 45,
+                {{3, 3}, {9, 3}, {15, 3}, {21, 3}, {27, 3}, {33, 3}, {39, 3}});
+  AssertBitRuns(*buffer, 3, 43,
+                {{3, 3}, {9, 3}, {15, 3}, {21, 3}, {27, 3}, {33, 3}, {39, 3}});
+  AssertBitRuns(*buffer, 3, 42,
+                {{3, 3}, {9, 3}, {15, 3}, {21, 3}, {27, 3}, {33, 3}, {39, 3}});
+  AssertBitRuns(*buffer, 3, 39, {{3, 3}, {9, 3}, {15, 3}, {21, 3}, {27, 3}, {33, 3}});
+}
+
+TEST_F(TestSetBitRunReader, AllZeros) {
+  const int64_t kBufferSize = 256;
+  ASSERT_OK_AND_ASSIGN(auto buffer, AllocateEmptyBitmap(kBufferSize));
+
+  for (const auto range : BufferTestRanges(*buffer)) {
+    AssertBitRuns(*buffer, range.offset, range.length, {});
+  }
+}
+
+TEST_F(TestSetBitRunReader, AllOnes) {
+  const int64_t kBufferSize = 256;
+  ASSERT_OK_AND_ASSIGN(auto buffer, AllocateEmptyBitmap(kBufferSize));
+  BitUtil::SetBitsTo(buffer->mutable_data(), 0, kBufferSize, true);
+
+  for (const auto range : BufferTestRanges(*buffer)) {
+    if (range.length > 0) {
+      AssertBitRuns(*buffer, range.offset, range.length, {{0, range.length}});
+    } else {
+      AssertBitRuns(*buffer, range.offset, range.length, {});
+    }
+  }
+}
+
+TEST_F(TestSetBitRunReader, Small) {
+  // Ones then zeros then ones
+  const int64_t kBufferSize = 256;
+  const int64_t kOnesLength = 64;
+  const int64_t kSecondOnesStart = kBufferSize - kOnesLength;
+  ASSERT_OK_AND_ASSIGN(auto buffer, AllocateEmptyBitmap(kBufferSize));
+  BitUtil::SetBitsTo(buffer->mutable_data(), 0, kBufferSize, false);
+  BitUtil::SetBitsTo(buffer->mutable_data(), 0, kOnesLength, true);
+  BitUtil::SetBitsTo(buffer->mutable_data(), kSecondOnesStart, kOnesLength, true);
+
+  for (const auto range : BufferTestRanges(*buffer)) {
+    std::vector<internal::SetBitRun> expected;
+    if (range.offset < kOnesLength && range.length > 0) {
+      expected.push_back({0, std::min(kOnesLength - range.offset, range.length)});
+    }
+    if (range.offset + range.length > kSecondOnesStart) {
+      expected.push_back({kSecondOnesStart - range.offset,
+                          range.length + range.offset - kSecondOnesStart});
+    }
+    AssertBitRuns(*buffer, range.offset, range.length, expected);
+  }
+}
+
+TEST_F(TestSetBitRunReader, SingleRun) {
+  // One single run of ones, at varying places in the buffer
+  const int64_t kBufferSize = 512;
+  ASSERT_OK_AND_ASSIGN(auto buffer, AllocateEmptyBitmap(kBufferSize));
+
+  for (const auto ones_range : BufferTestRanges(*buffer)) {
+    BitUtil::SetBitsTo(buffer->mutable_data(), 0, kBufferSize, false);
+    BitUtil::SetBitsTo(buffer->mutable_data(), ones_range.offset, ones_range.length,
+                       true);
+    for (const auto range : BufferTestRanges(*buffer)) {
+      std::vector<internal::SetBitRun> expected;
+
+      if (range.length && ones_range.length && range.offset < ones_range.end_offset() &&
+          ones_range.offset < range.end_offset()) {
+        // The two ranges intersect
+        const int64_t intersect_start = std::max(range.offset, ones_range.offset);
+        const int64_t intersect_stop =
+            std::min(range.end_offset(), ones_range.end_offset());
+        expected.push_back(
+            {intersect_start - range.offset, intersect_stop - intersect_start});
+      }
+      AssertBitRuns(*buffer, range.offset, range.length, expected);
+    }
+  }
+}
+
+TEST_F(TestSetBitRunReader, Random) {
+  const int64_t kBufferSize = 4096;
+  arrow::random::RandomArrayGenerator rng(42);
+  for (const double set_probability : {0.003, 0.01, 0.1, 0.5, 0.9, 0.99, 0.997}) {
+    auto arr = rng.Boolean(kBufferSize, set_probability);
+    auto buffer = arr->data()->buffers[1];
+    for (const auto range : BufferTestRanges(*buffer)) {
+      CheckAgainstReference(*buffer, range.offset, range.length);
+    }
+  }
 }
-}  // namespace internal
 
 TEST(BitRunReader, ZeroLength) {
   internal::BitRunReader reader(nullptr, /*start_offset=*/0, /*length=*/0);
diff --git a/cpp/src/arrow/util/spaced.h b/cpp/src/arrow/util/spaced.h
index f2f369a..8265e1d 100644
--- a/cpp/src/arrow/util/spaced.h
+++ b/cpp/src/arrow/util/spaced.h
@@ -17,9 +17,11 @@
 
 #pragma once
 
-#include "arrow/util/align_util.h"
-#include "arrow/util/bit_block_counter.h"
-#include "arrow/util/bitmap_reader.h"
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+
+#include "arrow/util/bit_run_reader.h"
 
 namespace arrow {
 namespace util {
@@ -37,57 +39,15 @@ template <typename T>
 inline int SpacedCompress(const T* src, int num_values, const uint8_t* valid_bits,
                           int64_t valid_bits_offset, T* output) {
   int num_valid_values = 0;
-  int idx_src = 0;
-
-  const auto p =
-      arrow::internal::BitmapWordAlign<1>(valid_bits, valid_bits_offset, num_values);
-  // First handle the leading bits
-  const int leading_bits = static_cast<int>(p.leading_bits);
-  while (idx_src < leading_bits) {
-    if (BitUtil::GetBit(valid_bits, valid_bits_offset)) {
-      output[num_valid_values] = src[idx_src];
-      num_valid_values++;
-    }
-    idx_src++;
-    valid_bits_offset++;
-  }
 
-  // The aligned parts scanned with BitBlockCounter
-  arrow::internal::BitBlockCounter data_counter(valid_bits, valid_bits_offset,
-                                                num_values - leading_bits);
-  auto current_block = data_counter.NextWord();
-  while (idx_src < num_values) {
-    if (current_block.AllSet()) {  // All true values
-      int run_length = 0;
-      // Scan forward until a block that has some false values (or the end)
-      while (current_block.length > 0 && current_block.AllSet()) {
-        run_length += current_block.length;
-        current_block = data_counter.NextWord();
-      }
-      // Fill all valid values of this scan
-      std::memcpy(&output[num_valid_values], &src[idx_src], run_length * sizeof(T));
-      num_valid_values += run_length;
-      idx_src += run_length;
-      valid_bits_offset += run_length;
-      // The current_block already computed, advance to next loop
-      continue;
-    } else if (!current_block.NoneSet()) {  // Some values are null
-      arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
-                                                      current_block.length);
-      for (int64_t i = 0; i < current_block.length; i++) {
-        if (valid_bits_reader.IsSet()) {
-          output[num_valid_values] = src[idx_src];
-          num_valid_values++;
-        }
-        idx_src++;
-        valid_bits_reader.Next();
-      }
-      valid_bits_offset += current_block.length;
-    } else {  // All null values
-      idx_src += current_block.length;
-      valid_bits_offset += current_block.length;
+  arrow::internal::SetBitRunReader reader(valid_bits, valid_bits_offset, num_values);
+  while (true) {
+    const auto run = reader.NextRun();
+    if (run.length == 0) {
+      break;
     }
-    current_block = data_counter.NextWord();
+    std::memcpy(output + num_valid_values, src + run.position, run.length * sizeof(T));
+    num_valid_values += static_cast<int32_t>(run.length);
   }
 
   return num_valid_values;
@@ -105,92 +65,31 @@ inline int SpacedCompress(const T* src, int num_values, const uint8_t* valid_bit
 template <typename T>
 inline int SpacedExpand(T* buffer, int num_values, int null_count,
                         const uint8_t* valid_bits, int64_t valid_bits_offset) {
-  constexpr int64_t kBatchSize = arrow::internal::BitBlockCounter::kWordBits;
-  constexpr int64_t kBatchByteSize = kBatchSize / 8;
-
   // Point to end as we add the spacing from the back.
-  int idx_decode = num_values - null_count - 1;
-  int idx_buffer = num_values - 1;
-  int64_t idx_valid_bits = valid_bits_offset + idx_buffer;
+  int idx_decode = num_values - null_count;
 
   // Depending on the number of nulls, some of the value slots in buffer may
   // be uninitialized, and this will cause valgrind warnings / potentially UB
-  std::memset(static_cast<void*>(&buffer[idx_decode + 1]), 0, null_count * sizeof(T));
-  if (idx_decode < 0) {
+  std::memset(static_cast<void*>(buffer + idx_decode), 0, null_count * sizeof(T));
+  if (idx_decode == 0) {
+    // All nulls, nothing more to do
     return num_values;
   }
 
-  // Access the bitmap by aligned way
-  const auto p = arrow::internal::BitmapWordAlign<kBatchByteSize>(
-      valid_bits, valid_bits_offset, num_values);
-
-  // The trailing bits
-  auto trailing_bits = p.trailing_bits;
-  while (trailing_bits > 0) {
-    if (BitUtil::GetBit(valid_bits, idx_valid_bits)) {
-      buffer[idx_buffer] = buffer[idx_decode];
-      idx_decode--;
-    }
-
-    idx_buffer--;
-    idx_valid_bits--;
-    trailing_bits--;
-  }
-
-  // The aligned parts scanned from the back with BitBlockCounter
-  auto aligned_words = p.aligned_words;
-  T tmp[kBatchSize];
-  std::memset(&tmp, 0, kBatchSize * sizeof(T));
-  while (aligned_words > 0 && idx_decode < idx_buffer) {
-    const uint8_t* aligned_bits = p.aligned_start + (aligned_words - 1) * kBatchByteSize;
-    arrow::internal::BitBlockCounter data_counter(aligned_bits, 0, kBatchSize);
-    const auto current_block = data_counter.NextWord();
-    if (current_block.AllSet()) {  // All true values
-      idx_buffer -= kBatchSize;
-      idx_decode -= kBatchSize;
-      if (idx_buffer >= idx_decode + static_cast<int>(kBatchSize)) {
-        std::memcpy(&buffer[idx_buffer + 1], &buffer[idx_decode + 1],
-                    kBatchSize * sizeof(T));
-      } else {
-        // Exchange the data to avoid the buffer overlap
-        std::memcpy(tmp, &buffer[idx_decode + 1], kBatchSize * sizeof(T));
-        std::memcpy(&buffer[idx_buffer + 1], tmp, kBatchSize * sizeof(T));
-      }
-    } else if (!current_block.NoneSet()) {  // Some values are null
-      arrow::internal::BitmapReader valid_bits_reader(aligned_bits, 0, kBatchSize);
-      idx_buffer -= kBatchSize;
-      idx_decode -= current_block.popcount;
-
-      // Forward scan and pack the target data to temp
-      int idx = idx_decode + 1;
-      for (uint64_t i = 0; i < kBatchSize; i++) {
-        if (valid_bits_reader.IsSet()) {
-          tmp[i] = buffer[idx];
-          idx++;
-        }
-        valid_bits_reader.Next();
-      }
-      std::memcpy(&buffer[idx_buffer + 1], tmp, kBatchSize * sizeof(T));
-    } else {  // All null values
-      idx_buffer -= kBatchSize;
-    }
-
-    idx_valid_bits -= kBatchSize;
-    aligned_words--;
-  }
-
-  // The remaining leading bits
-  auto leading_bits = p.leading_bits;
-  while (leading_bits > 0) {
-    if (BitUtil::GetBit(valid_bits, idx_valid_bits)) {
-      buffer[idx_buffer] = buffer[idx_decode];
-      idx_decode--;
+  arrow::internal::ReverseSetBitRunReader reader(valid_bits, valid_bits_offset,
+                                                 num_values);
+  while (true) {
+    const auto run = reader.NextRun();
+    if (run.length == 0) {
+      break;
     }
-    leading_bits--;
-    idx_buffer--;
-    idx_valid_bits--;
+    idx_decode -= static_cast<int32_t>(run.length);
+    assert(idx_decode >= 0);
+    std::memmove(buffer + run.position, buffer + idx_decode, run.length * sizeof(T));
   }
 
+  // Otherwise caller gave an incorrect null_count
+  assert(idx_decode == 0);
   return num_values;
 }
 
diff --git a/cpp/src/parquet/column_io_benchmark.cc b/cpp/src/parquet/column_io_benchmark.cc
index ffdf7df..26b8aff 100644
--- a/cpp/src/parquet/column_io_benchmark.cc
+++ b/cpp/src/parquet/column_io_benchmark.cc
@@ -165,35 +165,41 @@ static void BM_ReadInt64Column(::benchmark::State& state) {
   SetBytesProcessed(state, repetition);
 }
 
-BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED)
-    ->RangePair(1024, 65536, 1, 1024);
+void ReadColumnSetArgs(::benchmark::internal::Benchmark* bench) {
+  // Small column, tiny reads
+  bench->Args({1024, 16});
+  // Small column, full read
+  bench->Args({1024, 1024});
+  // Midsize column, midsize reads
+  bench->Args({65536, 1024});
+}
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED)->Apply(ReadColumnSetArgs);
 
-BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL)
-    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL)->Apply(ReadColumnSetArgs);
 
-BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)
-    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)->Apply(ReadColumnSetArgs);
 
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::SNAPPY)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD)
-    ->RangePair(1024, 65536, 1, 1024);
+    ->Apply(ReadColumnSetArgs);
 
 static void BM_RleEncoding(::benchmark::State& state) {
   std::vector<int16_t> levels(state.range(0), 0);
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 0f99a92..bd4d52b 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -30,6 +30,7 @@
 #include "arrow/array/builder_dict.h"
 #include "arrow/stl_allocator.h"
 #include "arrow/type_traits.h"
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_stream_utils.h"
 #include "arrow/util/bitmap_ops.h"
 #include "arrow/util/byte_stream_split.h"
@@ -525,14 +526,12 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
 
   void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                  int64_t valid_bits_offset) override {
-    ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
-                                                      num_values);
-    for (int32_t i = 0; i < num_values; i++) {
-      if (valid_bits_reader.IsSet()) {
-        Put(src[i]);
-      }
-      valid_bits_reader.Next();
-    }
+    ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
+                                           [&](int64_t position, int64_t length) {
+                                             for (int64_t i = 0; i < length; i++) {
+                                               Put(src[i + position]);
+                                             }
+                                           });
   }
 
   using TypedEncoder<DType>::Put;
@@ -546,20 +545,14 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
     size_t buffer_position = buffered_indices_.size();
     buffered_indices_.resize(buffer_position +
                              static_cast<size_t>(data.length() - data.null_count()));
-    if (data.null_count() > 0) {
-      ::arrow::internal::BitmapReader valid_bits_reader(data.null_bitmap_data(),
-                                                        data.offset(), data.length());
-      for (int64_t i = 0; i < data.length(); ++i) {
-        if (valid_bits_reader.IsSet()) {
-          buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
-        }
-        valid_bits_reader.Next();
-      }
-    } else {
-      for (int64_t i = 0; i < data.length(); ++i) {
-        buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
-      }
-    }
+    ::arrow::internal::VisitSetBitRunsVoid(
+        data.null_bitmap_data(), data.offset(), data.length(),
+        [&](int64_t position, int64_t length) {
+          for (int64_t i = 0; i < length; ++i) {
+            buffered_indices_[buffer_position++] =
+                static_cast<int32_t>(values[i + position]);
+          }
+        });
   }
 
   void PutIndices(const ::arrow::Array& data) override {
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index 5386fed..d261cb5 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -27,7 +27,7 @@
 #include "arrow/array.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
-#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/optional.h"
@@ -297,14 +297,16 @@ class TypedComparatorImpl : virtual public TypedComparator<DType> {
     T min = Helper::DefaultMin();
     T max = Helper::DefaultMax();
 
-    ::arrow::internal::BitmapReader reader(valid_bits, valid_bits_offset, length);
-    for (int64_t i = 0; i < length; i++, reader.Next()) {
-      if (reader.IsSet()) {
-        auto val = values[i];
-        min = Helper::Min(type_length_, min, Helper::Coalesce(val, Helper::DefaultMin()));
-        max = Helper::Max(type_length_, max, Helper::Coalesce(val, Helper::DefaultMax()));
-      }
-    }
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, length, [&](int64_t position, int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            const auto val = values[i + position];
+            min = Helper::Min(type_length_, min,
+                              Helper::Coalesce(val, Helper::DefaultMin()));
+            max = Helper::Max(type_length_, max,
+                              Helper::Coalesce(val, Helper::DefaultMax()));
+          }
+        });
 
     return {min, max};
   }

Reply via email to