martin-g commented on code in PR #18644:
URL: https://github.com/apache/datafusion/pull/18644#discussion_r2517177723


##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -53,15 +56,196 @@ pub(crate) struct AggregateStream {
 ///
 /// The latter requires a state object, which is [`AggregateStreamInner`].
 struct AggregateStreamInner {
+    // ==== Properties ====
     schema: SchemaRef,
     mode: AggregateMode,
     input: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
     aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+    // ==== Runtime States/Buffers ====
     accumulators: Vec<AccumulatorItem>,
-    reservation: MemoryReservation,
+    // None if the dynamic filter is not applicable. See details in `AggrDynFilter`.
+    agg_dyn_filter_state: Option<Arc<AggrDynFilter>>,
     finished: bool,
+
+    // ==== Execution Resources ====
+    baseline_metrics: BaselineMetrics,
+    reservation: MemoryReservation,
+}
+
+impl AggregateStreamInner {
+    // TODO: check if we get Null handling correct
+    /// # Examples
+    /// - Example 1
+    ///   Accumulators: min(c1)
+    ///   Current Bounds: min(c1)=10
+    ///   --> dynamic filter PhysicalExpr: c1 < 10
+    ///
+    /// - Example 2
+    ///   Accumulators: min(c1), max(c1), min(c2)
+    ///   Current Bounds: min(c1)=10, max(c1)=100, min(c2)=20
+    ///   --> dynamic filter PhysicalExpr: (c1 < 10) OR (c1>100) OR (c2 < 20)
+    ///
+    /// # Errors
+    /// Returns internal errors if the dynamic filter is not enabled

Review Comment:
   It actually returns a literal `true` when the dynamic filter is not enabled, not an internal error: https://github.com/apache/datafusion/pull/18644/files#diff-f46967eb78190dc32ac594949a10eff372cc8a23bb859aac1933feea30088fedR96
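For reference, the bound-to-predicate mapping described in the doc comment (Examples 1 and 2) can be modeled without DataFusion types. This is a minimal, std-only sketch: `Agg`, `Pred`, and `build_filter` are hypothetical stand-ins for `DynamicFilterAggregateType`, `BinaryExpr`/`lit`, and `build_dynamic_filter_from_accumulator_bounds`, and a bound of `None` stands in for a null `ScalarValue`. It shows both fallbacks to a literal `true`: no filter state, and no usable bound yet.

```rust
use std::fmt;

/// Hypothetical stand-in for `DynamicFilterAggregateType`.
#[derive(Clone, Copy)]
enum Agg {
    Min,
    Max,
}

/// Hypothetical, simplified predicate tree standing in for `BinaryExpr` / `lit`.
enum Pred {
    True,
    Lt(String, i64),
    Gt(String, i64),
    Or(Box<Pred>, Box<Pred>),
}

impl fmt::Display for Pred {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Pred::True => write!(f, "true"),
            Pred::Lt(c, v) => write!(f, "{c} < {v}"),
            Pred::Gt(c, v) => write!(f, "{c} > {v}"),
            Pred::Or(a, b) => write!(f, "({a}) OR ({b})"),
        }
    }
}

/// `state` is `None` when the dynamic filter is disabled. Each entry is
/// (aggregate type, column, current shared bound); `None` means "no bound yet".
fn build_filter(state: Option<&[(Agg, &str, Option<i64>)]>) -> Pred {
    let Some(accs) = state else {
        return Pred::True; // disabled: literal true, not an error
    };
    accs.iter()
        .filter_map(|&(agg, col, bound)| {
            let b = bound?; // skip accumulators without a meaningful bound
            Some(match agg {
                Agg::Min => Pred::Lt(col.to_string(), b),
                Agg::Max => Pred::Gt(col.to_string(), b),
            })
        })
        // Fold the per-accumulator predicates into one OR chain.
        .reduce(|acc, p| Pred::Or(Box::new(acc), Box::new(p)))
        .unwrap_or(Pred::True)
}
```

Because `reduce` folds from the left, Example 2 comes out left-nested as `((c1 < 10) OR (c1 > 100)) OR (c2 < 20)`, which is logically the same as the flat form in the doc comment.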



##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -815,6 +914,66 @@ impl AggregateExec {
             }
         }
     }
+
+    /// Check if dynamic filter is possible for the current plan node.
+    /// - If yes, init one inside `AggregateExec`'s `dynamic_filter` field.
+    /// - If not supported, `self.dynamic_filter` should be kept `None`
+    fn init_dynamic_filter(&mut self) {
+        if (!self.group_by.is_single()) || (!matches!(self.mode, AggregateMode::Partial))

Review Comment:
   At line 423 it says: "- No grouping (no `GROUP BY` clause in the sql, only a single global group to aggregate)".
   What is a **global** group?
   Asking here because if it means "no `GROUP BY` clause in the sql", then `!self.group_by.is_single()` should probably be `!self.group_by.is_empty()`. But I am not sure I understand the rules.
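To make the question concrete, here is a hypothetical, simplified model of a group-by description; it is not DataFusion's `PhysicalGroupBy`. It assumes, and this should be checked against the real type, that `is_single()` means "exactly one grouping set" (no `GROUPING SETS`, `CUBE`, or `ROLLUP`) while `is_empty()` means "no grouping expressions". Under that assumption a plain `GROUP BY a` also passes `is_single()`, so only `is_empty()` isolates the single global group. `GroupBy`, `grouping_sets`, and `dyn_filter_eligible` are illustrative names, and `is_partial` stands in for `matches!(self.mode, AggregateMode::Partial)`.

```rust
/// Hypothetical, simplified model of a physical group-by (not DataFusion's type).
struct GroupBy {
    /// Grouping expressions, reduced to column names here.
    exprs: Vec<&'static str>,
    /// Number of grouping sets: 1 for a plain `GROUP BY` or for no `GROUP BY`.
    grouping_sets: usize,
}

impl GroupBy {
    /// Assumed meaning: no grouping expressions at all (one global group).
    fn is_empty(&self) -> bool {
        self.exprs.is_empty()
    }

    /// Assumed meaning: exactly one grouping set.
    fn is_single(&self) -> bool {
        self.grouping_sets == 1
    }
}

/// Eligibility check as the question suggests: only the no-`GROUP BY`,
/// partial-mode case qualifies for the dynamic filter.
fn dyn_filter_eligible(group_by: &GroupBy, is_partial: bool) -> bool {
    group_by.is_empty() && is_partial
}
```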



##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -101,27 +303,32 @@ impl AggregateStream {
             accumulators,
             reservation,
             finished: false,
+            agg_dyn_filter_state: maybe_dynamic_filter,
         };
+
         let stream = futures::stream::unfold(inner, |mut this| async move {
             if this.finished {
                 return None;
             }
 
-            let elapsed_compute = this.baseline_metrics.elapsed_compute();
-
             loop {
                 let result = match this.input.next().await {
                     Some(Ok(batch)) => {
-                        let timer = elapsed_compute.timer();
-                        let result = aggregate_batch(
-                            &this.mode,
-                            batch,
-                            &mut this.accumulators,
-                            &this.aggregate_expressions,
-                            &this.filter_expressions,
-                        );
+                        let result = {
+                            let elapsed_compute = this.baseline_metrics.elapsed_compute();
+                            let timer = elapsed_compute.timer();
+                            let result = aggregate_batch(
+                                &this.mode,
+                                batch,
+                                &mut this.accumulators,
+                                &this.aggregate_expressions,
+                                &this.filter_expressions,
+                            );
+                            timer.done();
+                            result
+                        };
 
-                        timer.done();
+                        let _ = this.maybe_update_dyn_filter();

Review Comment:
   This silently discards any error returned by `maybe_update_dyn_filter()`.
   Maybe:
   ```suggestion
                           let _ = this.maybe_update_dyn_filter()?;
   ```
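One caveat on the suggestion: the body passed to `futures::stream::unfold` is an `async` block whose output is an `Option`, so applying `?` to a `Result` inside it should not compile. Below is a minimal std-only sketch of one alternative, with hypothetical stand-ins (`Inner`, `next_output`, string errors) in place of the DataFusion types: chain the dynamic-filter update onto the per-batch result with `and_then`, so its error leaves the stream through the same path as an aggregation error.

```rust
/// Hypothetical stand-in for DataFusion's `Result` (errors are plain strings here).
type Result<T> = std::result::Result<T, String>;

/// Hypothetical stand-in for `AggregateStreamInner`: a queue of input
/// "batches" (plain integers) and a running sum acting as the accumulator.
struct Inner {
    batches: Vec<i64>,
    fail_at_sum: Option<i64>,
    sum: i64,
}

impl Inner {
    fn aggregate_batch(&mut self, batch: i64) -> Result<()> {
        self.sum += batch;
        Ok(())
    }

    /// Fails when the running sum hits `fail_at_sum`, to exercise the error path.
    fn maybe_update_dyn_filter(&mut self) -> Result<()> {
        match self.fail_at_sum {
            Some(v) if v == self.sum => Err(format!("dynamic filter update failed at sum={v}")),
            _ => Ok(()),
        }
    }
}

/// Same shape as one `unfold` step: the return type is an `Option`, so `?` on a
/// `Result` is not available; the error is chained into the per-batch result.
fn next_output(mut this: Inner) -> Option<(Result<i64>, Inner)> {
    loop {
        let result = match this.batches.pop() {
            Some(batch) => {
                let step = this
                    .aggregate_batch(batch)
                    .and_then(|()| this.maybe_update_dyn_filter());
                match step {
                    Ok(()) => continue, // keep consuming input
                    Err(e) => Err(e),   // surface the error instead of dropping it
                }
            }
            None => Ok(this.sum), // input exhausted: emit the final aggregate
        };
        return Some((result, this));
    }
}
```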



##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -53,15 +56,196 @@ pub(crate) struct AggregateStream {
 ///
 /// The latter requires a state object, which is [`AggregateStreamInner`].
 struct AggregateStreamInner {
+    // ==== Properties ====
     schema: SchemaRef,
     mode: AggregateMode,
     input: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
     aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+    // ==== Runtime States/Buffers ====
     accumulators: Vec<AccumulatorItem>,
-    reservation: MemoryReservation,
+    // None if the dynamic filter is not applicable. See details in `AggrDynFilter`.
+    agg_dyn_filter_state: Option<Arc<AggrDynFilter>>,
     finished: bool,
+
+    // ==== Execution Resources ====
+    baseline_metrics: BaselineMetrics,
+    reservation: MemoryReservation,
+}
+
+impl AggregateStreamInner {
+    // TODO: check if we get Null handling correct
+    /// # Examples
+    /// - Example 1
+    ///   Accumulators: min(c1)
+    ///   Current Bounds: min(c1)=10
+    ///   --> dynamic filter PhysicalExpr: c1 < 10
+    ///
+    /// - Example 2
+    ///   Accumulators: min(c1), max(c1), min(c2)
+    ///   Current Bounds: min(c1)=10, max(c1)=100, min(c2)=20
+    ///   --> dynamic filter PhysicalExpr: (c1 < 10) OR (c1>100) OR (c2 < 20)
+    ///
+    /// # Errors
+    /// Returns internal errors if the dynamic filter is not enabled
+    fn build_dynamic_filter_from_accumulator_bounds(
+        &self,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
+            return Ok(lit(true));
+        };
+
+        let mut predicates: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(filter_state.supported_accumulators_info.len());
+
+        for acc_info in &filter_state.supported_accumulators_info {
+            // Skip if we don't yet have a meaningful bound
+            let bound = {
+                let guard = acc_info.shared_bound.lock();
+                if (*guard).is_null() {
+                    continue;
+                }
+                guard.clone()
+            };
+
+            let agg_exprs = self
+                .aggregate_expressions
+                .get(acc_info.aggr_index)
+                .ok_or_else(|| {
+                    internal_datafusion_err!(
+                        "Invalid aggregate expression index {} for dynamic filter",
+                        acc_info.aggr_index
+                    )
+                })?;
+            // Only aggregates with a single argument are supported.
+            let column_expr = agg_exprs.first().ok_or_else(|| {
+                internal_datafusion_err!(
+                    "Aggregate expression at index {} expected a single argument",
+                    acc_info.aggr_index
+                )
+            })?;
+
+            let literal = lit(bound);
+            let predicate: Arc<dyn PhysicalExpr> = match acc_info.aggr_type {
+                DynamicFilterAggregateType::Min => Arc::new(BinaryExpr::new(
+                    Arc::clone(column_expr),
+                    Operator::Lt,
+                    literal,
+                )),
+                DynamicFilterAggregateType::Max => Arc::new(BinaryExpr::new(
+                    Arc::clone(column_expr),
+                    Operator::Gt,
+                    literal,
+                )),
+            };
+            predicates.push(predicate);
+        }
+
+        let combined = predicates.into_iter().reduce(|acc, pred| {
+            Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) as Arc<dyn PhysicalExpr>
+        });
+
+        Ok(combined.unwrap_or_else(|| lit(true)))
+    }
+
+    // If the dynamic filter is enabled, update it using the current accumulator's
+    // values
+    fn maybe_update_dyn_filter(&mut self) -> Result<()> {
+        // Step 1: Update each partition's current bound
+        let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
+            return Ok(());
+        };
+
+        for acc_info in &filter_state.supported_accumulators_info {
+            let acc =
+                self.accumulators
+                    .get_mut(acc_info.aggr_index)
+                    .ok_or_else(|| {
+                        internal_datafusion_err!(
+                            "Invalid accumulator index {} for dynamic filter",
+                            acc_info.aggr_index
+                        )
+                    })?;
+            // First get current partition's bound, then update the shared bound among
+            // all partitions.
+            let current_bound = acc.evaluate()?;

Review Comment:
   ```suggestion
               let current_bound = acc.evaluate()?;
               if current_bound.is_null() {
                   continue;
               }
   ```
   Should null bounds be skipped here? Otherwise a null `current_bound` will affect the `scalar_min()` call below.
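A minimal model of why a null bound matters. It assumes bounds are merged with an ordering in which null sorts before every non-null value, as Rust's `Option` ordering does; whether the PR's `scalar_min()` behaves like this is an assumption. `None` stands in for a null `ScalarValue`, and `merge_min_naive` and `merge_min_skip_null` are hypothetical helpers. Under that assumption, a single partition whose accumulator still evaluates to null (for example, one that has only seen null values) resets the shared bound to null, which drops that accumulator's term from the filter; skipping nulls, as suggested, keeps the tighter bound.

```rust
// Hypothetical model: a bound is `Option<i64>`, with `None` standing in for a
// null `ScalarValue`.

/// Naive merge via `Option`'s ordering, where `None` sorts before any `Some(_)`.
fn merge_min_naive(shared: Option<i64>, current: Option<i64>) -> Option<i64> {
    std::cmp::min(shared, current)
}

/// Null-aware merge: a null current bound is skipped (the suggested `continue`),
/// and a null shared bound is treated as "no bound yet".
fn merge_min_skip_null(shared: Option<i64>, current: Option<i64>) -> Option<i64> {
    match (shared, current) {
        (_, None) => shared,
        (None, Some(c)) => Some(c),
        (Some(s), Some(c)) => Some(s.min(c)),
    }
}
```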



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
