This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 610db7a5 Add DistinctCount support (#384)
610db7a5 is described below

commit 610db7a583d05129e96de29bb0e55ea70d20289c
Author: r.4ntix <[email protected]>
AuthorDate: Wed Oct 19 14:26:05 2022 +0800

    Add DistinctCount support (#384)
    
    * Add DistinctCount support
    
    * Add roundtrip_distinct_count test
---
 ballista/core/proto/ballista.proto                |  1 +
 ballista/core/src/serde/physical_plan/mod.rs      | 29 +++++++++++++++++++++--
 ballista/core/src/serde/physical_plan/to_proto.rs |  8 ++++++-
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index 1c5e8bcf..d473053b 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -133,6 +133,7 @@ message PhysicalScalarUdfNode {
 message PhysicalAggregateExprNode {
   datafusion.AggregateFunction aggr_function = 1;
   repeated PhysicalExprNode expr = 2;
+  bool distinct = 3;
 }
 
 message PhysicalWindowExprNode {
diff --git a/ballista/core/src/serde/physical_plan/mod.rs b/ballista/core/src/serde/physical_plan/mod.rs
index d2c6b089..d081422a 100644
--- a/ballista/core/src/serde/physical_plan/mod.rs
+++ b/ballista/core/src/serde/physical_plan/mod.rs
@@ -424,7 +424,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
 
                                 Ok(create_aggregate_expr(
                                     &aggr_function.into(),
-                                    false,
+                                    agg_node.distinct,
                                     input_phy_expr.as_slice(),
                                     &physical_schema,
                                     name.to_string(),
@@ -1310,7 +1310,7 @@ mod roundtrip_tests {
             aggregates::{AggregateExec, AggregateMode},
             empty::EmptyExec,
             expressions::{binary, col, lit, InListExpr, NotExpr},
-            expressions::{Avg, Column, PhysicalSortExpr},
+            expressions::{Avg, Column, DistinctCount, PhysicalSortExpr},
             file_format::{FileScanConfig, ParquetExec},
             filter::FilterExec,
             hash_join::{HashJoinExec, PartitionMode},
@@ -1662,4 +1662,29 @@ mod roundtrip_tests {
 
         roundtrip_test_with_context(Arc::new(project), ctx)
     }
+
+    #[test]
+    fn roundtrip_distinct_count() -> Result<()> {
+        let field_a = Field::new("a", DataType::Int64, false);
+        let field_b = Field::new("b", DataType::Int64, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(DistinctCount::new(
+            vec![DataType::Int64],
+            vec![col("b", &schema)?],
+            "COUNT(DISTINCT b)".to_string(),
+            DataType::Int64,
+        ))];
+
+        let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+            vec![(col("a", &schema)?, "unused".to_string())];
+
+        roundtrip_test(Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::new_single(groups),
+            aggregates.clone(),
+            Arc::new(EmptyExec::new(false, schema.clone())),
+            schema,
+        )?))
+    }
 }
diff --git a/ballista/core/src/serde/physical_plan/to_proto.rs b/ballista/core/src/serde/physical_plan/to_proto.rs
index a6a7a694..fbf8e5e7 100644
--- a/ballista/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/core/src/serde/physical_plan/to_proto.rs
@@ -38,7 +38,7 @@ use datafusion::physical_plan::{
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::physical_plan::file_format::FileScanConfig;
 
-use datafusion::physical_plan::expressions::{Count, Literal};
+use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal};
 
 use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min, Sum};
 use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
@@ -55,12 +55,17 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
     fn try_into(self) -> Result<protobuf::PhysicalExprNode, Self::Error> {
         use datafusion::physical_plan::expressions;
         use datafusion_proto::protobuf::AggregateFunction;
+
+        let mut distinct = false;
         let aggr_function = if self.as_any().downcast_ref::<Avg>().is_some() {
             Ok(AggregateFunction::Avg.into())
         } else if self.as_any().downcast_ref::<Sum>().is_some() {
             Ok(AggregateFunction::Sum.into())
         } else if self.as_any().downcast_ref::<Count>().is_some() {
             Ok(AggregateFunction::Count.into())
+        } else if self.as_any().downcast_ref::<DistinctCount>().is_some() {
+            distinct = true;
+            Ok(AggregateFunction::Count.into())
         } else if self.as_any().downcast_ref::<Min>().is_some() {
             Ok(AggregateFunction::Min.into())
         } else if self.as_any().downcast_ref::<Max>().is_some() {
@@ -153,6 +158,7 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
                 protobuf::PhysicalAggregateExprNode {
                     aggr_function,
                     expr: expressions,
+                    distinct,
                 },
             )),
         })

Reply via email to