alamb commented on a change in pull request #1228:
URL: https://github.com/apache/arrow-rs/pull/1228#discussion_r795043370



##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -17,184 +17,117 @@
 
 //! Defines miscellaneous array kernels.
 
+use crate::array::*;
 use crate::buffer::buffer_bin_and;
 use crate::datatypes::DataType;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
-use crate::{array::*, util::bit_chunk_iterator::BitChunkIterator};
-use std::iter::Enumerate;
+use crate::util::bit_chunk_iterator::{UnalignedBitChunk, 
UnalignedBitChunkIterator};
 
 /// Function that can filter arbitrary arrays
 pub type Filter<'a> = Box<dyn Fn(&ArrayData) -> ArrayData + 'a>;
 
-/// Internal state of [SlicesIterator]
-#[derive(Debug, PartialEq)]
-enum State {
-    // it is iterating over bits of a mask (`u64`, steps of size of 1 slot)
-    Bits(u64),
-    // it is iterating over chunks (steps of size of 64 slots)
-    Chunks,
-    // it is iterating over the remaining bits (steps of size of 1 slot)
-    Remainder,
-    // nothing more to iterate.
-    Finish,
-}
-
 /// An iterator of `(usize, usize)` each representing an interval 
`[start,end[` whose
 /// slots of a [BooleanArray] are true. Each interval corresponds to a 
contiguous region of memory to be
 /// "taken" from an array to be filtered.
 #[derive(Debug)]
 pub struct SlicesIterator<'a> {
-    iter: Enumerate<BitChunkIterator<'a>>,
-    state: State,
-    filter: &'a BooleanArray,
-    remainder_mask: u64,
-    remainder_len: usize,
-    chunk_len: usize,
+    iter: UnalignedBitChunkIterator<'a>,
     len: usize,
-    start: usize,
-    on_region: bool,
-    current_chunk: usize,
-    current_bit: usize,
+    offset: usize,
+    current_chunk: u64,
 }
 
 impl<'a> SlicesIterator<'a> {
     pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-        let chunks = values.bit_chunks(filter.offset(), filter.len());
+        let len = filter.len();
+        let chunk = UnalignedBitChunk::new(values.as_slice(), filter.offset(), 
len);
+        let mut iter = chunk.iter();
+
+        let offset = chunk.lead_padding();
+        let current_chunk = iter.next().unwrap_or(0);
 
         Self {
-            iter: chunks.iter().enumerate(),
-            state: State::Chunks,
-            filter,
-            remainder_len: chunks.remainder_len(),
-            chunk_len: chunks.chunk_len(),
-            remainder_mask: chunks.remainder_bits(),
-            len: 0,
-            start: 0,
-            on_region: false,
-            current_chunk: 0,
-            current_bit: 0,
+            iter,
+            len,
+            offset,
+            current_chunk,
         }
     }
 
