baiguoname commented on issue #18381:
URL: https://github.com/apache/datafusion/issues/18381#issuecomment-3484704656
I'm writing an `ExecutionPlan` similar to `datafusion::Expr::Aggregate` that
can calculate on unbounded streaming data without requiring partition keys to
be sorted. To achieve this, I have to `emit` all keys from `group_values` and
then put them back into `group_values`. However, as you can see in the
following code at line number 140, without the type `GroupValuesBytesViewMap`,
I have to call `self.group_values.emit` and then call
`self.group_values.intern`. Another example is the type `RequiredIndices`,
shown at line 269.
```rust
use arrow::array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::execution::context::QueryPlanner;
use datafusion::execution::{SendableRecordBatchStream, SessionState,
TaskContext};
use datafusion::logical_expr::expr::physical_name;
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{Aggregate, EmitTo, Extension,
GroupsAccumulator, InvariantLevel, LogicalPlan, LogicalPlanBuilder,
UserDefinedLogicalNode, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{EquivalenceProperties,
GroupsAccumulatorAdapter, create_physical_expr};
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::aggregates::group_values::GroupValues;
use datafusion::physical_plan::aggregates::order::GroupOrdering;
use datafusion::physical_plan::aggregates::{PhysicalGroupBy,
evaluate_group_by, evaluate_many};
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner,
PhysicalPlanner, create_aggregate_expr_and_maybe_filter};
use datafusion::prelude::{DataFrame, Expr};
use datafusion_physical_plan::aggregates::group_values::new_group_values;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, Distribution,
ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, Statistics};
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode,
aggregate_expressions, create_accumulators};
use futures::{ ready, Stream, StreamExt };
use polars::prelude::expr::PhysicalAggExpr;
use std::any::Any;
use std::cmp::Ordering;
use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::task::{Context, Poll};
use qust_ds::prelude::*;
use super::*;
use datafusion::common::{DFSchema, DFSchemaRef, PrettyFormat, Result};
/// State machine driving `GroupedHashAggregateStream::poll_next`.
#[derive(Debug, Clone)]
enum ExecState {
/// Pull the next batch from the input stream and aggregate it.
ReadingInput,
/// A result batch has been built and is returned on the next loop iteration.
// NOTE(review): `RB` is not defined in this snippet — presumably an alias for
// arrow's `RecordBatch`; confirm against the surrounding crate.
ProducingOutput(RB),
}
/// Stream that aggregates every incoming batch by group and, after each input
/// batch, produces a result batch built from the current group keys and
/// accumulator values.
pub struct GroupedHashAggregateStream {
/// Schema of the batches this stream produces.
schema: SchemaRef,
/// Upstream batches to aggregate.
input: SendableRecordBatchStream,
/// Whether the stream is reading input or has a batch ready to hand out.
exec_state: ExecState,
/// Grouping expressions evaluated against each input batch.
group_by: PhysicalGroupBy,
/// Argument expressions for the aggregates, evaluated against each input batch.
aggr_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
/// Scratch buffer filled by `group_values.intern` for the batch being processed.
current_group_indices: Vec<usize>,
/// Store of the distinct group keys seen so far.
group_values: Box<dyn GroupValues>,
/// Group ordering tracker; `try_new` always initialises it to `GroupOrdering::None`.
group_ordering: GroupOrdering,
/// One accumulator per aggregate expression.
accumulators: Vec<Box<dyn GroupsAccumulator>>,
}
/// Builds the grouped accumulator for one aggregate expression.
///
/// When the expression reports native `GroupsAccumulator` support, that
/// implementation is used. Otherwise a `GroupsAccumulatorAdapter` is returned,
/// constructed from a factory that calls `create_accumulator` on a shared
/// handle to the expression.
///
/// # Errors
/// Propagates the error from `create_groups_accumulator`.
pub(crate) fn create_group_accumulator(
    agg_expr: &Arc<AggregateFunctionExpr>,
) -> Result<Box<dyn GroupsAccumulator>> {
    if !agg_expr.groups_accumulator_supported() {
        // The factory closure must own its handle, so clone the `Arc` into it.
        let shared_expr = Arc::clone(agg_expr);
        let adapter = GroupsAccumulatorAdapter::new(move || shared_expr.create_accumulator());
        return Ok(Box::new(adapter));
    }
    agg_expr.create_groups_accumulator()
}
impl GroupedHashAggregateStream {
fn try_new(
aggr_exec: &AggrExec,
partition: usize,
context: Arc<TaskContext>,
) -> Result<Self> {
let aggr_schema = aggr_exec.input.schema();
let input = aggr_exec.input.execute(partition, context)?;
let accumulators = aggr_exec
.aggregates
.iter()
.map(create_group_accumulator)
.collect::<Result<Vec<_>>>()?;
let group_schema =
aggr_exec.physical_group_by.group_schema(&aggr_schema)?;
let group_ordering = GroupOrdering::None;
let group_values = new_group_values(group_schema, &group_ordering)?;
let aggr_arguments = aggregate_expressions(
&aggr_exec.aggregates,
&AggregateMode::Single,
aggr_exec.aggregates.len(),
)?;
GroupedHashAggregateStream {
// schema: aggr_exec.schema().clone(),
schema: aggr_exec.schema.clone(),
input,
exec_state: ExecState::ReadingInput,
group_by: aggr_exec.physical_group_by.clone(),
aggr_arguments,
current_group_indices: Default::default(),
group_values,
group_ordering,
accumulators,
}.to_ok()
}
fn group_aggregate_batch(&mut self, rb: RB) -> Result<()> {
let group_by_values = evaluate_group_by(&self.group_by, &rb)?;
let input_values = evaluate_many(&self.aggr_arguments, &rb)?;
for group_values in &group_by_values {
let starting_num_groups = self.group_values.len();
self.group_values.intern(group_values, &mut
self.current_group_indices)?;
let group_indices = &self.current_group_indices;
let total_num_groups = self.group_values.len();
if total_num_groups > starting_num_groups {
self
.group_ordering
.new_groups(
group_values,
group_indices,
total_num_groups,
)?;
}
let t = self
.accumulators
.iter_mut()
.zip(input_values.iter());
for (accu, values) in t {
accu.update_batch(
values,
group_indices,
None,
total_num_groups,
)?;
}
}
Ok(())
}
fn emit(&mut self) -> Result<Option<RB>> {
if self.group_values.is_empty() {
return Ok(None);
}
let mut output = self.group_values.emit(EmitTo::All)?;
let mut group_indexes = vec![];
self.group_values.intern(&output, &mut group_indexes)?;
for acc in self.accumulators.iter_mut() {
let res_part = acc.evaluate(EmitTo::All)?;
output.push(res_part);
}
loge!("gb", "{:?} {:?}", self.schema(), output);
let rb = RB::try_new(self.schema(), output)?;
Ok(Some(rb))
}
}
impl Stream for GroupedHashAggregateStream {
    type Item = Result<RB>;

    /// Drives the aggregation state machine.
    ///
    /// * `ReadingInput`: polls the input. A batch is aggregated and, if any
    ///   groups exist, the emitted result is stored and the state switches to
    ///   `ProducingOutput`. When the input is exhausted the stream ends with
    ///   `Poll::Ready(None)`.
    /// * `ProducingOutput`: hands out the stored batch and switches back to
    ///   `ReadingInput`.
    ///
    /// Errors from the input, from aggregation or from `emit` are returned as
    /// `Poll::Ready(Some(Err(_)))`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            match &self.exec_state {
                ExecState::ReadingInput => {
                    // Terminate once the input is exhausted. Falling through here
                    // would poll the finished input again on the next iteration
                    // and the stream would never complete.
                    let Some(rb) = ready!(self.input.poll_next_unpin(cx)).transpose()? else {
                        return Poll::Ready(None);
                    };
                    loge!("agg", "{}", rb.pf());
                    self.group_aggregate_batch(rb)?;
                    // `emit` yields nothing while no group has been seen; keep reading then.
                    if let Some(rb) = self.emit()? {
                        self.exec_state = ExecState::ProducingOutput(rb);
                    }
                }
                ExecState::ProducingOutput(rb) => {
                    let rb = rb.clone();
                    self.exec_state = ExecState::ReadingInput;
                    return Poll::Ready(Some(Ok(rb)));
                }
            }
        }
    }
}
impl RecordBatchStream for GroupedHashAggregateStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
/// User-defined logical plan node that wraps a DataFusion `Aggregate`.
/// It exposes the wrapped aggregate's input and schema as its own.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct AggrNode {
/// The wrapped logical aggregate (input plan, group and aggregate expressions, schema).
aggregate: Aggregate,
}
impl UserDefinedLogicalNode for AggrNode {
    /// Name of this node.
    fn name(&self) -> &str {
        "AggLogicalNode"
    }

    /// The single input: the wrapped aggregate's input plan.
    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.aggregate.input]
    }

    /// Output schema, taken from the wrapped aggregate.
    fn schema(&self) -> &DFSchemaRef {
        &self.aggregate.schema
    }

    /// Reports no expressions; the group and aggregate expressions stay
    /// inside the wrapped `Aggregate`.
    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    /// Writes the one-line label used in `EXPLAIN` output.
    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "AggLogicalNode-No-Order")
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Tells the optimizer which input columns this node needs.
    ///
    /// Currently returns the fixed indices `[2, 3]` for the single child and
    /// ignores `output_columns`.
    // NOTE(review): the hard-coded indices look like a placeholder for one
    // specific input schema. The commented-out code below is the logic this
    // method would use if `RequiredIndices` were usable from outside its
    // crate; it is kept for reference.
    fn necessary_children_exprs(
        &self,
        output_columns: &[usize],
    ) -> Option<Vec<Vec<usize>>> {
        // RequiredIndices
        // let n_group_exprs = self.aggregate.group_expr_len()?;
        // // Offset aggregate indices so that they point to valid indices at
        // // `aggregate.aggr_expr`:
        // let (group_by_reqs, aggregate_reqs) = output_columns.split_off(n_group_exprs);
        // // Get absolutely necessary GROUP BY fields:
        // let group_by_expr_existing = aggregate
        //     .group_expr
        //     .iter()
        //     .map(|group_by_expr| group_by_expr.schema_name().to_string())
        //     .collect::<Vec<_>>();
        // let new_group_bys = if let Some(simplest_groupby_indices) =
        //     get_required_group_by_exprs_indices(
        //         aggregate.input.schema(),
        //         &group_by_expr_existing,
        //     ) {
        //     // Some of the fields in the GROUP BY may be required by the
        //     // parent even if these fields are unnecessary in terms of
        //     // functional dependency.
        //     group_by_reqs
        //         .append(&simplest_groupby_indices)
        //         .get_at_indices(&aggregate.group_expr)
        // } else {
        //     aggregate.group_expr
        // };
        // // Only use the absolutely necessary aggregate expressions required
        // // by the parent:
        // let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
        // if new_group_bys.is_empty() && new_aggr_expr.is_empty() {
        //     // Global aggregation with no aggregate functions always produces 1 row and no columns.
        //     return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
        //         EmptyRelation {
        //             produce_one_row: true,
        //             schema: Arc::new(DFSchema::empty()),
        //         },
        //     )));
        // }
        // let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
        // let schema = aggregate.input.schema();
        // let necessary_indices =
        //     RequiredIndices::new().with_exprs(schema, all_exprs_iter);
        Some(vec![vec![2, 3]])
    }

    // fn with_exprs_and_inputs(
    //     &self,
    //     _exprs: Vec<Expr>,
    //     mut inputs: Vec<LogicalPlan>,
    // ) -> Result<Self> {
    //     Ok(Self {
    //         input: inputs.swap_remove(0),
    //         group_expr: self.group_expr.clone(),
    //         aggr_expr: self.aggr_expr.clone()
    //     })
    // }

    /// Feeds this node's derived `Hash` into a type-erased hasher.
    fn dyn_hash(&self, state: &mut dyn Hasher) {
        let mut s = state;
        self.hash(&mut s);
    }

    /// Equal only to another `AggrNode` that compares equal via `PartialEq`.
    fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }

    /// Ordering against another node; `None` when `other` is not an `AggrNode`.
    fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option<Ordering> {
        other
            .as_any()
            .downcast_ref::<Self>()
            .and_then(|other| self.partial_cmp(other))
    }

    /// Limits are not pushed below this node.
    fn supports_limit_pushdown(&self) -> bool {
        false
    }

    /// No invariants are checked.
    fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
        Ok(())
    }

    /// Returns a copy of this node.
    // NOTE(review): both `_exprs` and `_inputs` are ignored, so the returned
    // node keeps the original aggregate and its original input plan. Any
    // rewritten child passed in here is dropped — confirm this is intended.
    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> Result<Arc<dyn UserDefinedLogicalNode>> {
        Ok(Arc::new(Self {
            aggregate: self.aggregate.clone()
        }))
    }
}
/// Builds the output schema of the aggregation.
///
/// The schema consists of the first `num_group_exprs()` fields of the group
/// schema, followed by one field per aggregate expression. The metadata is
/// copied from `input_schema`.
///
/// # Errors
/// Propagates the error from `PhysicalGroupBy::group_schema`.
fn create_schema(
    input_schema: &Schema,
    group_by: &PhysicalGroupBy,
    aggr_expr: &[Arc<AggregateFunctionExpr>],
) -> Result<Schema> {
    let group_field_count = group_by.num_group_exprs();
    let group_schema = group_by.group_schema(input_schema)?;

    let mut fields = Vec::with_capacity(group_field_count + aggr_expr.len());
    // Keep only the leading group-expression fields of the group schema.
    fields.extend(
        group_schema
            .fields()
            .iter()
            .take(group_field_count)
            .cloned(),
    );
    fields.extend(aggr_expr.iter().map(|aggregate| aggregate.field()));

    let metadata = input_schema.metadata().clone();
    Ok(Schema::new_with_metadata(fields, metadata))
}
/// Extension planner that turns an `AggrNode` into an `AggrExec`.
pub struct AggExtensionPlanner;

#[async_trait]
impl ExtensionPlanner for AggExtensionPlanner {
    /// Plans `node` when it is an `AggrNode`; returns `Ok(None)` for any other
    /// node type.
    ///
    /// The group and aggregate expressions of the wrapped aggregate are
    /// converted to physical expressions against the first logical/physical
    /// input, and an `AggrExec` over the first physical input is returned.
    /// Only the aggregate expression (element `.0`) of each
    /// `create_aggregate_expr_and_maybe_filter` result is kept.
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let aggr_node = match node.as_any().downcast_ref::<AggrNode>() {
            Some(found) => found,
            None => return Ok(None),
        };

        let physical_input = &physical_inputs[0];
        let physical_input_schema = physical_input.schema();
        let logical_input_schema = logical_inputs[0].schema();
        let exec_probs = session_state.execution_props();

        // Physical group expressions paired with their output names.
        let logical_group_exprs = &aggr_node.aggregate.group_expr;
        let mut group_exprs = Vec::with_capacity(logical_group_exprs.len());
        for expr in logical_group_exprs {
            let physical = create_physical_expr(expr, logical_input_schema, exec_probs)?;
            group_exprs.push((physical, physical_name(expr)?));
        }

        let logical_aggr_exprs = &aggr_node.aggregate.aggr_expr;
        let mut aggregates = Vec::with_capacity(logical_aggr_exprs.len());
        for expr in logical_aggr_exprs {
            let planned = create_aggregate_expr_and_maybe_filter(
                expr,
                logical_input_schema,
                physical_input_schema.as_ref(),
                exec_probs,
            )?;
            aggregates.push(planned.0);
        }

        let physical_group_by = PhysicalGroupBy::new_single(group_exprs);
        let schema = create_schema(
            physical_input_schema.as_ref(),
            &physical_group_by,
            &aggregates,
        )?;

        let exec: Arc<dyn ExecutionPlan> = Arc::new(AggrExec::new(
            Arc::new(schema),
            Arc::clone(physical_input),
            aggregates,
            physical_group_by,
        ));
        Ok(Some(exec))
    }
}
/// Query planner that extends the default physical planner with
/// `AggExtensionPlanner`.
#[derive(Debug)]
pub struct AggQueryPlaner;

#[async_trait]
impl QueryPlanner for AggQueryPlaner {
    /// Creates the physical plan for `logical_plan` using a
    /// `DefaultPhysicalPlanner` that has `AggExtensionPlanner` registered.
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Teach the default physical planner how to plan `AggrNode` extension nodes.
        let physical_planner = DefaultPhysicalPlanner
            ::with_extension_planners(vec![
                Arc::new(AggExtensionPlanner)
            ]);
        // Delegate most work of physical planning to the default physical planner.
        physical_planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}
/// Physical plan node that runs `GroupedHashAggregateStream` over its input.
pub struct AggrExec {
/// Child plan producing the rows to aggregate.
input: Arc<dyn ExecutionPlan>,
/// Output schema (group fields followed by aggregate fields, see `create_schema`).
schema: SchemaRef,
/// Physical aggregate expressions, one per output aggregate column.
aggregates: Vec<Arc<AggregateFunctionExpr>>,
/// Physical grouping expressions.
physical_group_by: PhysicalGroupBy,
/// Plan properties computed once in `new` and returned by `properties`.
cache: PlanProperties,
}
impl AggrExec {
    /// Creates the node and caches its plan properties.
    ///
    /// The properties are derived from the output `schema`, not from the
    /// input's schema.
    fn new(
        schema: SchemaRef,
        input: Arc<dyn ExecutionPlan>,
        aggregates: Vec<Arc<AggregateFunctionExpr>>,
        physical_group_by: PhysicalGroupBy,
    ) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema));
        Self {
            input,
            schema,
            aggregates,
            physical_group_by,
            cache,
        }
    }

    /// Builds the plan properties: a single partition of unknown
    /// partitioning, incremental emission, and unbounded input flagged as not
    /// requiring infinite memory.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        let equivalences = EquivalenceProperties::new(schema);
        let partitioning = Partitioning::UnknownPartitioning(1);
        let boundedness = Boundedness::Unbounded {
            requires_infinite_memory: false,
        };
        PlanProperties::new(
            equivalences,
            partitioning,
            EmissionType::Incremental,
            boundedness,
        )
    }
}
impl std::fmt::Debug for AggrExec {
    /// Prints only the node name; fields are not shown.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("AggrExec")
    }
}
impl DisplayAs for AggrExec {
    /// Renders the node as `AggrExec` for every display format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("AggrExec")
    }
}
#[async_trait]
impl ExecutionPlan for AggrExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of this operator.
    fn name(&self) -> &'static str {
        "AggrExec"
    }

    /// Properties cached at construction time.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// Requires its single input to arrive as one partition.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }

    /// The single child plan.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds the node over the first entry of `children`, reusing the
    /// schema, aggregates, grouping and cached properties of `self`.
    ///
    /// # Panics
    /// Panics if `children` is empty.
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = children.swap_remove(0);
        let rebuilt = Self {
            input,
            schema: Arc::clone(&self.schema),
            aggregates: self.aggregates.clone(),
            physical_group_by: self.physical_group_by.clone(),
            cache: self.cache.clone(),
        };
        Ok(Arc::new(rebuilt))
    }

    /// Executes one partition and returns the aggregating record-batch stream.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = GroupedHashAggregateStream::try_new(self, partition, context)?;
        Ok(Box::pin(stream))
    }

    /// Reports unknown statistics for the output schema.
    fn statistics(&self) -> Result<Statistics> {
        let output_schema = self.schema();
        Ok(Statistics::new_unknown(&output_schema))
    }
}
/// Extension trait adding a `group_by` operation that plans the aggregation
/// through the custom `AggrNode` instead of the built-in aggregate plan.
pub trait GroupBy: Sized {
/// Groups by `group_expr` and computes `aggr_expr`, returning the updated
/// builder or frame.
///
/// # Errors
/// Returns an error when the implementation fails to build the plan.
fn group_by(
self,
group_expr: impl IntoIterator<Item = impl Into<Expr>>,
aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self>;
}
impl GroupBy for LogicalPlanBuilder {
fn group_by(
self,
group_expr: impl IntoIterator<Item = impl Into<Expr>>,
aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
let group_expr = normalize_cols(group_expr, self.plan())?;
let aggr_expr = normalize_cols(aggr_expr, self.plan())?;
let aggregate = Aggregate::try_new(Arc::new(self.plan().clone()),
group_expr, aggr_expr)?;
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(AggrNode { aggregate })
});
Self::new(plan).to_ok()
}
}
impl GroupBy for DataFrame {
    /// Applies the custom `group_by` to the frame's logical plan and returns a
    /// new `DataFrame` bound to the same session state.
    ///
    /// # Errors
    /// Propagates errors from `LogicalPlanBuilder::group_by` and from
    /// building the resulting plan.
    fn group_by(
        self,
        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
    ) -> Result<Self> {
        let (session_state, input_plan) = self.into_parts();
        let builder = LogicalPlanBuilder::from(input_plan).group_by(group_expr, aggr_expr)?;
        let grouped_plan = builder.build()?;
        Ok(DataFrame::new(session_state, grouped_plan))
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]