This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f974e8861 try to use arrow::compute::sort in approx_percentile_cont (#3219)
f974e8861 is described below
commit f974e886155e8cabdafec3da5361911a3d7262b9
Author: Yang Jiang <[email protected]>
AuthorDate: Mon Aug 22 22:44:48 2022 +0800
try to use arrow::compute::sort in approx_percentile_cont (#3219)
---
datafusion/core/benches/aggregate_query_sql.rs | 20 ++++++++++++++++++++
.../src/aggregate/approx_percentile_cont.rs | 7 ++++---
datafusion/physical-expr/src/aggregate/tdigest.rs | 6 +++++-
3 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs
index 8570f8170..3734cfbe3 100644
--- a/datafusion/core/benches/aggregate_query_sql.rs
+++ b/datafusion/core/benches/aggregate_query_sql.rs
@@ -143,6 +143,26 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});
+
+ c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| {
+ b.iter(|| {
+ query(
+ ctx.clone(),
+ "SELECT utf8, approx_percentile_cont(u64_wide, 0.5, 2500) \
+ FROM t GROUP BY utf8",
+ )
+ })
+ });
+
+ c.bench_function("aggregate_query_approx_percentile_cont_on_f32", |b| {
+ b.iter(|| {
+ query(
+ ctx.clone(),
+ "SELECT utf8, approx_percentile_cont(f32, 0.5, 2500) \
+ FROM t GROUP BY utf8",
+ )
+ })
+ });
}
criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index ee32b0a6a..fc5cc920e 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -370,9 +370,10 @@ impl Accumulator for ApproxPercentileAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
- let unsorted_values =
- ApproxPercentileAccumulator::convert_to_ordered_float(values)?;
- self.digest = self.digest.merge_unsorted_f64(unsorted_values);
+ let sorted_values = &arrow::compute::sort(values, None)?;
+ let sorted_values =
+ ApproxPercentileAccumulator::convert_to_ordered_float(sorted_values)?;
+ self.digest = self.digest.merge_sorted_f64(&sorted_values);
Ok(())
}
diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs
index fa937d3e1..6314a2af6 100644
--- a/datafusion/physical-expr/src/aggregate/tdigest.rs
+++ b/datafusion/physical-expr/src/aggregate/tdigest.rs
@@ -260,6 +260,7 @@ impl TDigest {
}
}
+ #[cfg(test)]
pub(crate) fn merge_unsorted_f64(
&self,
unsorted_values: Vec<OrderedFloat<f64>>,
@@ -269,7 +270,10 @@ impl TDigest {
self.merge_sorted_f64(&values)
}
- fn merge_sorted_f64(&self, sorted_values: &[OrderedFloat<f64>]) -> TDigest {
+ pub(crate) fn merge_sorted_f64(
+ &self,
+ sorted_values: &[OrderedFloat<f64>],
+ ) -> TDigest {
#[cfg(debug_assertions)]
debug_assert!(is_sorted(sorted_values), "unsorted input to TDigest");