This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d62f2621f6 feat(substrait): support order_by in aggregate functions (#13114)
d62f2621f6 is described below
commit d62f2621f6335899dd095b70c2d969320386edaa
Author: Bruno Volpato <[email protected]>
AuthorDate: Tue Oct 29 07:33:11 2024 -0400
feat(substrait): support order_by in aggregate functions (#13114)
---
datafusion/substrait/src/logical_plan/consumer.rs | 17 +++-
.../tests/cases/roundtrip_logical_plan.rs | 16 ++-
.../aggregate_sorted_no_project.substrait.json | 113 +++++++++++++++++++++
3 files changed, 143 insertions(+), 3 deletions(-)
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 99e7990df6..e0bb3b4e4f 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -714,14 +714,27 @@ pub async fn from_substrait_rel(
}
_ => false,
};
+ let order_by = if !f.sorts.is_empty() {
+ Some(
+ from_substrait_sorts(
+ ctx,
+ &f.sorts,
+ input.schema(),
+ extensions,
+ )
+ .await?,
+ )
+ } else {
+ None
+ };
+
from_substrait_agg_func(
ctx,
f,
input.schema(),
extensions,
filter,
- // TODO: Add parsing of order_by also
- None,
+ order_by,
distinct,
)
.await
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index 1f654f1d3c..8108b9ad67 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -685,6 +685,19 @@ async fn aggregate_wo_projection_consume() -> Result<()> {
.await
}
+/// Consumes a manually written Substrait plan (`aggregate_sorted_no_project`) whose
+/// `count` measure carries a `sorts` entry (`SORT_DIRECTION_DESC_NULLS_FIRST`) and
+/// checks that the sort surfaces as an `ORDER BY` inside the aggregate expression.
+#[tokio::test]
+async fn aggregate_wo_projection_sorted_consume() -> Result<()> {
+    let proto_plan =
+        read_json("tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json");
+
+    assert_expected_plan_substrait(
+        proto_plan,
+        "Aggregate: groupBy=[[data.a]], aggr=[[count(data.a) ORDER BY [data.a DESC NULLS FIRST] AS countA]]\
+        \n  TableScan: data projection=[a]",
+    )
+    .await
+}
+
#[tokio::test]
async fn simple_intersect_consume() -> Result<()> {
let proto_plan =
read_json("tests/testdata/test_plans/intersect.substrait.json");
@@ -1025,8 +1038,9 @@ async fn roundtrip_aggregate_udf() -> Result<()> {
let ctx = create_context().await?;
ctx.register_udaf(dummy_agg);
+ roundtrip_with_ctx("select dummy_agg(a) from data", ctx.clone()).await?;
+ roundtrip_with_ctx("select dummy_agg(a order by a) from data",
ctx.clone()).await?;
- roundtrip_with_ctx("select dummy_agg(a) from data", ctx).await?;
Ok(())
}
diff --git a/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json b/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json
new file mode 100644
index 0000000000..d5170223cd
--- /dev/null
+++ b/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json
@@ -0,0 +1,113 @@
+{
+ "extensionUris": [
+ {
+ "uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate_generic.yaml"
+ }
+ ],
+ "extensions": [
+ {
+ "extensionFunction": {
+ "functionAnchor": 185,
+ "name": "count:any"
+ }
+ }
+ ],
+ "relations": [
+ {
+ "root": {
+ "input": {
+ "aggregate": {
+ "input": {
+ "read": {
+ "common": {
+ "direct": {}
+ },
+ "baseSchema": {
+ "names": [
+ "a"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ],
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ "namedTable": {
+ "names": [
+ "data"
+ ]
+ }
+ }
+ },
+ "groupings": [
+ {
+ "groupingExpressions": [
+ {
+ "selection": {
+ "directReference": {
+ "structField": {}
+ },
+ "rootReference": {}
+ }
+ }
+ ]
+ }
+ ],
+ "measures": [
+ {
+ "measure": {
+ "functionReference": 185,
+ "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+ "outputType": {
+ "i64": {}
+ },
+ "arguments": [
+ {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {}
+ },
+ "rootReference": {}
+ }
+ }
+ }
+ ],
+ "sorts": [
+ {
+ "expr": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ },
+ "direction": "SORT_DIRECTION_DESC_NULLS_FIRST"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ },
+ "names": [
+ "a",
+ "countA"
+ ]
+ }
+ }
+ ],
+ "version": {
+ "minorNumber": 54,
+ "producer": "manual"
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]