This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 610db7a5 Add DistinctCount support (#384)
610db7a5 is described below
commit 610db7a583d05129e96de29bb0e55ea70d20289c
Author: r.4ntix <[email protected]>
AuthorDate: Wed Oct 19 14:26:05 2022 +0800
Add DistinctCount support (#384)
* Add DistinctCount support
* Add roundtrip_distinct_count test
---
ballista/core/proto/ballista.proto | 1 +
ballista/core/src/serde/physical_plan/mod.rs | 29 +++++++++++++++++++++--
ballista/core/src/serde/physical_plan/to_proto.rs | 8 ++++++-
3 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index 1c5e8bcf..d473053b 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -133,6 +133,7 @@ message PhysicalScalarUdfNode {
message PhysicalAggregateExprNode {
datafusion.AggregateFunction aggr_function = 1;
repeated PhysicalExprNode expr = 2;
+ bool distinct = 3;
}
message PhysicalWindowExprNode {
diff --git a/ballista/core/src/serde/physical_plan/mod.rs b/ballista/core/src/serde/physical_plan/mod.rs
index d2c6b089..d081422a 100644
--- a/ballista/core/src/serde/physical_plan/mod.rs
+++ b/ballista/core/src/serde/physical_plan/mod.rs
@@ -424,7 +424,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(create_aggregate_expr(
&aggr_function.into(),
- false,
+ agg_node.distinct,
input_phy_expr.as_slice(),
&physical_schema,
name.to_string(),
@@ -1310,7 +1310,7 @@ mod roundtrip_tests {
aggregates::{AggregateExec, AggregateMode},
empty::EmptyExec,
expressions::{binary, col, lit, InListExpr, NotExpr},
- expressions::{Avg, Column, PhysicalSortExpr},
+ expressions::{Avg, Column, DistinctCount, PhysicalSortExpr},
file_format::{FileScanConfig, ParquetExec},
filter::FilterExec,
hash_join::{HashJoinExec, PartitionMode},
@@ -1662,4 +1662,29 @@ mod roundtrip_tests {
roundtrip_test_with_context(Arc::new(project), ctx)
}
+
+ #[test]
+ fn roundtrip_distinct_count() -> Result<()> {
+ let field_a = Field::new("a", DataType::Int64, false);
+ let field_b = Field::new("b", DataType::Int64, false);
+ let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+ let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(DistinctCount::new(
+ vec![DataType::Int64],
+ vec![col("b", &schema)?],
+ "COUNT(DISTINCT b)".to_string(),
+ DataType::Int64,
+ ))];
+
+ let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ vec![(col("a", &schema)?, "unused".to_string())];
+
+ roundtrip_test(Arc::new(AggregateExec::try_new(
+ AggregateMode::Final,
+ PhysicalGroupBy::new_single(groups),
+ aggregates.clone(),
+ Arc::new(EmptyExec::new(false, schema.clone())),
+ schema,
+ )?))
+ }
}
diff --git a/ballista/core/src/serde/physical_plan/to_proto.rs b/ballista/core/src/serde/physical_plan/to_proto.rs
index a6a7a694..fbf8e5e7 100644
--- a/ballista/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/core/src/serde/physical_plan/to_proto.rs
@@ -38,7 +38,7 @@ use datafusion::physical_plan::{
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::physical_plan::file_format::FileScanConfig;
-use datafusion::physical_plan::expressions::{Count, Literal};
+use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal};
use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min, Sum};
use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
@@ -55,12 +55,17 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
fn try_into(self) -> Result<protobuf::PhysicalExprNode, Self::Error> {
use datafusion::physical_plan::expressions;
use datafusion_proto::protobuf::AggregateFunction;
+
+ let mut distinct = false;
let aggr_function = if self.as_any().downcast_ref::<Avg>().is_some() {
Ok(AggregateFunction::Avg.into())
} else if self.as_any().downcast_ref::<Sum>().is_some() {
Ok(AggregateFunction::Sum.into())
} else if self.as_any().downcast_ref::<Count>().is_some() {
Ok(AggregateFunction::Count.into())
+ } else if self.as_any().downcast_ref::<DistinctCount>().is_some() {
+ distinct = true;
+ Ok(AggregateFunction::Count.into())
} else if self.as_any().downcast_ref::<Min>().is_some() {
Ok(AggregateFunction::Min.into())
} else if self.as_any().downcast_ref::<Max>().is_some() {
@@ -153,6 +158,7 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
protobuf::PhysicalAggregateExprNode {
aggr_function,
expr: expressions,
+ distinct,
},
)),
})