daphnenhuch-at opened a new issue, #15833: URL: https://github.com/apache/datafusion/issues/15833
### Describe the bug I have a query which sorts the data by a column called "userPrimaryKey" and then uses a window function to add a row number column to the data frame. I've set `target_partitions` to 8 to have a relatively efficient use of parallelism. However, when the query comes back, round robin partitioning is causing the record batches to be returned in an unsorted order. I have 10,000 records and the result has records 8192 through 9999 followed by 0 through 8191. I would expect the result to be fully sorted. This can be worked around by setting `datafusion.optimizer.enable_round_robin_repartition` to `false`, but I think the query plan is making the wrong decision to repartition the data after the bounded window function. I've included a test which reproduces this bug as well as the query plan produced by the query. ### To Reproduce Run the following test ``` #[cfg(test)] mod tests { use deltalake::arrow::array::Int32Array; use deltalake::datafusion::prelude::SessionContext; use rand::distributions::Alphanumeric; use rand::Rng; use std::sync::Arc; use deltalake::datafusion::logical_expr::{ident, lit, ExprSchemable}; use deltalake::{ arrow::datatypes::DataType as ArrowDataType, arrow::{ array::{BooleanArray, Int64Array, RecordBatch, StringArray}, datatypes::{DataType, Field, Schema}, }, datafusion::{ datasource::MemTable, functions_window::expr_fn::row_number, logical_expr::{expr::WildcardOptions, utils::expand_wildcard}, sql::sqlparser::ast::ExcludeSelectItem, }, }; use napi::anyhow::Result; fn generate_random_id() -> String { rand::thread_rng() .sample_iter(&Alphanumeric) .take(10) .map(char::from) .collect() } fn get_session_context() -> SessionContext { use std::sync::Arc; use deltalake::datafusion::{ execution::{ disk_manager::DiskManagerConfig, memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder, }, prelude::{SessionConfig, SessionContext}, }; let memory_pool_size_in_bytes = 100000000; let memory_pool = FairSpillPool::new(memory_pool_size_in_bytes); 
let runtime = Arc::new( RuntimeEnvBuilder::new() // We disable disk manager to avoid potentially writing temporary files to disk, which // would violate conditions of EKM .with_disk_manager(DiskManagerConfig::Disabled) .with_memory_pool(Arc::new(memory_pool)) .build() .unwrap(), ); SessionContext::new_with_config_rt( SessionConfig::default() .set_bool("datafusion.execution.parquet.pushdown_filters", true) .set_bool("datafusion.execution.parquet.reorder_filters", true) .set_usize("datafusion.execution.target_partitions", 2) .set_bool("datafusion.execution.keep_partition_by_columns", false) .set_usize("datafusion.execution.parquet.metadata_size_hint", 512 << 10), runtime, ) } fn create_record_batch(schema: Arc<Schema>, start: i32, end: i32) -> RecordBatch { let mut column_1_values = vec![]; let mut column_2_values = vec![]; let mut primary_keys = vec![]; let mut ids = vec![]; let mut last_update_times = vec![]; let mut is_deleted_values = vec![]; for i in start..end { column_1_values.push(format!("{:04}", i)); column_2_values.push("a"); primary_keys.push(format!("{:04}", i)); ids.push(generate_random_id()); last_update_times.push(chrono::Utc::now().timestamp_millis()); is_deleted_values.push(false); } let batch = RecordBatch::try_new( schema.clone(), vec![ Arc::new(StringArray::from(column_1_values)), Arc::new(StringArray::from(column_2_values)), Arc::new(StringArray::from(primary_keys)), Arc::new(StringArray::from(ids)), Arc::new(Int64Array::from(last_update_times)), Arc::new(BooleanArray::from(is_deleted_values)), ], ) .unwrap(); return batch; } #[tokio::test(flavor = "multi_thread")] async fn simple_reproduction() -> Result<()> { let schema = Arc::new(Schema::new(vec![ Field::new("1", DataType::Utf8, true), Field::new("2", DataType::Utf8, true), Field::new("userPrimaryKey", DataType::Utf8, false), Field::new("id", DataType::Utf8, false), Field::new("lastUpdateTime", DataType::Int64, false), Field::new("isDeleted", DataType::Boolean, false), ])); let mut batches: 
Vec<Vec<RecordBatch>> = vec![]; let mut batch: RecordBatch = create_record_batch(schema.clone(), 0, 8192); batches.push(vec![batch]); let mut batch: RecordBatch = create_record_batch(schema.clone(), 8192, 10000); batches.push(vec![batch]); dbg!(&batches); let table = MemTable::try_new(schema.clone(), batches).unwrap(); let ctx = get_session_context(); ctx.register_table("my_table", Arc::new(table)).unwrap(); let row_number_sub_query = ctx .table("my_table") .await? .sort(vec![ident("userPrimaryKey").sort(true, true)])? .window(vec![row_number().alias("dataFusionRowNumber")])?; let column_names_to_exclude = vec!["dataFusionRowNumber".into()]; let mut columns_to_select_exprs = expand_wildcard( row_number_sub_query.schema(), row_number_sub_query.logical_plan(), Some(&WildcardOptions { ilike: None, exclude: Some(ExcludeSelectItem::Multiple(column_names_to_exclude)), except: None, replace: None, rename: None, }), )?; columns_to_select_exprs.push( (ident("dataFusionRowNumber") - lit(1)) .cast_to(&ArrowDataType::Int32, row_number_sub_query.schema())? .alias_qualified(Some("my_table"), "fileRowNumber"), ); let result = row_number_sub_query.select(columns_to_select_exprs)?; let batches = result.collect().await?; let mut counter = 0; for batch in batches { let primary_key_column = batch .column_by_name("userPrimaryKey") .unwrap() .as_any() .downcast_ref::<StringArray>() .unwrap(); let file_row_number_column = batch .column_by_name("fileRowNumber") .unwrap() .as_any() .downcast_ref::<Int32Array>() .unwrap(); for i in 0..batch.num_rows() { let user_primary_key = primary_key_column.value(i).to_string(); let file_row_number = file_row_number_column.value(i) as i32; assert_eq!(file_row_number, counter); assert_eq!(user_primary_key, format!("{:04}", counter)); counter += 1; } } Ok(()) } } ``` ### Expected behavior I expect the data to come back fully sorted from 0000 through 9999 with the corresponding row number. ### Additional context This is the query plan produced. 
I think that we should either not be repartitioning the data or should use a `SortPreservingMergeExec` such that the streams come back in sorted order. ``` CoalescePartitionsExec { input: ProjectionExec { expr: [ ( Column { name: "1", index: 0, }, "1", ), ( Column { name: "2", index: 1, }, "2", ), ( Column { name: "userPrimaryKey", index: 2, }, "userPrimaryKey", ), ( Column { name: "id", index: 3, }, "id", ), ( Column { name: "lastUpdateTime", index: 4, }, "lastUpdateTime", ), ( Column { name: "isDeleted", index: 5, }, "isDeleted", ), ( BinaryExpr { left: CastExpr { expr: Column { name: "dataFusionRowNumber", index: 6, }, cast_type: Int32, cast_options: CastOptions { safe: false, format_options: FormatOptions { safe: true, null: "", date_format: None, datetime_format: None, timestamp_format: None, timestamp_tz_format: None, time_format: None, duration_format: Pretty, }, }, }, op: Minus, right: Literal { value: Int32(1), }, fail_on_overflow: false, }, "fileRowNumber", ), ], schema: Schema { fields: [ Field { name: "1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "fileRowNumber", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, input: RepartitionExec { input: BoundedWindowAggExec { input: SortExec { input: DataSourceExec { data_source: FileScanConfig {object_store_url=ObjectStoreUrl { url: Url { 
scheme: "s3", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("airtable-enterprise-data-tables-us-west-2-development")), port: None, path: "/", query: None, fragment: None } }, statistics=Statistics { num_rows: Exact(10000), total_byte_size: Exact(521607), column_statistics: [ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")), min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("a")), min_value: Exact(Utf8("a")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")), min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("dtr00000009999541")), min_value: Exact(Utf8("dtr00000000000175")), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Int64(10000)), min_value: Exact(Int64(1)), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: Exact(Boolean(false)), min_value: Exact(Boolean(false)), sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(10000), max_value: Exact(Int32(NULL)), min_value: Exact(Int32(NULL)), sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[entRDQybgmzURTXUd/edtcuTVRzNDkbys5t/optimize_42QnfFpBs8/2za67O8nb1.zstd.parquet/partitioning_column=0/XtwrzQJVtMmjo88H.parquet]]}, projection=[1, 2, userPrimaryKey, id, lastUpdateTime, isDeleted]}, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { orderings: [], }, constants: [], constraints: Constraints { inner: [], }, schema: Schema { fields: [ Field { name: "1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, }, partitioning: UnknownPartitioning( 1, ), emission_type: Incremental, boundedness: Bounded, output_ordering: None, }, }, expr: LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, ], }, metrics_set: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [], }, }, }, preserve_partitioning: false, fetch: None, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { orderings: [ LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, ], }, ], }, constants: [], constraints: Constraints { inner: [], }, schema: Schema { fields: [ Field { name: "1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, }, partitioning: UnknownPartitioning( 1, ), emission_type: Final, boundedness: Bounded, output_ordering: Some( LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, ], }, ), }, }, window_expr: [ StandardWindowExpr { expr: WindowUDFExpr { fun: WindowUDF { inner: RowNumber { signature: Signature { type_signature: Nullary, volatility: Immutable, }, }, }, args: [], name: "dataFusionRowNumber", input_types: [], is_reversed: false, ignore_nulls: false, }, partition_by: [], order_by: LexOrdering { inner: [], }, window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, }, ], schema: Schema { fields: [ Field { name: "1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "dataFusionRowNumber", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [], }, }, }, input_order_mode: Sorted, ordered_partition_by_indices: [], cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { 
orderings: [ LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, ], }, LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "dataFusionRowNumber", index: 6, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], }, ], }, constants: [], constraints: Constraints { inner: [], }, schema: Schema { fields: [ Field { name: "1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "dataFusionRowNumber", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, }, partitioning: UnknownPartitioning( 1, ), emission_type: Final, boundedness: Bounded, output_ordering: Some( LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, PhysicalSortExpr { expr: Column { name: "dataFusionRowNumber", index: 6, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], }, ), }, can_repartition: false, }, state: OnceCell { value: None, }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [], }, }, }, preserve_order: false, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: 
OrderingEquivalenceClass { orderings: [ LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, ], }, LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "dataFusionRowNumber", index: 6, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], }, ], }, constants: [], constraints: Constraints { inner: [], }, schema: Schema { fields: [ Field { name: "1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "dataFusionRowNumber", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, }, partitioning: RoundRobinBatch( 8, ), emission_type: Final, boundedness: Bounded, output_ordering: Some( LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, PhysicalSortExpr { expr: Column { name: "dataFusionRowNumber", index: 6, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], }, ), }, }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [], }, }, }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { orderings: [ LexOrdering { inner: [ 
PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, ], }, LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "fileRowNumber", index: 6, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], }, ], }, constants: [], constraints: Constraints { inner: [], }, schema: Schema { fields: [ Field { name: "1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "fileRowNumber", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, }, partitioning: RoundRobinBatch( 8, ), emission_type: Final, boundedness: Bounded, output_ordering: Some( LexOrdering { inner: [ PhysicalSortExpr { expr: Column { name: "userPrimaryKey", index: 2, }, options: SortOptions { descending: false, nulls_first: true, }, }, PhysicalSortExpr { expr: Column { name: "fileRowNumber", index: 6, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], }, ), }, }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [], }, }, }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { orderings: [], }, constants: [], constraints: Constraints { inner: [], }, schema: Schema { fields: [ Field { name: "1", 
data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "userPrimaryKey", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "lastUpdateTime", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "isDeleted", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "fileRowNumber", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, }, partitioning: UnknownPartitioning( 1, ), emission_type: Final, boundedness: Bounded, output_ordering: None, }, fetch: None, } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org