vegarsti commented on code in PR #17837:
URL: https://github.com/apache/datafusion/pull/17837#discussion_r2390963820
##########
datafusion/functions-aggregate/src/string_agg.rs:
##########
@@ -269,6 +301,103 @@ fn filter_index<T: Clone>(values: &[T], index: usize) -> Vec<T> {
.collect::<Vec<_>>()
}
+/// StringAgg accumulator for the simple case (no order or distinct specified)
+/// This accumulator is more efficient than `StringAggAccumulator`
+#[derive(Debug)]
+pub(crate) struct SimpleStringAggAccumulator {
+ delimiter: String,
+ // Updating during `update_batch()`. e.g. "foo,bar"
Review Comment:
Use doc comment?
```suggestion
/// Updated during `update_batch()`, e.g. "foo,bar"
```
##########
datafusion/functions-aggregate/src/string_agg.rs:
##########
@@ -269,6 +301,103 @@ fn filter_index<T: Clone>(values: &[T], index: usize) -> Vec<T> {
.collect::<Vec<_>>()
}
+/// StringAgg accumulator for the simple case (no order or distinct specified)
+/// This accumulator is more efficient than `StringAggAccumulator`
+#[derive(Debug)]
+pub(crate) struct SimpleStringAggAccumulator {
+ delimiter: String,
+ // Updating during `update_batch()`. e.g. "foo,bar"
+ in_progress_string: String,
+ has_value: bool,
+}
+
+impl SimpleStringAggAccumulator {
+ pub fn new(delimiter: &str) -> Self {
+ Self {
+ delimiter: delimiter.to_string(),
+ in_progress_string: "".to_string(),
+ has_value: false,
+ }
+ }
+
+ #[inline]
+ fn append_strings<'a, I>(&mut self, iter: I)
+ where
+ I: Iterator<Item = Option<&'a str>>,
+ {
+ for value in iter.flatten() {
+ if self.has_value {
+ self.in_progress_string.push_str(&self.delimiter);
+ }
+
+ self.in_progress_string.push_str(value);
+ self.has_value = true;
+ }
+ }
+}
+
+impl Accumulator for SimpleStringAggAccumulator {
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ let string_arr = values.first().ok_or_else(|| {
Review Comment:
Is there only one element in `values`? That was surprising.
##########
datafusion/functions-aggregate/src/string_agg.rs:
##########
@@ -269,6 +301,103 @@ fn filter_index<T: Clone>(values: &[T], index: usize) -> Vec<T> {
.collect::<Vec<_>>()
}
+/// StringAgg accumulator for the simple case (no order or distinct specified)
+/// This accumulator is more efficient than `StringAggAccumulator`
+#[derive(Debug)]
+pub(crate) struct SimpleStringAggAccumulator {
+ delimiter: String,
+ // Updating during `update_batch()`. e.g. "foo,bar"
+ in_progress_string: String,
+ has_value: bool,
+}
+
+impl SimpleStringAggAccumulator {
+ pub fn new(delimiter: &str) -> Self {
+ Self {
+ delimiter: delimiter.to_string(),
+ in_progress_string: "".to_string(),
+ has_value: false,
+ }
+ }
+
+ #[inline]
+ fn append_strings<'a, I>(&mut self, iter: I)
+ where
+ I: Iterator<Item = Option<&'a str>>,
+ {
+ for value in iter.flatten() {
+ if self.has_value {
+ self.in_progress_string.push_str(&self.delimiter);
+ }
+
+ self.in_progress_string.push_str(value);
+ self.has_value = true;
+ }
+ }
+}
+
+impl Accumulator for SimpleStringAggAccumulator {
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ let string_arr = values.first().ok_or_else(|| {
+ internal_datafusion_err!(
+ "Planner should ensure its first arg is Utf8/Utf8View"
+ )
+ })?;
+
+ match string_arr.data_type() {
+ DataType::Utf8 => {
+ let array = as_string_array(string_arr)?;
+ self.append_strings(array.iter());
+ }
+ DataType::LargeUtf8 => {
+ let array = as_generic_string_array::<i64>(string_arr)?;
+ self.append_strings(array.iter());
+ }
+ DataType::Utf8View => {
+ let array = as_string_view_array(string_arr)?;
+ self.append_strings(array.iter());
+ }
+ other => {
+ return internal_err!(
+ "Planner should ensure string_agg first argument is Utf8-like, found {other}"
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ fn evaluate(&mut self) -> Result<ScalarValue> {
+ let result = if self.has_value {
+ ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.in_progress_string)))
+ } else {
+ ScalarValue::LargeUtf8(None)
+ };
+
+ self.has_value = false;
+ Ok(result)
+ }
+
+ fn size(&self) -> usize {
+ size_of_val(self) + self.delimiter.capacity() + self.in_progress_string.capacity()
+ }
+
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Review Comment:
Just asking to understand the Accumulator trait: I see that this and
evaluate are the same except for what they return. What is the difference
between the two, and when is each one used, do you know?
##########
datafusion/functions-aggregate/src/string_agg.rs:
##########
@@ -269,6 +301,103 @@ fn filter_index<T: Clone>(values: &[T], index: usize) -> Vec<T> {
.collect::<Vec<_>>()
}
+/// StringAgg accumulator for the simple case (no order or distinct specified)
+/// This accumulator is more efficient than `StringAggAccumulator`
+#[derive(Debug)]
+pub(crate) struct SimpleStringAggAccumulator {
+ delimiter: String,
+ // Updating during `update_batch()`. e.g. "foo,bar"
+ in_progress_string: String,
Review Comment:
Just thinking out loud about the name here: I think `acc` or `accumulated`
would also be conventional. But this name is fine!
##########
datafusion/functions-aggregate/src/string_agg.rs:
##########
@@ -269,6 +301,103 @@ fn filter_index<T: Clone>(values: &[T], index: usize) -> Vec<T> {
.collect::<Vec<_>>()
}
+/// StringAgg accumulator for the simple case (no order or distinct specified)
+/// This accumulator is more efficient than `StringAggAccumulator`
Review Comment:
Maybe add a note about why?
```suggestion
/// This accumulator is more efficient than `StringAggAccumulator`
/// because it accumulates the string directly,
/// whereas `StringAggAccumulator` uses `ArrayAggAccumulator`.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]