rluvaton commented on code in PR #6893:
URL: https://github.com/apache/arrow-rs/pull/6893#discussion_r1888719695


##########
arrow-select/src/concat.rs:
##########
@@ -129,12 +130,164 @@ fn concat_dictionaries<K: ArrowDictionaryKeyType>(
     Ok(Arc::new(array))
 }
 
+fn concat_list_of_dictionaries<OffsetSize: OffsetSizeTrait, K: ArrowDictionaryKeyType>(
+    arrays: &[&dyn Array],
+) -> Result<ArrayRef, ArrowError> {
+    let mut output_len = 0;
+    let mut list_has_nulls = false;
+
+    let lists = arrays
+        .iter()
+        .map(|x| x.as_list::<OffsetSize>())
+        .inspect(|l| {
+            output_len += l.len();
+            list_has_nulls |= l.null_count() != 0;
+        })
+        .collect::<Vec<_>>();
+
+    let mut dictionary_output_len = 0;
+    let dictionaries: Vec<_> = lists
+        .iter()
+        .map(|x| x.values().as_ref().as_dictionary::<K>())
+        .inspect(|d| dictionary_output_len += d.len())
+        .collect();
+
+    if !should_merge_dictionary_values::<K>(&dictionaries, dictionary_output_len) {
+        return concat_fallback(arrays, Capacities::Array(output_len));
+    }
+
+    let merged = merge_dictionary_values(&dictionaries, None)?;
+
+    let lists_nulls = list_has_nulls.then(|| {
+        let mut nulls = BooleanBufferBuilder::new(output_len);
+        for l in &lists {
+            match l.nulls() {
+                Some(n) => nulls.append_buffer(n.inner()),
+                None => nulls.append_n(l.len(), true),
+            }
+        }
+        NullBuffer::new(nulls.finish())
+    });
+
+    // Recompute keys
+    let mut key_values = Vec::with_capacity(dictionary_output_len);
+
+    let mut dictionary_has_nulls = false;
+    for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
+        dictionary_has_nulls |= d.null_count() != 0;
+        for key in d.keys().values() {
+            // Use get to safely handle nulls
+            key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
+        }
+    }
+
+    let dictionary_nulls = dictionary_has_nulls.then(|| {
+        let mut nulls = BooleanBufferBuilder::new(dictionary_output_len);
+        for d in &dictionaries {
+            match d.nulls() {
+                Some(n) => nulls.append_buffer(n.inner()),
+                None => nulls.append_n(d.len(), true),
+            }
+        }
+        NullBuffer::new(nulls.finish())
+    });
+
+    let keys = PrimitiveArray::<K>::new(key_values.into(), dictionary_nulls);
+    // Sanity check
+    assert_eq!(keys.len(), dictionary_output_len);
+
+    let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
+
+    // Merge value offsets from the lists
+    let all_value_offsets_iterator = lists.iter().map(|x| x.offsets());
+
+    let value_offset_buffer = merge_value_offsets(all_value_offsets_iterator);
+
+    let builder = ArrayDataBuilder::new(arrays[0].data_type().clone())
+        .len(output_len)
+        .nulls(lists_nulls)
+        // `GenericListArray` must only have 1 buffer
+        .buffers(vec![value_offset_buffer])
+        // `GenericListArray` must only have 1 child_data
+        .child_data(vec![array.to_data()]);
+
+    // TODO - maybe use build_unchecked?
+    let array_data = builder.build()?;
+
+    let array = GenericListArray::<OffsetSize>::from(array_data);
+    Ok(Arc::new(array))
+}
+
+/// Merge value offsets from multiple offset buffers into a single buffer
+///
+/// If we have the following offset buffers
+/// [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]]
+/// the output should be
+/// [ 0, 3, 5,      7, 7, 13,         13, 14]
+fn merge_value_offsets<
+    'a,
+    OffsetSize: OffsetSizeTrait,
+    I: Iterator<Item = &'a OffsetBuffer<OffsetSize>>,
+>(
+    offset_buffers_iterator: I,
+) -> Buffer {
+    // 1. Filter out empty lists
+    let mut offset_buffers_iterator = offset_buffers_iterator.filter(|x| !x.is_empty());
+
+    // 2. Get first non-empty list as the starting point
+    let starting_buffer = offset_buffers_iterator.next();
+
+    // 3. If we have only empty lists, return an empty buffer
+    if starting_buffer.is_none() {
+        return Buffer::from(&[]);
+    }
+
+    let starting_buffer = starting_buffer.unwrap();
+
+    let mut offsets_iter: Box<dyn Iterator<Item = OffsetSize>> =
+        Box::new(starting_buffer.iter().copied());
+
+    // 4. Get the last value in the starting buffer as the starting point for the next buffer
+    // Safety: We already filtered out empty lists
+    let mut advance_by = *starting_buffer.last().unwrap();
+
+    // 5. Iterate over the remaining buffers
+    for offset_buffer in offset_buffers_iterator {
+        // 6. Get the last value of the current buffer so we can know how much to advance the next buffer
+        // Safety: We already filtered out empty lists
+        let last_value = *offset_buffer.last().unwrap();
+
+        // 7. Advance the offset buffer by the last value in the previous buffer
+        let offset_buffer_iter = offset_buffer
+            .iter()
+            // Skip the first value as it is the initial offset of 0
+            .skip(1)
+            .map(move |&x| x + advance_by);
+
+        // 8. Concat the current buffer onto the previous ones
+        // Chaining keeps the iterator's length trusted
+        offsets_iter = Box::new(offsets_iter.chain(offset_buffer_iter));

Review Comment:
   I have a stack overflow error here, fixing it now
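   
   For context, the recursively nested `Box<dyn Iterator>` chain built in step 8 is a plausible culprit: each input buffer adds another `Chain` layer, and iterating (and dropping) that tower recurses one level per layer. Below is a minimal sketch of one way to avoid it, collecting the shifted offsets into a `Vec` instead of chaining iterators. This is not the fix that landed in the PR, and the helper name `merge_value_offsets_flat` is made up for illustration:
   
   ```rust
   use arrow_array::OffsetSizeTrait;
   use arrow_buffer::{Buffer, OffsetBuffer};
   
   /// Hypothetical flattened variant of `merge_value_offsets`: accumulate the
   /// shifted offsets into a `Vec` so the work is a single loop rather than a
   /// recursively nested chain of boxed iterators.
   fn merge_value_offsets_flat<'a, OffsetSize, I>(offset_buffers: I) -> Buffer
   where
       OffsetSize: OffsetSizeTrait,
       I: Iterator<Item = &'a OffsetBuffer<OffsetSize>>,
   {
       let mut merged: Vec<OffsetSize> = Vec::new();
   
       // Running shift: the last offset emitted so far (zero to start)
       let mut advance_by = OffsetSize::default();
   
       for offsets in offset_buffers.filter(|o| !o.is_empty()) {
           // Every buffer after the first starts with 0, which is already
           // covered by the previous buffer's last offset, so skip it
           let skip: usize = if merged.is_empty() { 0 } else { 1 };
           merged.extend(offsets.iter().skip(skip).map(|&o| o + advance_by));
   
           // Unwrap is fine: empty buffers were filtered out above,
           // so `merged` is non-empty at this point
           advance_by = *merged.last().unwrap();
       }
   
       // Same worked example as the doc comment above:
       // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]] -> [0, 3, 5, 7, 7, 13, 13, 14]
       Buffer::from_vec(merged)
   }
   ```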


