2010YOUY01 commented on code in PR #15563:
URL: https://github.com/apache/datafusion/pull/15563#discussion_r2031112051

##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -183,6 +209,87 @@ impl TopK {
         // update memory reservation
         self.reservation.try_resize(self.size())?;
+
+        // flag the topK as finished if we know that all
+        // subsequent batches are guaranteed to be worse than the
+        // current topK
+        self.attempt_early_completion(&batch)?;
+
+        Ok(())
+    }
+
+    /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full,
+    /// check if the computation can be finished early.
+    /// This is the case if the last row of the current batch is strictly worse than the worst row in the heap,
+    /// comparing only on the shared prefix columns.
+    fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> {
+        // Early exit if the batch is empty as there is no last row to extract from it.
+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
+        // prefix_row_converter is only `Some` if the input ordering has a common prefix with the TopK,
+        // so early exit if it is `None`.
+        let Some(prefix_converter) = &self.common_sort_prefix_converter else {
+            return Ok(());
+        };
+
+        // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full).
+        let Some(worst_topk_row) = self.heap.max() else {

Review Comment:
   nit: Maybe rename all the `worst` names to `max`, e.g. `worst_topk_row` -> `max_topk_row`, to keep the naming consistent?
   I think in this context the heap follows the convention of referring to the heap root as the maximum, because it uses byte order for the converted rows, and it's always a min-heap regardless of query order.

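To make the early-completion check and the `max` naming concrete, here is a minimal sketch under stated assumptions. It is not the PR's code: `MiniTopK` and `run_top_k` are hypothetical names, rows are plain `(prefix, rest)` integer tuples instead of converted row bytes, and the input is assumed to arrive sorted ascending on the `prefix` column only.

```rust
use std::collections::BinaryHeap;

/// Hypothetical, simplified stand-in for a TopK operator.
/// Rows are `(prefix, rest)` tuples; smaller tuples are "better".
struct MiniTopK {
    k: usize,
    /// Max-heap holding the k smallest rows seen so far; the root is the
    /// largest (i.e. worst) row that is still part of the top k.
    heap: BinaryHeap<(i64, i64)>,
    finished: bool,
}

impl MiniTopK {
    fn new(k: usize) -> Self {
        assert!(k > 0, "k must be positive");
        Self { k, heap: BinaryHeap::new(), finished: false }
    }

    /// Returns the heap root, but only once the heap holds k rows.
    fn max(&self) -> Option<(i64, i64)> {
        if self.heap.len() < self.k {
            None
        } else {
            self.heap.peek().copied()
        }
    }

    fn insert_batch(&mut self, batch: &[(i64, i64)]) {
        for &row in batch {
            match self.max() {
                // Heap not full yet: every row is accepted.
                None => self.heap.push(row),
                // Heap full: replace the root only if the new row is smaller.
                Some(max_row) if row < max_row => {
                    self.heap.pop();
                    self.heap.push(row);
                }
                Some(_) => {}
            }
        }
        self.attempt_early_completion(batch);
    }

    /// Assumes the input is sorted ascending on `prefix`. If the last row of
    /// the batch has a strictly larger prefix than the heap root, no later
    /// row can enter the top k.
    fn attempt_early_completion(&mut self, batch: &[(i64, i64)]) {
        let Some(&(last_prefix, _)) = batch.last() else { return };
        let Some((max_prefix, _)) = self.max() else { return };
        if last_prefix > max_prefix {
            self.finished = true;
        }
    }
}

/// Feeds batches until the operator reports completion. Returns the top-k
/// rows in ascending order and the number of batches actually consumed.
fn run_top_k(k: usize, batches: &[Vec<(i64, i64)>]) -> (Vec<(i64, i64)>, usize) {
    let mut topk = MiniTopK::new(k);
    let mut consumed = 0;
    for batch in batches {
        if topk.finished {
            break;
        }
        topk.insert_batch(batch);
        consumed += 1;
    }
    (topk.heap.into_sorted_vec(), consumed)
}
```

With `k = 2` and batches `[(1, 5), (1, 3)]`, `[(2, 9), (3, 1)]`, `[(4, 0)]`, the sketch stops after the second batch. The comparison has to be strict: while the last prefix only ties with the root's prefix, a later row with the same prefix but a smaller `rest` value could still displace the root.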
##########
datafusion/sqllogictest/test_files/topk.slt:
##########
@@ -233,3 +233,104 @@ d 1 -98 y7C453hRWd4E7ImjNDWlpexB8nUqjh y7C453hRWd4E7ImjNDWlpexB8nUqjh
 e 2 52 xipQ93429ksjNcXPX5326VSg1xJZcW xipQ93429ksjNcXPX5326VSg1xJZcW
 d 1 -72 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS wwXqSGKLyBQyPkonlzBNYUJTCo4LRS
 a 1 -5 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs
+
+#####################################
+## Test TopK with Partially Sorted Inputs
+#####################################
+
+
+# Create an external table where data is pre-sorted by (column1 DESC, column2 ASC) only.
+statement ok
+CREATE EXTERNAL TABLE partial_sorted (
+    column1 VARCHAR,
+    column2 INT,
+    column3 INT
+)
+STORED AS parquet
+LOCATION 'test_files/scratch/topk/partial_sorted/1.parquet'
+WITH ORDER (column1 DESC, column2 ASC);
+
+# Insert test data into the external table.
+query I
+COPY (VALUES

Review Comment:
   I think `CREATE EXTERNAL TABLE WITH ORDER` only acts as a metadata hint: when writing data into the external table, it doesn't enforce the specified order. Instead, the `VALUES` here should be listed in the order specified in the `CREATE EXTERNAL TABLE` statement.
   
   The test case didn't fail because the whole parquet file fits in one batch. Maybe we can let the test execute with a smaller batch size config:
   ```
   set datafusion.execution.batch_size = 2;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
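The batch-size concern in the second review comment can be illustrated with a toy model. This is only a sketch under assumptions, not DataFusion code: `top_k_with_early_exit` is a hypothetical function, rows are single integers, and the early exit trusts a declared ascending order that the data may not actually follow.

```rust
use std::collections::BinaryHeap;

/// Toy top-k over integers that trusts a *declared* ascending input order.
/// Rows are processed in chunks of `batch_size`; after each chunk, the scan
/// stops early if the chunk's last row is strictly larger than the heap root.
fn top_k_with_early_exit(rows: &[i64], k: usize, batch_size: usize) -> Vec<i64> {
    assert!(k > 0 && batch_size > 0);
    // Max-heap holding the k smallest rows seen so far.
    let mut heap: BinaryHeap<i64> = BinaryHeap::new();

    for batch in rows.chunks(batch_size) {
        for &row in batch {
            if heap.len() < k {
                heap.push(row);
            } else if let Some(max) = heap.peek().copied() {
                if row < max {
                    heap.pop();
                    heap.push(row);
                }
            }
        }

        // Early completion: only valid if the input really is sorted.
        if heap.len() == k {
            let last = batch.last().copied();
            let max = heap.peek().copied();
            if let (Some(last), Some(max)) = (last, max) {
                if last > max {
                    break;
                }
            }
        }
    }

    heap.into_sorted_vec()
}
```

For the mis-ordered input `[5, 4, 7, 1, 2]` with `k = 2`, a batch size of 5 processes everything in one batch and returns `[1, 2]`, so the ordering violation goes unnoticed. A batch size of 3 exits after the first batch and returns `[4, 5]`. This is the effect the reviewer expects a smaller `batch_size` setting to expose in the test.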