2010YOUY01 commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2006853236


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1067,35 +1067,53 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id 
{} and task_id {:?}", partition, context.session_id(), context.task_id());
 
-        let mut input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let execution_options = &context.session_config().options().execution;
-
-        trace!("End SortExec's input.execute for partition: {}", partition);
-
         let sort_satisfied = self
             .input
             .equivalence_properties()
             
.ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
 
+        let input_exec = Arc::clone(&self.input);
+
+        let execution_options = &context.session_config().options().execution;
+
+        trace!("End SortExec's input.execute for partition: {}", partition);
+
         match (sort_satisfied, self.fetch.as_ref()) {
-            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
-                input,
-                0,
-                Some(*fetch),
-                BaselineMetrics::new(&self.metrics_set, partition),
-            ))),
-            (true, None) => Ok(input),
+            (true, Some(fetch)) => {
+                let input = input_exec.execute(partition, 
Arc::clone(&context))?;
+                Ok(Box::pin(LimitStream::new(
+                    input,
+                    0,
+                    Some(*fetch),
+                    BaselineMetrics::new(&self.metrics_set, partition),
+                )))
+            }
+            (true, None) => self.input.execute(partition, 
Arc::clone(&context)),
             (false, Some(fetch)) => {
+                let schema = input_exec.schema();
                 let mut topk = TopK::try_new(
                     partition,
-                    input.schema(),
+                    schema,
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
                 )?;
+                let input_exec = if context
+                    .session_config()
+                    .options()
+                    .optimizer
+                    .enable_dynamic_filter_pushdown
+                    && input_exec.supports_dynamic_filter_pushdown()
+                {
+                    input_exec
+                        
.push_down_dynamic_filter(topk.dynamic_filter_source())?

Review Comment:
   This pushdown logic seems like a better fit for the physical optimizer. Would 
it be possible to move it there in the future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to