zhuqi-lucas commented on issue #16837:
URL: https://github.com/apache/datafusion/issues/16837#issuecomment-3112629819

   Updated here, if we remove UTC to None, it works well:
   
   ```rust
   use std::sync::Arc;
   use arrow::array::{Int32Array, RecordBatch, TimestampMillisecondArray};
   use arrow::util::pretty::pretty_format_batches;
   use arrow_schema::{DataType, Field, Schema, TimeUnit};
   use datafusion::{
       catalog::MemTable,
       datasource::provider_as_source,
       functions_aggregate::min_max::max,
       physical_plan::collect,
       prelude::SessionContext,
   };
   use datafusion::common::DataFusionError;
   use datafusion::logical_expr::{col, LogicalPlanBuilder, SortExpr};
   
   async fn test_aggregate_then_topk(asc: bool) -> 
datafusion::common::Result<()> {
       let schema = Arc::new(Schema::new(vec![
           Field::new(
               "ts",
               DataType::Timestamp(TimeUnit::Millisecond, None),
               false,
           ),
           Field::new("value", DataType::Int32, false),
       ]));
       let columns = vec![
           Arc::new(
               TimestampMillisecondArray::from(vec![1000, 2000, 3000]),
           ) as _,
           Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
       ];
       let batch = RecordBatch::try_new(schema.clone(), 
columns).map_err(DataFusionError::from)?;
       let mem_table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
   
       let plan = LogicalPlanBuilder::scan("t", 
provider_as_source(Arc::new(mem_table)), None)?
           .aggregate(vec![col("ts")], 
vec![max(col("value")).alias("max_value")])?
           .sort_with_limit(vec![SortExpr::new(col("max_value"), asc, true)], 
Some(1))?
           .build()?;
       println!("{}", plan.display_indent());
   
       let session_state = SessionContext::new().state();
       let exec_plan = session_state.create_physical_plan(&plan).await?;
       let batches = collect(exec_plan, session_state.task_ctx()).await?;
       println!("{}", pretty_format_batches(&batches).unwrap());
   
       Ok(())
   }
   
   #[tokio::main]
   async fn main() {
       // Case 1: TableScan -> Aggregate -> Topk (asc = true)
       test_aggregate_then_topk(true).await.unwrap();
   
       // Case 2: TableScan -> Aggregate -> Topk (asc = false)
       test_aggregate_then_topk(false).await.unwrap();
   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to