This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d62f2621f6 feat(substrait): support order_by in aggregate functions (#13114)
d62f2621f6 is described below

commit d62f2621f6335899dd095b70c2d969320386edaa
Author: Bruno Volpato <[email protected]>
AuthorDate: Tue Oct 29 07:33:11 2024 -0400

    feat(substrait): support order_by in aggregate functions (#13114)
---
 datafusion/substrait/src/logical_plan/consumer.rs  |  17 +++-
 .../tests/cases/roundtrip_logical_plan.rs          |  16 ++-
 .../aggregate_sorted_no_project.substrait.json     | 113 +++++++++++++++++++++
 3 files changed, 143 insertions(+), 3 deletions(-)

diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 99e7990df6..e0bb3b4e4f 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -714,14 +714,27 @@ pub async fn from_substrait_rel(
                                 }
                                 _ => false,
                             };
+                            let order_by = if !f.sorts.is_empty() {
+                                Some(
+                                    from_substrait_sorts(
+                                        ctx,
+                                        &f.sorts,
+                                        input.schema(),
+                                        extensions,
+                                    )
+                                    .await?,
+                                )
+                            } else {
+                                None
+                            };
+
                             from_substrait_agg_func(
                                 ctx,
                                 f,
                                 input.schema(),
                                 extensions,
                                 filter,
-                                // TODO: Add parsing of order_by also
-                                None,
+                                order_by,
                                 distinct,
                             )
                             .await
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index 1f654f1d3c..8108b9ad67 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -685,6 +685,19 @@ async fn aggregate_wo_projection_consume() -> Result<()> {
     .await
 }
 
+#[tokio::test]
+async fn aggregate_wo_projection_sorted_consume() -> Result<()> {
+    let proto_plan =
+        
read_json("tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json");
+
+    assert_expected_plan_substrait(
+        proto_plan,
+        "Aggregate: groupBy=[[data.a]], aggr=[[count(data.a) ORDER BY [data.a 
DESC NULLS FIRST] AS countA]]\
+        \n  TableScan: data projection=[a]",
+    )
+    .await
+}
+
 #[tokio::test]
 async fn simple_intersect_consume() -> Result<()> {
     let proto_plan = 
read_json("tests/testdata/test_plans/intersect.substrait.json");
@@ -1025,8 +1038,9 @@ async fn roundtrip_aggregate_udf() -> Result<()> {
 
     let ctx = create_context().await?;
     ctx.register_udaf(dummy_agg);
+    roundtrip_with_ctx("select dummy_agg(a) from data", ctx.clone()).await?;
+    roundtrip_with_ctx("select dummy_agg(a order by a) from data", 
ctx.clone()).await?;
 
-    roundtrip_with_ctx("select dummy_agg(a) from data", ctx).await?;
     Ok(())
 }
 
diff --git a/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json b/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json
new file mode 100644
index 0000000000..d5170223cd
--- /dev/null
+++ b/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json
@@ -0,0 +1,113 @@
+{
+  "extensionUris": [
+    {
+      "uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate_generic.yaml"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "functionAnchor": 185,
+        "name": "count:any"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "aggregate": {
+            "input": {
+              "read": {
+                "common": {
+                  "direct": {}
+                },
+                "baseSchema": {
+                  "names": [
+                    "a"
+                  ],
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      }
+                    ],
+                    "nullability": "NULLABILITY_NULLABLE"
+                  }
+                },
+                "namedTable": {
+                  "names": [
+                    "data"
+                  ]
+                }
+              }
+            },
+            "groupings": [
+              {
+                "groupingExpressions": [
+                  {
+                    "selection": {
+                      "directReference": {
+                        "structField": {}
+                      },
+                      "rootReference": {}
+                    }
+                  }
+                ]
+              }
+            ],
+            "measures": [
+              {
+                "measure": {
+                  "functionReference": 185,
+                  "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+                  "outputType": {
+                    "i64": {}
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {}
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ],
+                  "sorts": [
+                    {
+                      "expr": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {
+                              "field": 0
+                            }
+                          },
+                          "rootReference": {
+                          }
+                        }
+                      },
+                      "direction": "SORT_DIRECTION_DESC_NULLS_FIRST"
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "a",
+          "countA"
+        ]
+      }
+    }
+  ],
+  "version": {
+    "minorNumber": 54,
+    "producer": "manual"
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to