ozankabak commented on code in PR #11845:
URL: https://github.com/apache/datafusion/pull/11845#discussion_r1706418247


##########
datafusion/expr-common/src/interval_arithmetic.rs:
##########
@@ -17,9 +17,9 @@
 
 //! Interval arithmetic library
 
+use crate::operator::Operator;
 use crate::type_coercion::binary::get_result_type;
-use crate::Operator;
-use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
+use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano};

Review Comment:
   We can move this down next to other `arrow::datatypes` imports and have one 
import statement instead of three for arrow datatypes



##########
datafusion/expr-common/src/interval_arithmetic.rs:
##########
@@ -1135,12 +1135,12 @@ fn next_value_helper<const INC: bool>(value: 
ScalarValue) -> ScalarValue {
         }
         IntervalDayTime(Some(val)) => 
IntervalDayTime(Some(increment_decrement::<
             INC,
-            arrow_buffer::IntervalDayTime,
+            arrow::datatypes::IntervalDayTime,
         >(val))),
         IntervalMonthDayNano(Some(val)) => {
             IntervalMonthDayNano(Some(increment_decrement::<
                 INC,
-                arrow_buffer::IntervalMonthDayNano,
+                arrow::datatypes::IntervalMonthDayNano,

Review Comment:
   Do we need this explicit qualifier even though we have the import above?



##########
datafusion/expr-common/src/interval_arithmetic.rs:
##########
@@ -120,12 +120,12 @@ macro_rules! value_transition {
                 IntervalYearMonth(None)
             }
             IntervalDayTime(Some(value))
-                if value == arrow_buffer::IntervalDayTime::$bound =>
+                if value == arrow::datatypes::IntervalDayTime::$bound =>

Review Comment:
   Do we need this explicit qualifier even though we have the import above?



##########
datafusion/expr/src/operator.rs:
##########


Review Comment:
   Maybe we should rename this file `operations.rs`? Having two `operator` 
modules, one with the `Operator` object and one without, is somewhat confusing.



##########
datafusion/expr-common/src/interval_arithmetic.rs:
##########
@@ -120,12 +120,12 @@ macro_rules! value_transition {
                 IntervalYearMonth(None)
             }
             IntervalDayTime(Some(value))
-                if value == arrow_buffer::IntervalDayTime::$bound =>
+                if value == arrow::datatypes::IntervalDayTime::$bound =>
             {
                 IntervalDayTime(None)
             }
             IntervalMonthDayNano(Some(value))
-                if value == arrow_buffer::IntervalMonthDayNano::$bound =>
+                if value == arrow::datatypes::IntervalMonthDayNano::$bound =>

Review Comment:
   Do we need this explicit qualifier even though we have the import above?



##########
datafusion/core/src/physical_optimizer/limit_pushdown.rs:
##########
@@ -257,10 +257,10 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
     use datafusion_execution::{SendableRecordBatchStream, TaskContext};
     use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr::expressions::lit;

Review Comment:
   ```suggestion
       use datafusion_physical_expr::expressions::{col, lit};
   ```



##########
datafusion/functions-aggregate/src/approx_percentile_cont.rs:
##########
@@ -134,31 +132,35 @@ impl ApproxPercentileCont {
     }
 }
 
-fn get_lit_value(expr: &Expr) -> datafusion_common::Result<ScalarValue> {
+fn get_scalar_value(
+    expr: &Arc<dyn PhysicalExpr>,
+) -> datafusion_common::Result<ScalarValue> {
     let empty_schema = Arc::new(Schema::empty());
-    let empty_batch = RecordBatch::new_empty(Arc::clone(&empty_schema));
-    let dfschema = DFSchema::empty();
-    let expr =
-        limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, 
&dfschema)?;
-    let result = expr.evaluate(&empty_batch)?;
-    match result {
-        ColumnarValue::Array(_) => Err(DataFusionError::Internal(format!(
-            "The expr {:?} can't be evaluated to scalar value",
-            expr
-        ))),
-        ColumnarValue::Scalar(scalar_value) => Ok(scalar_value),
+    let batch = RecordBatch::new_empty(Arc::clone(&empty_schema));
+    let val = expr.evaluate(&batch)?;
+    if let ColumnarValue::Scalar(s) = val {
+        Ok(s)
+    } else {
+        internal_err!("Didn't expect ColumnarValue::Array")
     }
 }
 
-fn validate_input_percentile_expr(expr: &Expr) -> 
datafusion_common::Result<f64> {
-    let lit = get_lit_value(expr)?;
-    let percentile = match &lit {
-        ScalarValue::Float32(Some(q)) => *q as f64,
-        ScalarValue::Float64(Some(q)) => *q,
-        got => return not_impl_err!(
-            "Percentile value for 'APPROX_PERCENTILE_CONT' must be Float32 or 
Float64 literal (got data type {})",
-            got.data_type()
-        )
+fn validate_input_percentile_expr(
+    expr: &Arc<dyn PhysicalExpr>,
+) -> datafusion_common::Result<f64> {

Review Comment:
   ```suggestion
   ) -> Result<f64> {
   ```



##########
datafusion/functions-aggregate-common/src/aggregate.rs:
##########
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::{any::Any, sync::Arc};
+
+use arrow::datatypes::Field;
+
+use crate::order::AggregateOrderSensitivity;
+use datafusion_common::exec_err;
+use datafusion_common::{not_impl_err, Result};
+use datafusion_expr_common::accumulator::Accumulator;
+use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+
+pub mod count_distinct;
+pub mod groups_accumulator;
+
+/// An aggregate expression that:
+/// * knows its resulting field
+/// * knows how to create its accumulator
+/// * knows its accumulator's state's field
+/// * knows the expressions from whose its accumulator will receive values
+///
+/// Any implementation of this trait also needs to implement the
+/// `PartialEq<dyn Any>` to allows comparing equality between the
+/// trait objects.
+pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
+    /// Returns the aggregate expression as [`Any`] so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field>;
+
+    /// the accumulator used to accumulate values from the expressions.
+    /// the accumulator expects the same number of arguments as `expressions` 
and must
+    /// return states with the same description as `state_fields`
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
+
+    /// the fields that encapsulate the Accumulator's state
+    /// the number of fields here equals the number of states that the 
accumulator contains
+    fn state_fields(&self) -> Result<Vec<Field>>;
+
+    /// expressions that are passed to the Accumulator.
+    /// Single-column aggregations such as `sum` return a single value, others 
(e.g. `cov`) return many.
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Order by requirements for the aggregate function
+    /// By default it is `None` (there is no requirement)
+    /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` 
should implement this
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    /// Indicates whether aggregator can produce the correct result with any
+    /// arbitrary input ordering. By default, we assume that aggregate 
expressions
+    /// are order insensitive.
+    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+        AggregateOrderSensitivity::Insensitive
+    }
+
+    /// Sets the indicator whether ordering requirements of the aggregator is
+    /// satisfied by its input. If this is not the case, aggregators with order
+    /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
+    /// the correct result with possibly more work internally.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
+    /// If the expression can benefit from existing input ordering, but does
+    /// not implement the method, returns an error. Order insensitive and hard
+    /// requirement aggregators return `Ok(None)`.
+    fn with_beneficial_ordering(
+        self: Arc<Self>,
+        _requirement_satisfied: bool,
+    ) -> Result<Option<Arc<dyn AggregateExpr>>> {
+        if self.order_bys().is_some() && 
self.order_sensitivity().is_beneficial() {
+            return exec_err!(
+                "Should implement with satisfied for aggregator :{:?}",
+                self.name()
+            );
+        }
+        Ok(None)
+    }
+
+    /// Human readable name such as `"MIN(c2)"`. The default
+    /// implementation returns placeholder text.
+    fn name(&self) -> &str {
+        "AggregateExpr: default name"
+    }
+
+    /// If the aggregate expression has a specialized
+    /// [`GroupsAccumulator`] implementation. If this returns true,
+    /// `[Self::create_groups_accumulator`] will be called.
+    fn groups_accumulator_supported(&self) -> bool {
+        false
+    }
+
+    /// Return a specialized [`GroupsAccumulator`] that manages state
+    /// for all groups.
+    ///
+    /// For maximum performance, a [`GroupsAccumulator`] should be
+    /// implemented in addition to [`Accumulator`].
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} 
yet")
+    }
+
+    /// Construct an expression that calculates the aggregate in reverse.
+    /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
+    /// For aggregates that do not support calculation in reverse,
+    /// returns None (which is the default value).
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        None
+    }
+
+    /// Creates accumulator implementation that supports retract
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        not_impl_err!("Retractable Accumulator hasn't been implemented for 
{self:?} yet")
+    }
+
+    /// Returns all expressions used in the [`AggregateExpr`].
+    /// These expressions are  (1)function arguments, (2) order by expressions.
+    fn all_expressions(&self) -> AggregatePhysicalExpressions {
+        let args = self.expressions();
+        let order_bys = self.order_bys().unwrap_or(&[]);
+        let order_by_exprs = order_bys
+            .iter()
+            .map(|sort_expr| Arc::clone(&sort_expr.expr))
+            .collect::<Vec<_>>();
+        AggregatePhysicalExpressions {
+            args,
+            order_by_exprs,
+        }
+    }
+
+    /// Rewrites [`AggregateExpr`], with new expressions given. The argument 
should be consistent
+    /// with the return value of the [`AggregateExpr::all_expressions`] method.
+    /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, 
otherwise returns `None`.
+    /// TODO: This method only rewrites the [`PhysicalExpr`]s and does not 
handle `Expr`s.
+    /// This can cause silent bugs and should be fixed in the future (possibly 
with physical-to-logical
+    /// conversions).

Review Comment:
   Will this TODO still be valid after this PR? Should we remove it?



##########
datafusion/functions-aggregate/src/approx_percentile_cont.rs:
##########
@@ -134,31 +132,35 @@ impl ApproxPercentileCont {
     }
 }
 
-fn get_lit_value(expr: &Expr) -> datafusion_common::Result<ScalarValue> {
+fn get_scalar_value(
+    expr: &Arc<dyn PhysicalExpr>,
+) -> datafusion_common::Result<ScalarValue> {

Review Comment:
   ```suggestion
   ) -> Result<ScalarValue> {
   ```



##########
datafusion/functions-aggregate-common/src/accumulator.rs:
##########
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion_common::{DFSchema, Result};
+use datafusion_expr_common::accumulator::Accumulator;
+use datafusion_physical_expr_common::{
+    physical_expr::PhysicalExpr, sort_expr::PhysicalSortExpr,
+};
+
+/// [`AccumulatorArgs`] contains information about how an aggregate
+/// function was called, including the types of its arguments and any optional
+/// ordering expressions.
+#[derive(Debug)]
+pub struct AccumulatorArgs<'a> {
+    /// The return type of the aggregate function.
+    pub data_type: &'a DataType,
+
+    /// The schema of the input arguments
+    pub schema: &'a Schema,
+
+    /// The schema of the input arguments
+    pub dfschema: &'a DFSchema,

Review Comment:
   Do we actually need both `schema` and `dfschema`? It'd be great to have one 
schema here.



##########
datafusion/functions-aggregate/src/approx_percentile_cont.rs:
##########
@@ -134,31 +132,35 @@ impl ApproxPercentileCont {
     }
 }
 
-fn get_lit_value(expr: &Expr) -> datafusion_common::Result<ScalarValue> {
+fn get_scalar_value(
+    expr: &Arc<dyn PhysicalExpr>,
+) -> datafusion_common::Result<ScalarValue> {
     let empty_schema = Arc::new(Schema::empty());
-    let empty_batch = RecordBatch::new_empty(Arc::clone(&empty_schema));
-    let dfschema = DFSchema::empty();
-    let expr =
-        limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, 
&dfschema)?;
-    let result = expr.evaluate(&empty_batch)?;
-    match result {
-        ColumnarValue::Array(_) => Err(DataFusionError::Internal(format!(
-            "The expr {:?} can't be evaluated to scalar value",
-            expr
-        ))),
-        ColumnarValue::Scalar(scalar_value) => Ok(scalar_value),
+    let batch = RecordBatch::new_empty(Arc::clone(&empty_schema));
+    let val = expr.evaluate(&batch)?;
+    if let ColumnarValue::Scalar(s) = val {

Review Comment:
   ```suggestion
       if let ColumnarValue::Scalar(s) = expr.evaluate(&batch)? {
   ```



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -550,6 +550,7 @@ mod tests {
     use super::*;
     use crate::expressions::{binary, cast, col, lit, BinaryExpr};
 
+    use super::Literal;

Review Comment:
   I already see a `use super::*;` above, is this necessary?



##########
datafusion/physical-expr/benches/is_null.rs:
##########
@@ -20,8 +20,8 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::builder::Int32Builder;
 use arrow_schema::DataType;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr};

Review Comment:
   ```suggestion
   use datafusion_physical_expr::expressions::{Column, IsNotNullExpr, 
IsNullExpr};
   ```



##########
datafusion/physical-expr/benches/case_when.rs:
##########
@@ -23,8 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, 
Criterion};
 use datafusion_common::ScalarValue;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::expressions::{BinaryExpr, CaseExpr};
-use datafusion_physical_expr_common::expressions::column::Column;
-use datafusion_physical_expr_common::expressions::Literal;
+use datafusion_physical_expr::expressions::{Column, Literal};

Review Comment:
   We can unify this import with the one above



##########
datafusion/physical-expr/src/expressions/binary.rs:
##########
@@ -683,9 +683,9 @@ mod tests {
     use super::*;
     use crate::expressions::{col, lit, try_cast, Literal};
 
+    use crate::expressions::Column;

Review Comment:
   ```suggestion
       use crate::expressions::{col, lit, try_cast, Column, Literal};
   
   ```



##########
datafusion/functions-aggregate-common/src/aggregate.rs:
##########
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::{any::Any, sync::Arc};
+
+use arrow::datatypes::Field;
+
+use crate::order::AggregateOrderSensitivity;
+use datafusion_common::exec_err;
+use datafusion_common::{not_impl_err, Result};
+use datafusion_expr_common::accumulator::Accumulator;
+use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

Review Comment:
   ```suggestion
   use std::fmt::Debug;
   use std::{any::Any, sync::Arc};
   
   use crate::order::AggregateOrderSensitivity;
   
   use arrow::datatypes::Field;
   use datafusion_common::{exec_err, not_impl_err, Result};
   use datafusion_expr_common::accumulator::Accumulator;
   use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
   use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
   use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
   ```



##########
datafusion/expr-common/src/interval_arithmetic.rs:
##########
@@ -1135,12 +1135,12 @@ fn next_value_helper<const INC: bool>(value: 
ScalarValue) -> ScalarValue {
         }
         IntervalDayTime(Some(val)) => 
IntervalDayTime(Some(increment_decrement::<
             INC,
-            arrow_buffer::IntervalDayTime,
+            arrow::datatypes::IntervalDayTime,

Review Comment:
   Do we need this explicit qualifier even though we have the import above?



##########
datafusion/functions-aggregate/src/first_last.rs:
##########
@@ -117,24 +114,21 @@ impl AggregateUDFImpl for FirstValue {
     }
 
     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
-        let ordering_req = 
limited_convert_logical_sort_exprs_to_physical_with_dfschema(
-            acc_args.sort_exprs,
-            acc_args.dfschema,
-        )?;
-
-        let ordering_dtypes = ordering_req
+        let ordering_dtypes = acc_args
+            .ordering_req
             .iter()
             .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;
 
         // When requirement is empty, or it is signalled by outside caller that
         // the ordering requirement is/will be satisfied.
-        let requirement_satisfied = ordering_req.is_empty() || 
self.requirement_satisfied;
+        let requirement_satisfied =
+            acc_args.ordering_req.is_empty() || self.requirement_satisfied;
 
         FirstValueAccumulator::try_new(
             acc_args.data_type,
             &ordering_dtypes,
-            ordering_req,
+            acc_args.ordering_req.to_vec(),

Review Comment:
   Same issue, I wonder if we still need to pass dtypes separately given that 
we pass the full physical expression list



##########
datafusion/functions-aggregate/src/approx_percentile_cont.rs:
##########
@@ -170,22 +172,26 @@ fn validate_input_percentile_expr(expr: &Expr) -> 
datafusion_common::Result<f64>
     Ok(percentile)
 }
 
-fn validate_input_max_size_expr(expr: &Expr) -> 
datafusion_common::Result<usize> {
-    let lit = get_lit_value(expr)?;
-    let max_size = match &lit {
-        ScalarValue::UInt8(Some(q)) => *q as usize,
-        ScalarValue::UInt16(Some(q)) => *q as usize,
-        ScalarValue::UInt32(Some(q)) => *q as usize,
-        ScalarValue::UInt64(Some(q)) => *q as usize,
-        ScalarValue::Int32(Some(q)) if *q > 0 => *q as usize,
-        ScalarValue::Int64(Some(q)) if *q > 0 => *q as usize,
-        ScalarValue::Int16(Some(q)) if *q > 0 => *q as usize,
-        ScalarValue::Int8(Some(q)) if *q > 0 => *q as usize,
-        got => return not_impl_err!(
-            "Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt 
> 0 literal (got data type {}).",
-            got.data_type()
-        )
+fn validate_input_max_size_expr(
+    expr: &Arc<dyn PhysicalExpr>,
+) -> datafusion_common::Result<usize> {

Review Comment:
   ```suggestion
   ) -> Result<usize> {
   ```



##########
datafusion/functions-aggregate/src/approx_percentile_cont.rs:
##########
@@ -31,20 +31,18 @@ use arrow::{
 use arrow_schema::{Field, Schema};
 
 use datafusion_common::{
-    downcast_value, internal_err, not_impl_err, plan_err, DFSchema, 
DataFusionError,
-    ScalarValue,
+    downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, 
ScalarValue,

Review Comment:
   ```suggestion
       downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, 
Result, ScalarValue,
   ```



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -123,26 +120,22 @@ impl AggregateUDFImpl for ArrayAgg {
             )?));
         }
 
-        if acc_args.sort_exprs.is_empty() {
+        if acc_args.ordering_req.is_empty() {
             return Ok(Box::new(ArrayAggAccumulator::try_new(
                 &acc_args.input_types[0],
             )?));
         }
 
-        let ordering_req = 
limited_convert_logical_sort_exprs_to_physical_with_dfschema(
-            acc_args.sort_exprs,
-            acc_args.dfschema,
-        )?;
-
-        let ordering_dtypes = ordering_req
+        let ordering_dtypes = acc_args
+            .ordering_req
             .iter()
             .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;
 
         OrderSensitiveArrayAggAccumulator::try_new(
             &acc_args.input_types[0],
             &ordering_dtypes,
-            ordering_req,
+            acc_args.ordering_req.to_vec(),

Review Comment:
   I wonder if we still need to pass dtypes separately since we pass the full 
physical expression list for ordering?



##########
datafusion/functions-aggregate/src/nth_value.rs:
##########
@@ -116,7 +116,7 @@ impl AggregateUDFImpl for NthValueAgg {
             n,
             &acc_args.input_types[0],
             &ordering_dtypes,
-            ordering_req,
+            acc_args.ordering_req.to_vec(),

Review Comment:
   Same pattern with other accumulators regarding dtype passing



##########
datafusion/functions-aggregate/src/first_last.rs:
##########
@@ -416,22 +410,19 @@ impl AggregateUDFImpl for LastValue {
     }
 
     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
-        let ordering_req = 
limited_convert_logical_sort_exprs_to_physical_with_dfschema(
-            acc_args.sort_exprs,
-            acc_args.dfschema,
-        )?;
-
-        let ordering_dtypes = ordering_req
+        let ordering_dtypes = acc_args
+            .ordering_req
             .iter()
             .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;
 
-        let requirement_satisfied = ordering_req.is_empty() || 
self.requirement_satisfied;
+        let requirement_satisfied =
+            acc_args.ordering_req.is_empty() || self.requirement_satisfied;
 
         LastValueAccumulator::try_new(
             acc_args.data_type,
             &ordering_dtypes,
-            ordering_req,
+            acc_args.ordering_req.to_vec(),

Review Comment:
   Same question with `FirstValueAccumulator`



##########
datafusion/physical-expr-common/src/sort_expr.rs:
##########
@@ -272,29 +271,3 @@ pub type LexRequirement = Vec<PhysicalSortRequirement>;
 ///`LexRequirementRef` is an alias for the type &`[PhysicalSortRequirement]`, 
which
 /// represents a reference to a lexicographical ordering requirement.
 pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];
-
-/// Converts each [`Expr::Sort`] into a corresponding [`PhysicalSortExpr`].
-/// Returns an error if the given logical expression is not a [`Expr::Sort`].
-pub fn limited_convert_logical_sort_exprs_to_physical_with_dfschema(

Review Comment:
   Great to see this removed 👏 



##########
datafusion/physical-expr-functions-aggregate/src/aggregate.rs:
##########
@@ -15,34 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::{any::Any, sync::Arc};
-
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-
-use datafusion_common::exec_err;
 use datafusion_common::{internal_err, not_impl_err, DFSchema, Result};
 use datafusion_expr::expr::create_function_physical_name;
-use datafusion_expr::function::StateFieldsArgs;
-use datafusion_expr::type_coercion::aggregates::check_arg_count;
-use datafusion_expr::utils::AggregateOrderSensitivity;
+use datafusion_expr::AggregateUDF;
 use datafusion_expr::ReversedUDAF;
-use datafusion_expr::{
-    function::AccumulatorArgs, Accumulator, AggregateUDF, Expr, 
GroupsAccumulator,
-};
+use datafusion_expr_common::accumulator::Accumulator;
+use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
+use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
+use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
+use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs;
+use datafusion_functions_aggregate_common::aggregate::AggregateExpr;
+use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
+use datafusion_functions_aggregate_common::utils::{self, down_cast_any_ref};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
+use datafusion_physical_expr_common::utils::reverse_order_bys;
 
-use crate::physical_expr::PhysicalExpr;
-use crate::sort_expr::{LexOrdering, PhysicalSortExpr};
-use crate::utils::reverse_order_bys;
-
-use self::utils::down_cast_any_ref;
-
-pub mod count_distinct;
-pub mod groups_accumulator;
-pub mod merge_arrays;
-pub mod stats;
-pub mod tdigest;
-pub mod utils;
+use std::fmt::Debug;

Review Comment:
   I think you can keep these at the top (as that is the convention in most 
files in the repository)



##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -20,14 +20,12 @@ use std::sync::Arc;
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, 
MutableArrayData};
 use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
 
-use datafusion_common::{exec_err, DFSchema, Result};
-use datafusion_expr::expr::Alias;
-use datafusion_expr::sort_properties::ExprProperties;
-use datafusion_expr::Expr;
-
-use crate::expressions::column::Column;
-use crate::expressions::literal::Literal;
-use crate::expressions::CastExpr;
+use datafusion_common::Result;
+// use datafusion_expr::expr::Alias;
+// use datafusion_expr::sort_properties::ExprProperties;
+// use datafusion_expr::Expr;

Review Comment:
   Leftover?



##########
datafusion/physical-plan/src/windows/mod.rs:
##########
@@ -395,6 +379,8 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
     }
 }
 
+// TODO: Find a way to make clippy happy
+#[allow(clippy::needless_borrow)]

Review Comment:
   I wonder why we get this warning now. Is it because we used to clone with 
`expr` and now do it with `&expr`? We should understand if this is a spurious 
warning or indicative of a code smell.



##########
datafusion/physical-expr-functions-aggregate/src/aggregate.rs:
##########
@@ -15,34 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::{any::Any, sync::Arc};
-
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-
-use datafusion_common::exec_err;
 use datafusion_common::{internal_err, not_impl_err, DFSchema, Result};
 use datafusion_expr::expr::create_function_physical_name;
-use datafusion_expr::function::StateFieldsArgs;
-use datafusion_expr::type_coercion::aggregates::check_arg_count;
-use datafusion_expr::utils::AggregateOrderSensitivity;
+use datafusion_expr::AggregateUDF;
 use datafusion_expr::ReversedUDAF;
-use datafusion_expr::{
-    function::AccumulatorArgs, Accumulator, AggregateUDF, Expr, 
GroupsAccumulator,
-};
+use datafusion_expr_common::accumulator::Accumulator;
+use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
+use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
+use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
+use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs;

Review Comment:
   ```suggestion
   use datafusion_functions_aggregate_common::accumulator::{AccumulatorArgs, 
StateFieldsArgs};
   ```



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -32,8 +32,8 @@ use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{exec_err, internal_err, DataFusionError, Result, 
ScalarValue};
 use datafusion_expr::ColumnarValue;
 
-use datafusion_physical_expr_common::expressions::column::Column;
-use datafusion_physical_expr_common::expressions::Literal;
+use super::Column;
+use super::Literal;

Review Comment:
   ```suggestion
   use super::{Column, Literal};
   ```



##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -193,33 +191,6 @@ pub fn with_new_children_if_necessary(
     }
 }
 
-/// Rewrites an expression according to new schema; i.e. changes the columns it
-/// refers to with the column at corresponding index in the new schema. Returns
-/// an error if the given schema has fewer columns than the original schema.
-/// Note that the resulting expression may not be valid if data types in the
-/// new schema is incompatible with expression nodes.
-pub fn with_new_schema(

Review Comment:
   Since this takes in a general `PhysicalExpr` and rewrites it, it is more 
natural for it to be a top-level utility. Do we have to nest it inside 
`expressions::column` due to some dependency issues? Avoiding that nesting 
would be good due to the signature/function of this routine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to