-    /// Counts the number of set bits in the filter array.
-    fn filter_count(&self) -> usize {
-        let values = self.filter.values();
-        values.count_set_bits_offset(self.filter.offset(), self.filter.len())
-    }
-
-    #[inline]
-    fn current_start(&self) -> usize {
-        self.current_chunk * 64 + self.current_bit
-    }
-
-    #[inline]
-    fn iterate_bits(&mut self, mask: u64, max: usize) -> Option<(usize, 
usize)> {
-        while self.current_bit < max {
-            if (mask & (1 << self.current_bit)) != 0 {
-                if !self.on_region {
-                    self.start = self.current_start();
-                    self.on_region = true;
-                }
-                self.len += 1;
-            } else if self.on_region {
-                let result = (self.start, self.start + self.len);
-                self.len = 0;
-                self.on_region = false;
-                self.current_bit += 1;
-                return Some(result);
+    fn advance_to_set_bit(&mut self) -> Option<(usize, u32)> {
+        loop {
+            if self.current_chunk != 0 {
+                // Find the index of the first 1
+                let bit_pos = self.current_chunk.trailing_zeros();
+                return Some((self.offset, bit_pos));
             }
-            self.current_bit += 1;
-        }
-        self.current_bit = 0;
-        None
-    }
 
-    /// iterates over chunks.
-    #[inline]
-    fn iterate_chunks(&mut self) -> Option<(usize, usize)> {
-        while let Some((i, mask)) = self.iter.next() {
-            self.current_chunk = i;
-            if mask == 0 {
-                if self.on_region {
-                    let result = (self.start, self.start + self.len);
-                    self.len = 0;
-                    self.on_region = false;
-                    return Some(result);
-                }
-            } else if mask == 18446744073709551615u64 {
-                // = !0u64
-                if !self.on_region {
-                    self.start = self.current_start();
-                    self.on_region = true;
-                }
-                self.len += 64;
-            } else {
-                // there is a chunk that has a non-trivial mask => iterate 
over bits.
-                self.state = State::Bits(mask);
-                return None;
-            }
+            self.current_chunk = self.iter.next()?;
+            self.offset += 64;
         }
-        // no more chunks => start iterating over the remainder
-        self.current_chunk = self.chunk_len;
-        self.state = State::Remainder;
-        None
     }
 }
 
 impl<'a> Iterator for SlicesIterator<'a> {
     type Item = (usize, usize);
 
     fn next(&mut self) -> Option<Self::Item> {
-        match self.state {
-            State::Chunks => {
-                match self.iterate_chunks() {
-                    None => {
-                        // iterating over chunks does not yield any new slice 
=> continue to the next
-                        self.current_bit = 0;
-                        self.next()
-                    }
-                    other => other,
-                }
+        // Used as termination condition
+        if self.len == 0 {
+            return None;
+        }
+
+        let (start_chunk, start_bit) = self.advance_to_set_bit()?;
+
+        // Set bits up to start
+        self.current_chunk |= (1 << start_bit) - 1;

Review comment:
       I think I understand what is going on here, but I am not 100% confident
   
   Would it be possible to write some unit tests of `SlicesIter` against 
`BooleanArray`s (I can help write them if you would like) that could 
demonstrate how the logic was working, and especially cover the corner cases 
(like enture `0xffffffff` and bits set within a mask?) 

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -17,184 +17,117 @@
 
 //! Defines miscellaneous array kernels.
 
+use crate::array::*;
 use crate::buffer::buffer_bin_and;
 use crate::datatypes::DataType;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
-use crate::{array::*, util::bit_chunk_iterator::BitChunkIterator};
-use std::iter::Enumerate;
+use crate::util::bit_chunk_iterator::{UnalignedBitChunk, 
UnalignedBitChunkIterator};
 
 /// Function that can filter arbitrary arrays
 pub type Filter<'a> = Box<dyn Fn(&ArrayData) -> ArrayData + 'a>;
 
-/// Internal state of [SlicesIterator]
-#[derive(Debug, PartialEq)]
-enum State {
-    // it is iterating over bits of a mask (`u64`, steps of size of 1 slot)
-    Bits(u64),
-    // it is iterating over chunks (steps of size of 64 slots)
-    Chunks,
-    // it is iterating over the remaining bits (steps of size of 1 slot)
-    Remainder,
-    // nothing more to iterate.
-    Finish,
-}
-
 /// An iterator of `(usize, usize)` each representing an interval 
`[start,end[` whose
 /// slots of a [BooleanArray] are true. Each interval corresponds to a 
contiguous region of memory to be
 /// "taken" from an array to be filtered.
 #[derive(Debug)]
 pub struct SlicesIterator<'a> {
-    iter: Enumerate<BitChunkIterator<'a>>,
-    state: State,
-    filter: &'a BooleanArray,
-    remainder_mask: u64,
-    remainder_len: usize,
-    chunk_len: usize,
+    iter: UnalignedBitChunkIterator<'a>,
     len: usize,
-    start: usize,
-    on_region: bool,
-    current_chunk: usize,
-    current_bit: usize,
+    offset: usize,
+    current_chunk: u64,
 }
 
 impl<'a> SlicesIterator<'a> {
     pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-        let chunks = values.bit_chunks(filter.offset(), filter.len());
+        let len = filter.len();
+        let chunk = UnalignedBitChunk::new(values.as_slice(), filter.offset(), 
len);
+        let mut iter = chunk.iter();
+
+        let offset = chunk.lead_padding();
+        let current_chunk = iter.next().unwrap_or(0);
 
         Self {
-            iter: chunks.iter().enumerate(),
-            state: State::Chunks,
-            filter,
-            remainder_len: chunks.remainder_len(),
-            chunk_len: chunks.chunk_len(),
-            remainder_mask: chunks.remainder_bits(),
-            len: 0,
-            start: 0,
-            on_region: false,
-            current_chunk: 0,
-            current_bit: 0,
+            iter,
+            len,
+            offset,
+            current_chunk,
         }
     }
 
-    /// Counts the number of set bits in the filter array.
-    fn filter_count(&self) -> usize {
-        let values = self.filter.values();
-        values.count_set_bits_offset(self.filter.offset(), self.filter.len())
-    }
-
-    #[inline]
-    fn current_start(&self) -> usize {
-        self.current_chunk * 64 + self.current_bit
-    }
-
-    #[inline]
-    fn iterate_bits(&mut self, mask: u64, max: usize) -> Option<(usize, 
usize)> {
-        while self.current_bit < max {
-            if (mask & (1 << self.current_bit)) != 0 {
-                if !self.on_region {
-                    self.start = self.current_start();
-                    self.on_region = true;
-                }
-                self.len += 1;
-            } else if self.on_region {
-                let result = (self.start, self.start + self.len);
-                self.len = 0;
-                self.on_region = false;
-                self.current_bit += 1;
-                return Some(result);
+    fn advance_to_set_bit(&mut self) -> Option<(usize, u32)> {

Review comment:
       ```suggestion
       /// Returns `Some((chunk_offset, bit_offset))` for the next chunk that 
has at 
       /// least one but set, or None if there is no such chunk.
       /// 
       /// where `chunk_offset` is the bit offset to the current `usize`d chunk 
       /// and `bit_offset` is the offset of the first `1` bit in that chunk
       fn advance_to_set_bit(&mut self) -> Option<(usize, u32)> {
   ```

##########
File path: parquet/src/arrow/bit_util.rs
##########
@@ -15,40 +15,33 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::util::bit_chunk_iterator::BitChunks;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
 use std::ops::Range;
 
 /// Counts the number of set bits in the provided range
 pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
-    let mut count = 0_usize;
-    let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
-    chunks.iter().for_each(|chunk| {
-        count += chunk.count_ones() as usize;
-    });
-    count += chunks.remainder_bits().count_ones() as usize;
-    count
+    let unaligned = UnalignedBitChunk::new(bytes, range.start, range.end - 
range.start);

Review comment:
       Just to check my understanding, you said above that `UnalignedBitChunk` 
was slower than `BitChunks` as it handled unaligned data; This function is 
dealing with input that *is* aligned, but only to `u8`,  so it is still better 
to use the `UnalignedBitChunk`iterator
   
   

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -17,184 +17,117 @@
 
 //! Defines miscellaneous array kernels.
 
+use crate::array::*;
 use crate::buffer::buffer_bin_and;
 use crate::datatypes::DataType;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
-use crate::{array::*, util::bit_chunk_iterator::BitChunkIterator};
-use std::iter::Enumerate;
+use crate::util::bit_chunk_iterator::{UnalignedBitChunk, 
UnalignedBitChunkIterator};
 
 /// Function that can filter arbitrary arrays
 pub type Filter<'a> = Box<dyn Fn(&ArrayData) -> ArrayData + 'a>;
 
-/// Internal state of [SlicesIterator]
-#[derive(Debug, PartialEq)]
-enum State {
-    // it is iterating over bits of a mask (`u64`, steps of size of 1 slot)
-    Bits(u64),
-    // it is iterating over chunks (steps of size of 64 slots)
-    Chunks,
-    // it is iterating over the remaining bits (steps of size of 1 slot)
-    Remainder,
-    // nothing more to iterate.
-    Finish,
-}
-
 /// An iterator of `(usize, usize)` each representing an interval 
`[start,end[` whose
 /// slots of a [BooleanArray] are true. Each interval corresponds to a 
contiguous region of memory to be
 /// "taken" from an array to be filtered.
 #[derive(Debug)]
 pub struct SlicesIterator<'a> {
-    iter: Enumerate<BitChunkIterator<'a>>,
-    state: State,
-    filter: &'a BooleanArray,
-    remainder_mask: u64,
-    remainder_len: usize,
-    chunk_len: usize,
+    iter: UnalignedBitChunkIterator<'a>,
     len: usize,
-    start: usize,
-    on_region: bool,
-    current_chunk: usize,
-    current_bit: usize,
+    offset: usize,
+    current_chunk: u64,
 }
 
 impl<'a> SlicesIterator<'a> {
     pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-        let chunks = values.bit_chunks(filter.offset(), filter.len());
+        let len = filter.len();
+        let chunk = UnalignedBitChunk::new(values.as_slice(), filter.offset(), 
len);
+        let mut iter = chunk.iter();
+
+        let offset = chunk.lead_padding();
+        let current_chunk = iter.next().unwrap_or(0);
 
         Self {
-            iter: chunks.iter().enumerate(),
-            state: State::Chunks,
-            filter,
-            remainder_len: chunks.remainder_len(),
-            chunk_len: chunks.chunk_len(),
-            remainder_mask: chunks.remainder_bits(),
-            len: 0,
-            start: 0,
-            on_region: false,
-            current_chunk: 0,
-            current_bit: 0,
+            iter,
+            len,
+            offset,
+            current_chunk,
         }
     }
 
-    /// Counts the number of set bits in the filter array.
-    fn filter_count(&self) -> usize {
-        let values = self.filter.values();
-        values.count_set_bits_offset(self.filter.offset(), self.filter.len())
-    }
-
-    #[inline]
-    fn current_start(&self) -> usize {
-        self.current_chunk * 64 + self.current_bit
-    }
-
-    #[inline]
-    fn iterate_bits(&mut self, mask: u64, max: usize) -> Option<(usize, 
usize)> {
-        while self.current_bit < max {
-            if (mask & (1 << self.current_bit)) != 0 {
-                if !self.on_region {
-                    self.start = self.current_start();
-                    self.on_region = true;
-                }
-                self.len += 1;
-            } else if self.on_region {
-                let result = (self.start, self.start + self.len);
-                self.len = 0;
-                self.on_region = false;
-                self.current_bit += 1;
-                return Some(result);
+    fn advance_to_set_bit(&mut self) -> Option<(usize, u32)> {
+        loop {
+            if self.current_chunk != 0 {
+                // Find the index of the first 1
+                let bit_pos = self.current_chunk.trailing_zeros();

Review comment:
       TIL: 
https://doc.rust-lang.org/std/primitive.u64.html#method.trailing_zeros 🤔 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to