alamb commented on code in PR #19285:
URL: https://github.com/apache/datafusion/pull/19285#discussion_r2615149710
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -73,6 +75,39 @@ mod row_hash;
mod topk;
mod topk_stream;
+/// Returns true if TopK aggregation data structures support the provided key and value types.
+///
+/// This function checks whether both the key type (used for grouping) and value type
+/// (used in min/max aggregation) can be handled by the TopK aggregation heap and hash table.
+/// Supported types include Arrow primitives (integers, floats, decimals, intervals) and
+/// UTF-8 strings (`Utf8`, `LargeUtf8`, `Utf8View`).
+///
+/// # Example
Review Comment:
I don't think this example adds much (it is pretty clear how to use this
function).
##########
datafusion/physical-plan/src/aggregates/topk/heap.rs:
##########
@@ -15,10 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-//! A custom binary heap implementation for performant top K aggregation
+//! A custom binary heap implementation for performant top K aggregation.
+//!
+//! This module uses the **Strategy pattern** with runtime polymorphism: the `new_heap`
Review Comment:
I am not familiar with the strategy pattern. Perhaps you can add a reference to a definition if that is important for understanding this function.
I think you could probably simplify this paragraph to a single sentence:
```rust
//! the `new_heap` factory function selects an appropriate heap implementation
//! based on the Arrow data type.
```
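That single sentence really covers it. The "Strategy" wording only means interchangeable implementations behind one interface, chosen at runtime. Here is a minimal std-only sketch of what the factory does. `ToyType`, `ToyHeap`, and `new_toy_heap` are hypothetical stand-ins, not DataFusion's actual `DataType`, `ArrowHeap`, or `new_heap` signatures:

```rust
/// Toy stand-in for the Arrow `DataType` variants relevant here (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyType {
    Int64,
    Float64,
    Utf8,
    LargeUtf8,
    Utf8View,
    Boolean,
}

/// One uniform interface, in the spirit of the `ArrowHeap` trait (hypothetical).
trait ToyHeap {
    fn kind(&self) -> &'static str;
    fn limit(&self) -> usize;
}

struct ToyPrimitiveHeap {
    limit: usize,
}

struct ToyStringHeap {
    limit: usize,
}

impl ToyHeap for ToyPrimitiveHeap {
    fn kind(&self) -> &'static str {
        "primitive"
    }
    fn limit(&self) -> usize {
        self.limit
    }
}

impl ToyHeap for ToyStringHeap {
    fn kind(&self) -> &'static str {
        "string"
    }
    fn limit(&self) -> usize {
        self.limit
    }
}

/// Factory: pick an implementation from the data type and return it as a
/// trait object, so callers use dynamic dispatch through one interface.
fn new_toy_heap(data_type: ToyType, limit: usize) -> Option<Box<dyn ToyHeap>> {
    match data_type {
        ToyType::Int64 | ToyType::Float64 => Some(Box::new(ToyPrimitiveHeap { limit })),
        ToyType::Utf8 | ToyType::LargeUtf8 | ToyType::Utf8View => {
            Some(Box::new(ToyStringHeap { limit }))
        }
        // Unsupported in this toy; the real code may report this differently.
        ToyType::Boolean => None,
    }
}
```

Callers only ever see `Box<dyn ToyHeap>`, which is why the doc comment can describe this as a factory without naming a pattern.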
##########
datafusion/physical-plan/src/aggregates/topk/heap.rs:
##########
@@ -15,10 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-//! A custom binary heap implementation for performant top K aggregation
+//! A custom binary heap implementation for performant top K aggregation.
+//!
+//! This module uses the **Strategy pattern** with runtime polymorphism: the `new_heap`
+//! factory function selects an appropriate heap implementation (`PrimitiveHeap` or `StringHeap`)
+//! based on the Arrow data type. All implementations conform to the `ArrowHeap` trait,
+//! enabling dynamic dispatch while keeping the interface uniform.
+//!
+//! Supported value types include Arrow primitives (integers, floats, decimals, intervals)
+//! and UTF-8 strings (`Utf8`, `LargeUtf8`, `Utf8View`) using lexicographic ordering.
+//!
+//! Note: String values are owned/cloned on insertion. For very high cardinality or large
Review Comment:
I am not sure that this is a useful comment (strings always have overhead
compared to primitive types 🤔 )
##########
datafusion/physical-plan/src/aggregates/topk/heap.rs:
##########
@@ -161,6 +180,108 @@ where
}
}
+/// An implementation of `ArrowHeap` that deals with string values.
+///
+/// Supports all three UTF-8 string types: `Utf8`, `LargeUtf8`, and `Utf8View`.
+/// String values are compared lexicographically. Null values are not explicitly handled
+/// and should not appear in the input; the aggregation layer ensures nulls are managed
+/// appropriately before calling this heap.
+pub struct StringHeap {
+ batch: ArrayRef,
+ heap: TopKHeap<String>,
+ desc: bool,
+ data_type: DataType,
+}
+
+impl StringHeap {
+ pub fn new(limit: usize, desc: bool, data_type: DataType) -> Self {
+ let batch: ArrayRef = Arc::new(StringArray::from(Vec::<&str>::new()));
+ Self {
+ batch,
+ heap: TopKHeap::new(limit, desc),
+ desc,
+ data_type,
+ }
+ }
+
+ /// Extracts a string value from the current batch at the given row index.
+ ///
+ /// Panics if the row index is out of bounds or if the data type is not one of
+ /// the supported UTF-8 string types.
+ ///
+ /// Note: Null values should not appear in the input; the aggregation layer
+ /// ensures nulls are filtered before reaching this code.
+ fn value(&self, row_idx: usize) -> String {
+ extract_string_value(&self.batch, &self.data_type, row_idx)
+ }
+}
+
+/// Helper to extract a string value from an ArrayRef at a given index.
+///
+/// Supports `Utf8`, `LargeUtf8`, and `Utf8View` data types. This helper reduces
+/// duplication between `StringHeap::value()` and `StringHeap::drain()`.
+///
+/// # Panics
+/// Panics if the index is out of bounds or if the data type is unsupported.
+fn extract_string_value(batch: &ArrayRef, data_type: &DataType, idx: usize) -> String {
Review Comment:
This will allocate a `String` for *every* row, which I suspect will perform quite poorly.
I think you could use this trait (or something similar) to at least return a `&str`, and only clone into a `String` when you need to put the value into the heap:
https://docs.rs/arrow/latest/arrow/array/trait.StringArrayType.html
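To illustrate the idea, here is a std-only sketch. `StrColumn`, `OffsetColumn`, and `top_k_max` are hypothetical names. `StrColumn` stands in for Arrow's `StringArrayType` linked above, and `OffsetColumn` is a toy offsets-plus-buffer column, not an Arrow array. Each row is compared as a borrowed `&str` against the current worst retained value. A `String` is allocated only when a value actually enters the heap:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for Arrow's `StringArrayType`: a column that can
/// return a borrowed `&str` for a row without allocating.
trait StrColumn {
    fn num_rows(&self) -> usize;
    fn str_value(&self, idx: usize) -> &str;
}

/// Toy column: one contiguous buffer plus offsets, loosely modeled on the
/// Arrow string layout (an assumption made for illustration).
struct OffsetColumn {
    data: String,
    offsets: Vec<usize>, // row i spans data[offsets[i]..offsets[i + 1]]
}

impl OffsetColumn {
    fn from_strs(values: &[&str]) -> Self {
        let mut data = String::new();
        let mut offsets = vec![0];
        for v in values {
            data.push_str(v);
            offsets.push(data.len());
        }
        Self { data, offsets }
    }
}

impl StrColumn for OffsetColumn {
    fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    fn str_value(&self, idx: usize) -> &str {
        // Borrowed slice into the shared buffer: no allocation per row.
        &self.data[self.offsets[idx]..self.offsets[idx + 1]]
    }
}

/// Keeps the `k` lexicographically largest values (a MAX-style top K).
/// Comparisons use `&str`; a `String` is allocated only on insertion.
fn top_k_max<C: StrColumn>(col: &C, k: usize) -> Vec<String> {
    // `Reverse` turns `BinaryHeap` into a min-heap, so `peek()` returns the
    // smallest value currently kept, i.e. the next one to evict.
    let mut heap: BinaryHeap<Reverse<String>> = BinaryHeap::with_capacity(k);
    for i in 0..col.num_rows() {
        let v = col.str_value(i);
        if heap.len() < k {
            heap.push(Reverse(v.to_owned()));
            continue;
        }
        let beats_worst = match heap.peek() {
            Some(Reverse(worst)) => v > worst.as_str(),
            None => false, // only reachable when k == 0
        };
        if beats_worst {
            heap.pop();
            heap.push(Reverse(v.to_owned()));
        }
    }
    let mut out: Vec<String> = heap.into_iter().map(|Reverse(s)| s).collect();
    out.sort_unstable_by(|a, b| b.cmp(a)); // largest first
    out
}
```

Once the heap is full, most rows in a large batch fail the `v > worst` check, so they never allocate at all.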
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]