LiaCastaneda commented on code in PR #21589:
URL: https://github.com/apache/datafusion/pull/21589#discussion_r3092557754


##########
datafusion/physical-plan/profile.json.gz:
##########


Review Comment:
   We can remove this profiling artifact from the PR. If this is a Samply 
profile, you can link to it or include a screenshot in the PR description or as a comment.



##########
datafusion/physical-plan/Cargo.toml:
##########
@@ -106,3 +106,11 @@ required-features = ["test_utils"]
 harness = false
 name = "aggregate_vectorized"
 required-features = ["test_utils"]
+
+[[bench]]
+name = "single_column_aggr"
+harness = false
+
+[profile.profiling]
+inherits = "release"
+debug = true

Review Comment:
   We can also remove this, since it comes from your local profiling setup 
rather than being part of the change itself.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1702,3 +1700,91 @@ mod tests {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod dictionary_aggregation {

Review Comment:
   Should we move these tests to the test block above?



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1702,3 +1700,91 @@ mod tests {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod dictionary_aggregation {
+    use super::*;
+    use crate::aggregates::{ArrayRef, DataType, Field, RecordBatch, Schema};
+    use crate::expressions::col;
+    use crate::test::TestMemoryExec;
+    use arrow::datatypes::UInt8Type;
+    use datafusion_functions_aggregate::count::count_udaf;
+    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+
+    /// Equivalent SQL:
+    /// SELECT region, COUNT(*)
+    /// FROM events
+    /// GROUP BY region
+    ///
+    /// Smoke test to verify that aggregation over a dictionary-encoded
+    /// GROUP BY column produces output without panicking or erroring.
+    /// region is a low cardinality dictionary-encoded string column
+    /// with 3 distinct values across 6 rows, mirroring a realistic
+    /// events table where region is always present.
+    #[tokio::test]
+    async fn test_count_group_by_dictionary_column() -> Result<()> {
+        // dictionary encoded region column
+        // 3 distinct values across 6 rows
+        let keys = UInt8Array::from(vec![0, 1, 0, 2, 1, 0]);
+        let values = StringArray::from(vec!["us-east", "us-west", 
"eu-central"]);
+        let region_col: ArrayRef = Arc::new(
+            DictionaryArray::<UInt8Type>::try_new(keys, 
Arc::new(values)).unwrap(),
+        );
+
+        // event_id column to count
+        let event_id_col: ArrayRef =
+            Arc::new(Int64Array::from(vec![1001, 1002, 1003, 1004, 1005, 
1006]));
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "region",
+                DataType::Dictionary(Box::new(DataType::UInt8), 
Box::new(DataType::Utf8)),
+                false,
+            ),
+            Field::new("event_id", DataType::Int64, false),
+        ]));
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&schema), vec![region_col, 
event_id_col])?;
+
+        let exec = Arc::new(TestMemoryExec::try_new(
+            &[vec![batch]],
+            Arc::clone(&schema),
+            None,
+        )?);
+
+        let aggregate_exec = AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::new_single(vec![(
+                col("region", &schema)?,
+                "region".to_string(),
+            )]),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("event_id", 
&schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("count")
+                    .build()?,
+            )],
+            vec![None],
+            exec,
+            Arc::clone(&schema),
+        )?;
+
+        let task_ctx = Arc::new(TaskContext::default());
+        let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, 
&task_ctx, 0)?;
+
+        let mut batches = vec![];
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        // verify we got output
+        assert!(!batches.is_empty());
+        // verify we got 3 groups - one per distinct region
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 3);
+        dbg!("record batches: {batches:#?}");

Review Comment:
   We should probably remove this debug print and instead assert on the actual 
output values directly.



##########
datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs:
##########
@@ -20,4 +20,5 @@
 pub(crate) mod boolean;
 pub(crate) mod bytes;
 pub(crate) mod bytes_view;
+pub mod dictionary;

Review Comment:
   Why not `pub(crate)`? Otherwise we leak `GroupValuesDictionary` into the 
public DataFusion API.



##########
datafusion/physical-plan/src/aggregates/group_values/mod.rs:
##########
@@ -207,6 +258,19 @@ pub fn new_group_values(
             Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
         }
     } else {
+        // TODO:  add specialized implementation for dictionary encoding 
columns for 2+ group by columns case
         Ok(Box::new(GroupValuesRows::try_new(schema)?))
     }
 }
+
+fn supported_single_dictionary_value(t: &DataType) -> bool {
+    matches!(
+        t,
+        DataType::Utf8
+            | DataType::LargeUtf8
+            | DataType::Binary
+            | DataType::LargeBinary
+            | DataType::Utf8View
+            | DataType::BinaryView
+    )
+}

Review Comment:
   These are the most common types, but why not support the other types as 
well? Other value types would simply fall under the existing slow path, wouldn't they?



##########
datafusion/physical-plan/src/aggregates/group_values/mod.rs:
##########
@@ -196,6 +197,56 @@ pub fn new_group_values(
             DataType::Boolean => {
                 return Ok(Box::new(GroupValuesBoolean::new()));
             }
+            DataType::Dictionary(key_type, value_type) => {
+                if supported_single_dictionary_value(value_type) {
+                    return match key_type.as_ref() {
+                        // TODO: turn this into a macro

Review Comment:
   Looks like there is an existing Arrow macro for this: `downcast_integer!`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to