UBarney commented on code in PR #15266: URL: https://github.com/apache/datafusion/pull/15266#discussion_r2002683221
########## datafusion/functions-aggregate/src/first_last.rs: ########## @@ -179,6 +292,419 @@ impl AggregateUDFImpl for FirstValue { } } +struct FirstPrimitiveGroupsAccumulator<T> +where + T: ArrowPrimitiveType + Send, +{ + // ================ state =========== + vals: Vec<T::Native>, + // Stores ordering values, of the aggregator requirement corresponding to first value + // of the aggregator. + // The `orderings` are stored row-wise, meaning that `orderings[group_idx]` + // represents the ordering values corresponding to the `group_idx`-th group. + orderings: Vec<Vec<ScalarValue>>, + // At the beginning, `is_sets[group_idx]` is false, which means `first` is not seen yet. + // Once we see the first value, we set the `is_sets[group_idx]` flag + is_sets: BooleanBufferBuilder, + // null_builder[group_idx] == false => vals[group_idx] is null + null_builder: BooleanBufferBuilder, + // size of `self.orderings` + // Calculating the memory usage of `self.orderings` using `ScalarValue::size_of_vec` is quite costly. + // Therefore, we cache it and compute `size_of` only after each update + // to avoid calling `ScalarValue::size_of_vec` by Self.size. + size_of_orderings: usize, + + // =========== option ============ + + // Stores the applicable ordering requirement. + ordering_req: LexOrdering, + // derived from `ordering_req`. + sort_options: Vec<SortOptions>, + // Stores whether incoming data already satisfies the ordering requirement. + input_requirement_satisfied: bool, + // Ignore null values. + ignore_nulls: bool, + /// The output type + data_type: DataType, + default_orderings: Vec<ScalarValue>, +} + +impl<T> FirstPrimitiveGroupsAccumulator<T> +where + T: ArrowPrimitiveType + Send, +{ + fn try_new( + ordering_req: LexOrdering, + ignore_nulls: bool, + data_type: &DataType, + ordering_dtypes: &[DataType], + ) -> Result<Self> { + let requirement_satisfied = ordering_req.is_empty(); + + let default_orderings = ordering_dtypes + .iter() + .map(ScalarValue::try_from) + .collect::<Result<Vec<_>>>()?; + + let sort_options = get_sort_options(ordering_req.as_ref()); + + Ok(Self { + null_builder: BooleanBufferBuilder::new(0), + ordering_req, + sort_options, + input_requirement_satisfied: requirement_satisfied, + ignore_nulls, + default_orderings, + data_type: data_type.clone(), + vals: Vec::new(), + orderings: Vec::new(), + is_sets: BooleanBufferBuilder::new(0), + size_of_orderings: 0, + }) + } + + fn need_update(&self, group_idx: usize) -> bool { + if !self.is_sets.get_bit(group_idx) { + return true; + } + + if self.ignore_nulls && !self.null_builder.get_bit(group_idx) { + return true; + } + + !self.input_requirement_satisfied + } + + fn should_update_state( + &self, + group_idx: usize, + new_ordering_values: &[ScalarValue], + ) -> Result<bool> { + if !self.is_sets.get_bit(group_idx) { + return Ok(true); + } + + assert!(new_ordering_values.len() == self.ordering_req.len()); + let current_ordering = &self.orderings[group_idx]; + compare_rows(current_ordering, new_ordering_values, &self.sort_options) + .map(|x| x.is_gt()) + } + + fn take_orderings(&mut self, emit_to: EmitTo) -> Vec<Vec<ScalarValue>> { + let result = emit_to.take_needed(&mut self.orderings); + + match emit_to { + EmitTo::All => self.size_of_orderings = 0, + EmitTo::First(_) => { + self.size_of_orderings -= + result.iter().map(ScalarValue::size_of_vec).sum::<usize>() + } + } + + result + } + + fn take_need( + bool_buf_builder: &mut BooleanBufferBuilder, + emit_to: EmitTo, + ) -> BooleanBuffer { + let bool_buf = bool_buf_builder.finish(); + match emit_to { + EmitTo::All => bool_buf, + EmitTo::First(n) => { + // split off the first N values in seen_values + // + // TODO make this more efficient rather than two + // copies and bitwise manipulation + let first_n: BooleanBuffer = bool_buf.iter().take(n).collect(); + // reset the existing buffer + for b in bool_buf.iter().skip(n) { + bool_buf_builder.append(b); + } + first_n + } + } + } + + fn resize_states(&mut self, new_size: usize) { + self.vals.resize(new_size, T::default_value()); + + self.null_builder.resize(new_size); + + if self.orderings.len() < new_size { + let current_len = self.orderings.len(); + + self.orderings + .resize(new_size, self.default_orderings.clone()); + + self.size_of_orderings += (new_size - current_len) + * ScalarValue::size_of_vec( + // Note: In some cases (such as in the unit test below) + // ScalarValue::size_of_vec(&self.default_orderings) != ScalarValue::size_of_vec(&self.default_orderings.clone()) + // This may be caused by the different vec.capacity() values? + self.orderings.last().unwrap(), + ); + } + + self.is_sets.resize(new_size); + } + + fn update_state( + &mut self, + group_idx: usize, + orderings: &[ScalarValue], + new_val: T::Native, + is_null: bool, + ) { + self.vals[group_idx] = new_val; + self.is_sets.set_bit(group_idx, true); + + self.null_builder.set_bit(group_idx, !is_null); + + assert!(orderings.len() == self.ordering_req.len()); + let old_size = ScalarValue::size_of_vec(&self.orderings[group_idx]); + self.orderings[group_idx].clear(); + self.orderings[group_idx].extend_from_slice(orderings); + let new_size = ScalarValue::size_of_vec(&self.orderings[group_idx]); + self.size_of_orderings = self.size_of_orderings - old_size + new_size; + } + + // should be used in test only + #[cfg(test)] + fn compute_size_of_orderings(&self) -> usize { + self.orderings + .iter() + .map(ScalarValue::size_of_vec) + .sum::<usize>() + } + + /// Returns a hashmap where each group (identified by `group_indices`) is mapped to + /// the index of its minimum value in `orderings`, based on lexicographical comparison. + /// The function filters values using `opt_filter` and `is_set_arr` + fn get_filtered_min_of_each_group( + &self, + orderings: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + vals: &PrimitiveArray<T>, + is_set_arr: Option<&BooleanArray>, + ) -> Result<HashMap<usize, usize>> { + let mut result = HashMap::with_capacity(orderings.len()); // group_idx -> idx_in_orderings + + let comparator = { + assert_eq!(orderings.len(), self.ordering_req.len()); + let sort_columns = orderings + .iter() + .zip(self.ordering_req.iter()) + .map(|(array, req)| SortColumn { + values: Arc::clone(array), + options: Some(req.options), + }) + .collect::<Vec<_>>(); + + LexicographicalComparator::try_new(&sort_columns)? + }; + + for (idx_in_val, group_idx) in group_indices.iter().enumerate() { + let group_idx = *group_idx; + + let passed_filter = opt_filter.is_none_or(|x| x.value(idx_in_val)); + + let is_set = is_set_arr.is_none_or(|x| x.value(idx_in_val)); + + if !passed_filter || !is_set { + continue; + } + + if !self.need_update(group_idx) { + continue; + } + + if self.ignore_nulls && vals.is_null(idx_in_val) { + continue; + } + + if !result.contains_key(&group_idx) + || comparator + .compare(*result.get(&group_idx).unwrap(), idx_in_val) + .is_gt() + { + result.insert(group_idx, idx_in_val); + } + } + + Ok(result) + } + + fn take_vals_and_null_buf(&mut self, emit_to: EmitTo) -> ArrayRef { + let r = emit_to.take_needed(&mut self.vals); + + let null_buf = NullBuffer::new(Self::take_need(&mut self.null_builder, emit_to)); + + let values = PrimitiveArray::<T>::new(r.into(), Some(null_buf)) // no copy + .with_data_type(self.data_type.clone()); + Arc::new(values) + } +} + +impl<T> GroupsAccumulator for FirstPrimitiveGroupsAccumulator<T> +where + T: ArrowPrimitiveType + Send, +{ + fn update_batch( + &mut self, + values_with_orderings: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.resize_states(total_num_groups); + + let vals = values_with_orderings[0].as_primitive::<T>(); + + let mut ordering_buf = Vec::with_capacity(self.ordering_req.len()); + + // The overhead of calling `extract_row_at_idx_to_buf` is somewhat high, so we need to minimize its calls as much as possible. + for (group_idx, idx) in self + .get_filtered_min_of_each_group( + &values_with_orderings[1..], + group_indices, + opt_filter, + vals, + None, + )? + .into_iter() + { + extract_row_at_idx_to_buf( + &values_with_orderings[1..], + idx, + &mut ordering_buf, + )?; + + if self.should_update_state(group_idx, &ordering_buf)? { + self.update_state( + group_idx, + &ordering_buf, + vals.value(idx), + vals.is_null(idx), + ); + } + } + + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { + self.take_orderings(emit_to); + Self::take_need(&mut self.is_sets, emit_to); + + Ok(self.take_vals_and_null_buf(emit_to)) + } + + fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { + let mut result = Vec::with_capacity(self.orderings.len() + 2); + + result.push(self.take_vals_and_null_buf(emit_to)); + + let orderings = self.take_orderings(emit_to); + + let ordering_cols = { + let mut ordering_cols = Vec::with_capacity(self.ordering_req.len()); + for _ in 0..self.ordering_req.len() { + ordering_cols.push(Vec::with_capacity(self.orderings.len())); + } + for row in orderings.into_iter() { + assert_eq!(row.len(), self.ordering_req.len()); + for (col_idx, ordering) in row.into_iter().enumerate() { + ordering_cols[col_idx].push(ordering); + } + } + + ordering_cols + }; + for ordering_col in ordering_cols { + result.push(ScalarValue::iter_to_array(ordering_col)?); + } + + let is_sets = Self::take_need(&mut self.is_sets, emit_to); + result.push(Arc::new(BooleanArray::new(is_sets, None))); + + Ok(result) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.resize_states(total_num_groups); + + let mut ordering_buf = Vec::with_capacity(self.ordering_req.len()); + + let (is_set_arr, val_and_orderings) = match values.split_last() { + Some(result) => result, + None => return internal_err!("Empty row in FISRT_VALUE"), + }; + + let is_set_arr = as_boolean_array(is_set_arr)?; + + let vals = values[0].as_primitive::<T>(); + // The overhead of calling `extract_row_at_idx_to_buf` is somewhat high, so we need to minimize its calls as much as possible. + let groups = self.get_filtered_min_of_each_group( Review Comment: > so input groups should be unique No. When there are many unique values, the first-phase aggregation may use `convert_to_state` to skip aggregation by converting the input batch into an intermediate aggregate state and passing it to the second-phase aggregation.([skip agg](https://github.com/apache/datafusion/blob/6c5c5c15e4551e600f59c21cb6d5a7d83354bfb4/datafusion/physical-plan/src/aggregates/row_hash.rs#L746) [convert_to_state](https://github.com/apache/datafusion/blob/6c5c5c15e4551e600f59c21cb6d5a7d83354bfb4/datafusion/expr-common/src/groups_accumulator.rs#L198-L233)). In this case `group_indices` contain duplicate group_id. We can verfiy this by adding `assert_eq!(groups.len(), group_indices.len());` to line 663 and run fuzz test. The fuzz test fails with the following error: ``` thread 'tokio-runtime-worker' panicked at datafusion/functions-aggregate/src/first_last.rs:663:9: assertion `left == right` failed left: 26 right: 32 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org