UBarney commented on issue #16620:
URL: https://github.com/apache/datafusion/issues/16620#issuecomment-4003028557

   > use arrow::array::{ArrayRef, StringArray};
   > use arrow::datatypes::{DataType, Field, Schema};
   > use arrow::record_batch::RecordBatch;
   > use datafusion::prelude::*;
   > use parquet::arrow::ArrowWriter;
   > use parquet::file::properties::WriterProperties;
   > use rand::Rng;
   > use std::fs::File;
   > use std::sync::Arc;
   > 
   > fn generate_random_8byte_hex() -> String {
   >     let mut rng = rand::rng();
   >     let mut bytes = [0u8; 8];
   >     rng.fill(&mut bytes);
   >     hex::encode(bytes)
   > }
   > 
   > #[tokio::main]
   > async fn main() -> Result<(), Box<dyn std::error::Error>> {
   >     let schema = Arc::new(Schema::new(vec![
   >         Field::new("span_id", DataType::Utf8, false),
   >         Field::new("data", DataType::Utf8, false),
   >     ]));
   > 
   >     let temp_file = 
tempfile::Builder::new().suffix(".parquet").tempfile()?;
   >     let temp_path = temp_file.path().to_string_lossy().to_string();
   > 
   >     let file = File::create(&temp_path)?;
   >     let props = WriterProperties::builder().build();
   >     let mut writer = ArrowWriter::try_new(file, schema.clone(), 
Some(props))?;
   > 
   >     let batch_size = 10_000;
   >     let total_rows = 1_000_000;
   >     let num_batches = total_rows / batch_size;
   > 
   >     for _batch_idx in 0..num_batches {
   >         let mut span_ids = Vec::with_capacity(batch_size);
   >         let mut data_values = Vec::with_capacity(batch_size);
   > 
   >         for _ in 0..batch_size {
   >             span_ids.push(generate_random_8byte_hex());
   >             data_values.push(generate_random_8byte_hex());
   >         }
   > 
   >         let span_id_array: ArrayRef = 
Arc::new(StringArray::from(span_ids));
   >         let data_array: ArrayRef = 
Arc::new(StringArray::from(data_values));
   > 
   >         let batch = RecordBatch::try_new(schema.clone(), 
vec![span_id_array, data_array])?;
   > 
   >         writer.write(&batch)?;
   >     }
   > 
   >     writer.close()?;
   > 
   >     let ctx = SessionContext::new();
   > 
   >     ctx.register_parquet("spans", &temp_path, 
ParquetReadOptions::default())
   >         .await?;
   > 
   >     let sql = "explain analyze SELECT DISTINCT ON (span_id) span_id, data 
FROM spans ORDER BY span_id LIMIT 10";
   > 
   >     let df = ctx.sql(sql).await?;
   >     let results = df.collect().await?;
   > 
   >     for batch in results {
   >         println!("{}", 
arrow::util::pretty::pretty_format_batches(&[batch])?);
   >     }
   > 
   >     let sql2 = "explain analyze SELECT DISTINCT span_id, data FROM spans 
ORDER BY span_id LIMIT 10";
   > 
   >     let df2 = ctx.sql(sql2).await?;
   >     let results2 = df2.collect().await?;
   > 
   >     for batch in results2 {
   >         println!("{}", 
arrow::util::pretty::pretty_format_batches(&[batch])?);
   >     }
   > 
   >     Ok(())
   > }
   > ```
   >                  |     AggregateExec: mode=FinalPartitioned, 
gby=[span_id@0 as span_id, data@1 as data], aggr=[], 
metrics=[output_rows=1000000, elapsed_compute=74.551957ms, spill_count=0, 
spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=115344288]  
   > ```
   > 
   > vs
   > 
   > ```
   > AggregateExec: mode=FinalPartitioned, gby=[span_id@0 as span_id], 
aggr=[first_value(spans.span_id) ORDER BY [spans.span_id ASC NULLS LAST], 
first_value(spans.data) ORDER BY [spans.span_id ASC NULLS LAST]], 
metrics=[output_rows=1000000, elapsed_compute=28.630642585s, spill_count=0, 
spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=1170265952]   
   > ```
   
   
   
   The query plan generated by this code differs from the plan described in 
the issue:
   
   * In the issue description, `FIRST_VALUE` does not have an `ORDER BY` 
clause, but in this code, it does.
   
   Additionally, based on the query plan corresponding to this code, we could 
further optimize the `FIRST_VALUE` parameters:
   
   * **When the argument of `FIRST_VALUE` is only the GROUP BY key:** It can be 
simplified to just the GROUP BY key itself, because every row in a group 
carries the same key value.
     Example: `FIRST_VALUE(span_id ORDER BY span_id) GROUP BY span_id` → 
`span_id GROUP BY span_id`
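   * **When the `ORDER BY` clause contains the GROUP BY key:** Those fields can 
be removed from the `ORDER BY` clause, because the key is constant within each 
group and therefore cannot affect the ordering of rows inside it.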
     Example: `FIRST_VALUE(data ORDER BY span_id) AS data FROM spans GROUP BY 
span_id` → `FIRST_VALUE(data) AS data FROM spans GROUP BY span_id`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to