2010YOUY01 commented on code in PR #15697:
URL: https://github.com/apache/datafusion/pull/15697#discussion_r2041421543


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -202,27 +204,99 @@ impl TopK {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        // selected indices
+        let mut selected_rows = None;
+
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        if let Some(max_row) = self.heap.max() {
+            // Get the batch that contains the max row
+            let batch_entry = match self.heap.store.get(max_row.batch_id) {
+                Some(entry) => entry,
+                None => return internal_err!("Invalid batch ID in TopKRow"),
+            };
+
+            // Extract threshold values for each sort expression
+            // TODO: create a filter for each key that respects lexical 
ordering

Review Comment:
   Maybe evaluating all of the sort keys would be too expensive? 🤔



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -202,27 +204,99 @@ impl TopK {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        // selected indices

Review Comment:
   ```suggestion
           // Selected indices in the input batch.
           // Some indices may be pre-filtered if they exceed the heap’s 
current max value.
   
   ```



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -202,27 +204,99 @@ impl TopK {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        // selected indices
+        let mut selected_rows = None;
+
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        if let Some(max_row) = self.heap.max() {
+            // Get the batch that contains the max row
+            let batch_entry = match self.heap.store.get(max_row.batch_id) {
+                Some(entry) => entry,
+                None => return internal_err!("Invalid batch ID in TopKRow"),
+            };
+
+            // Extract threshold values for each sort expression
+            // TODO: create a filter for each key that respects lexical 
ordering
+            // in the form of col0 < threshold0 || col0 == threshold0 && (col1 
< threshold1 || ...)
+            // This could use BinaryExpr to benefit from short circuiting and 
early evaluation
+            // https://github.com/apache/datafusion/issues/15698
+            // Extract the value for this column from the max row
+            let expr = Arc::clone(&self.expr[0].expr);
+            let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 
1))?;
+
+            // Convert to scalar value - should be a single value since we're 
evaluating on a single row batch
+            let threshold = Scalar::new(value.to_array(1)?);
+
+            // Create a filter for each sort key
+            let is_multi_col = self.expr.len() > 1;
+            let filter = match (is_multi_col, self.expr[0].options.descending) 
{
+                (true, true) => gt_eq(&sort_keys[0], &threshold)?,
+                (true, false) => lt_eq(&sort_keys[0], &threshold)?,
+                (false, true) => gt(&sort_keys[0], &threshold)?,
+                (false, false) => lt(&sort_keys[0], &threshold)?,
+            };
+            if filter.true_count() == 0 {
+                // No rows are less than the max row, so we can skip this batch
+                // Early completion is still possible, as last row might be 
greater
+                self.attempt_early_completion(&batch)?;
+
+                return Ok(());
+            }
+            let filter_predicate = FilterBuilder::new(&filter);
+            let filter_predicate = if sort_keys.len() > 1 {
+                filter_predicate.optimize().build()

Review Comment:
   Could you add some comments explaining this `optimize()` call? I don't think 
the original documentation makes its purpose very clear.



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -202,27 +204,99 @@ impl TopK {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        // selected indices
+        let mut selected_rows = None;
+
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        if let Some(max_row) = self.heap.max() {
+            // Get the batch that contains the max row
+            let batch_entry = match self.heap.store.get(max_row.batch_id) {
+                Some(entry) => entry,
+                None => return internal_err!("Invalid batch ID in TopKRow"),
+            };
+
+            // Extract threshold values for each sort expression
+            // TODO: create a filter for each key that respects lexical 
ordering
+            // in the form of col0 < threshold0 || col0 == threshold0 && (col1 
< threshold1 || ...)
+            // This could use BinaryExpr to benefit from short circuiting and 
early evaluation
+            // https://github.com/apache/datafusion/issues/15698
+            // Extract the value for this column from the max row
+            let expr = Arc::clone(&self.expr[0].expr);
+            let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 
1))?;
+
+            // Convert to scalar value - should be a single value since we're 
evaluating on a single row batch
+            let threshold = Scalar::new(value.to_array(1)?);
+
+            // Create a filter for each sort key
+            let is_multi_col = self.expr.len() > 1;
+            let filter = match (is_multi_col, self.expr[0].options.descending) 
{

Review Comment:
   How are nulls handled here, e.g. for `ORDER BY c1 [NULLS FIRST/LAST]`?



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -202,27 +204,99 @@ impl TopK {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        // selected indices
+        let mut selected_rows = None;
+
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        if let Some(max_row) = self.heap.max() {
+            // Get the batch that contains the max row
+            let batch_entry = match self.heap.store.get(max_row.batch_id) {
+                Some(entry) => entry,
+                None => return internal_err!("Invalid batch ID in TopKRow"),
+            };
+
+            // Extract threshold values for each sort expression
+            // TODO: create a filter for each key that respects lexical 
ordering
+            // in the form of col0 < threshold0 || col0 == threshold0 && (col1 
< threshold1 || ...)
+            // This could use BinaryExpr to benefit from short circuiting and 
early evaluation
+            // https://github.com/apache/datafusion/issues/15698
+            // Extract the value for this column from the max row
+            let expr = Arc::clone(&self.expr[0].expr);
+            let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 
1))?;
+
+            // Convert to scalar value - should be a single value since we're 
evaluating on a single row batch
+            let threshold = Scalar::new(value.to_array(1)?);
+
+            // Create a filter for each sort key
+            let is_multi_col = self.expr.len() > 1;
+            let filter = match (is_multi_col, self.expr[0].options.descending) 
{
+                (true, true) => gt_eq(&sort_keys[0], &threshold)?,
+                (true, false) => lt_eq(&sort_keys[0], &threshold)?,
+                (false, true) => gt(&sort_keys[0], &threshold)?,
+                (false, false) => lt(&sort_keys[0], &threshold)?,
+            };
+            if filter.true_count() == 0 {
+                // No rows are less than the max row, so we can skip this batch
+                // Early completion is still possible, as last row might be 
greater
+                self.attempt_early_completion(&batch)?;
+
+                return Ok(());
+            }
+            let filter_predicate = FilterBuilder::new(&filter);
+            let filter_predicate = if sort_keys.len() > 1 {
+                filter_predicate.optimize().build()
+            } else {
+                filter_predicate.build()
+            };
+            selected_rows = Some(filter);
+
+            sort_keys = sort_keys
+                .iter()
+                .map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
+                .collect::<Result<Vec<_>>>()?;
+        }
+
         // reuse existing `Rows` to avoid reallocations
         let rows = &mut self.scratch_rows;
         rows.clear();
         self.row_converter.append(rows, &sort_keys)?;
 
-        // TODO make this algorithmically better?:
-        // Idea: filter out rows >= self.heap.max() early (before passing to 
`RowConverter`)
-        //       this avoids some work and also might be better vectorizable.
         let mut batch_entry = self.heap.register_batch(batch.clone());
-        for (index, row) in rows.iter().enumerate() {
-            match self.heap.max() {
-                // heap has k items, and the new row is greater than the
-                // current max in the heap ==> it is not a new topk
-                Some(max_row) if row.as_ref() >= max_row.row() => {}
-                // don't yet have k items or new item is lower than the 
currently k low values
-                None | Some(_) => {
-                    self.heap.add(&mut batch_entry, row, index);
-                    self.metrics.row_replacements.add(1);
+
+        let mut replacements = 0;
+
+        match selected_rows {
+            Some(filter) => {
+                for (index, row) in 
filter.values().set_indices().zip(rows.iter()) {
+                    match self.heap.max() {
+                        // heap has k items, and the new row is greater than 
the

Review Comment:
   It seems the inner code could be shared between the two match branches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to