jayzhan211 commented on code in PR #11013:
URL: https://github.com/apache/datafusion/pull/11013#discussion_r1699453879


##########
datafusion/functions-aggregate/src/min_max.rs:
##########
@@ -1186,12 +1190,300 @@ impl Accumulator for SlidingMinAccumulator {
     }
 }
 
+//
+// Moving min and moving max
+// The implementation is taken from
+// https://github.com/spebern/moving_min_max/blob/master/src/lib.rs.
+
+// Keep track of the minimum or maximum value in a sliding window.
+//
+// `moving min max` provides one data structure for keeping track of the
+// minimum value and one for keeping track of the maximum value in a sliding
+// window.
+//
+// Each element is stored together with the current min/max of its stack. One
+// stack is used for pushes and another one for pops. If the pop stack is empty,
+// all elements are popped from the push stack and pushed onto the pop stack,
+// updating their current min/max along the way. Elements are then popped from
+// the pop stack, so the MovingMin/Max struct works as a queue. To find the
+// minimum/maximum of the queue, look at the min/max stored at the top of each
+// stack and take the smaller/larger of those two values.
+//
+// The complexities of the operations are
+// - O(1) for getting the minimum/maximum
+// - O(1) for push
+// - amortized O(1) for pop
+
+/// Tracks the minimum of a sliding window with FIFO (queue) semantics.
+///
+/// Values are pushed at the back of the window and popped from the front.
+/// Internally two stacks are kept; every entry stores its value together with
+/// the minimum of all entries at or below it in the same stack, so the window
+/// minimum is the smaller of the minima stored at the two stack tops.
+///
+/// ```
+/// # use datafusion_functions_aggregate::min_max::MovingMin;
+/// let mut moving_min = MovingMin::<i32>::new();
+/// moving_min.push(2);
+/// moving_min.push(1);
+/// moving_min.push(3);
+///
+/// assert_eq!(moving_min.min(), Some(&1));
+/// assert_eq!(moving_min.pop(), Some(2));
+///
+/// assert_eq!(moving_min.min(), Some(&1));
+/// assert_eq!(moving_min.pop(), Some(1));
+///
+/// assert_eq!(moving_min.min(), Some(&3));
+/// assert_eq!(moving_min.pop(), Some(3));
+///
+/// assert_eq!(moving_min.min(), None);
+/// assert_eq!(moving_min.pop(), None);
+/// ```
+#[derive(Debug)]
+pub struct MovingMin<T> {
+    /// Newly pushed entries, stored as `(value, minimum of this stack so far)`.
+    push_stack: Vec<(T, T)>,
+    /// Entries awaiting removal (oldest on top), stored the same way.
+    pop_stack: Vec<(T, T)>,
+}
+
+// Creating two empty stacks needs no capabilities from `T`, so this impl is
+// deliberately unbounded.
+impl<T> Default for MovingMin<T> {
+    fn default() -> Self {
+        Self {
+            push_stack: Vec::new(),
+            pop_stack: Vec::new(),
+        }
+    }
+}
+
+impl<T: Clone + PartialOrd> MovingMin<T> {
+    /// Creates a new `MovingMin` to keep track of the minimum in a sliding
+    /// window.
+    #[inline]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Creates a new `MovingMin` to keep track of the minimum in a sliding
+    /// window with `capacity` allocated slots in each internal stack.
+    #[inline]
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            push_stack: Vec::with_capacity(capacity),
+            pop_stack: Vec::with_capacity(capacity),
+        }
+    }
+
+    /// Returns the minimum of the sliding window or `None` if the window is
+    /// empty.
+    #[inline]
+    pub fn min(&self) -> Option<&T> {
+        // Each stack top already carries the minimum of its whole stack.
+        match (self.push_stack.last(), self.pop_stack.last()) {
+            (None, None) => None,
+            (Some((_, min)), None) => Some(min),
+            (None, Some((_, min))) => Some(min),
+            (Some((_, a)), Some((_, b))) => Some(if a < b { a } else { b }),
+        }
+    }
+
+    /// Pushes a new element into the sliding window.
+    #[inline]
+    pub fn push(&mut self, val: T) {
+        // The previous minimum survives only if it is strictly smaller than
+        // `val`; otherwise `val` becomes the minimum of the push stack.
+        let min = match self.push_stack.last() {
+            Some((_, min)) if val > *min => min.clone(),
+            _ => val.clone(),
+        };
+        self.push_stack.push((val, min));
+    }
+
+    /// Removes and returns the oldest value of the sliding window (FIFO
+    /// order), or `None` if the window is empty.
+    #[inline]
+    pub fn pop(&mut self) -> Option<T> {
+        if self.pop_stack.is_empty() {
+            // Refill: move every entry across, which reverses the order so the
+            // oldest entry ends up on top, recomputing the running minimum for
+            // the pop stack. Only the minimum is cloned, once per entry.
+            self.pop_stack.reserve(self.push_stack.len());
+            while let Some((val, _)) = self.push_stack.pop() {
+                let min = match self.pop_stack.last() {
+                    Some((_, min)) if *min < val => min.clone(),
+                    _ => val.clone(),
+                };
+                self.pop_stack.push((val, min));
+            }
+        }
+        self.pop_stack.pop().map(|(val, _)| val)
+    }
+
+    /// Returns the number of elements stored in the sliding window.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.push_stack.len() + self.pop_stack.len()
+    }
+
+    /// Returns `true` if the moving window contains no elements.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+/// Tracks the maximum of a sliding window with FIFO (queue) semantics.
+///
+/// Values are pushed at the back of the window and popped from the front.
+/// Internally two stacks are kept; every entry stores its value together with
+/// the maximum of all entries at or below it in the same stack, so the window
+/// maximum is the larger of the maxima stored at the two stack tops.
+///
+/// ```
+/// # use datafusion_functions_aggregate::min_max::MovingMax;
+/// let mut moving_max = MovingMax::<i32>::new();
+/// moving_max.push(2);
+/// moving_max.push(3);
+/// moving_max.push(1);
+///
+/// assert_eq!(moving_max.max(), Some(&3));
+/// assert_eq!(moving_max.pop(), Some(2));
+///
+/// assert_eq!(moving_max.max(), Some(&3));
+/// assert_eq!(moving_max.pop(), Some(3));
+///
+/// assert_eq!(moving_max.max(), Some(&1));
+/// assert_eq!(moving_max.pop(), Some(1));
+///
+/// assert_eq!(moving_max.max(), None);
+/// assert_eq!(moving_max.pop(), None);
+/// ```
+#[derive(Debug)]
+pub struct MovingMax<T> {
+    /// Newly pushed entries, stored as `(value, maximum of this stack so far)`.
+    push_stack: Vec<(T, T)>,
+    /// Entries awaiting removal (oldest on top), stored the same way.
+    pop_stack: Vec<(T, T)>,
+}
+
+// Creating two empty stacks needs no capabilities from `T`, so this impl is
+// deliberately unbounded.
+impl<T> Default for MovingMax<T> {
+    fn default() -> Self {
+        Self {
+            push_stack: Vec::new(),
+            pop_stack: Vec::new(),
+        }
+    }
+}
+
+impl<T: Clone + PartialOrd> MovingMax<T> {
+    /// Creates a new `MovingMax` to keep track of the maximum in a sliding
+    /// window.
+    #[inline]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Creates a new `MovingMax` to keep track of the maximum in a sliding
+    /// window with `capacity` allocated slots in each internal stack.
+    #[inline]
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            push_stack: Vec::with_capacity(capacity),
+            pop_stack: Vec::with_capacity(capacity),
+        }
+    }
+
+    /// Returns the maximum of the sliding window or `None` if the window is
+    /// empty.
+    #[inline]
+    pub fn max(&self) -> Option<&T> {
+        // Each stack top already carries the maximum of its whole stack.
+        match (self.push_stack.last(), self.pop_stack.last()) {
+            (None, None) => None,
+            (Some((_, max)), None) => Some(max),
+            (None, Some((_, max))) => Some(max),
+            (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }),
+        }
+    }
+
+    /// Pushes a new element into the sliding window.
+    #[inline]
+    pub fn push(&mut self, val: T) {
+        // The previous maximum survives only if it is strictly larger than
+        // `val`; otherwise `val` becomes the maximum of the push stack.
+        let max = match self.push_stack.last() {
+            Some((_, max)) if val < *max => max.clone(),
+            _ => val.clone(),
+        };
+        self.push_stack.push((val, max));
+    }
+
+    /// Removes and returns the oldest value of the sliding window (FIFO
+    /// order), or `None` if the window is empty.
+    #[inline]
+    pub fn pop(&mut self) -> Option<T> {
+        if self.pop_stack.is_empty() {
+            // Refill: move every entry across, which reverses the order so the
+            // oldest entry ends up on top, recomputing the running maximum for
+            // the pop stack. Only the maximum is cloned, once per entry.
+            self.pop_stack.reserve(self.push_stack.len());
+            while let Some((val, _)) = self.push_stack.pop() {
+                let max = match self.pop_stack.last() {
+                    Some((_, max)) if *max > val => max.clone(),
+                    _ => val.clone(),
+                };
+                self.pop_stack.push((val, max));
+            }
+        }
+        self.pop_stack.pop().map(|(val, _)| val)
+    }
+
+    /// Returns the number of elements stored in the sliding window.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.push_stack.len() + self.pop_stack.len()
+    }
+
+    /// Returns `true` if the moving window contains no elements.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+// Invokes `make_udaf_expr_and_func!` for the `Max` aggregate, passing the
+// names `max` and `max_udaf`, one argument named `expression`, and the doc
+// string below. NOTE(review): presumably this generates the `max` expression
+// builder and the `max_udaf()` accessor — confirm against the macro definition.
+make_udaf_expr_and_func!(
+    Max,
+    max,
+    expression,
+    "Returns the maximum of a group of values.",
+    max_udaf
+);
+
+// Invokes `make_udaf_expr_and_func!` for the `Min` aggregate, passing the
+// names `min` and `min_udaf`, one argument named `expression`, and the doc
+// string below. NOTE(review): presumably this generates the `min` expression
+// builder and the `min_udaf()` accessor — confirm against the macro definition.
+make_udaf_expr_and_func!(
+    Min,
+    min,
+    expression,
+    "Returns the minimum of a group of values.",
+    min_udaf
+);
+
+/// Wraps `expr` in an `Expr::AggregateFunction` node that applies the
+/// `max_udaf()` aggregate to that single argument.
+///
+/// NOTE(review): the `true` argument is presumably the DISTINCT flag and the
+/// three trailing `None`s optional clauses left unset — confirm against
+/// `AggregateFunction::new_udf`.
+pub fn max_distinct(expr: Expr) -> Expr {
+    let udaf = max_udaf();
+    let args = vec![expr];
+    let aggregate = datafusion_expr::expr::AggregateFunction::new_udf(
+        udaf, args, true, None, None, None,
+    );
+    Expr::AggregateFunction(aggregate)
+}
+
+pub fn min_distinct(expr: Expr) -> Expr {

Review Comment:
   Move this function to `single_distinct_to_groupby` and make it private



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to