daphnenhuch-at opened a new issue, #15833:
URL: https://github.com/apache/datafusion/issues/15833

   ### Describe the bug
   
   I have a query which sorts the data by a column called "userPrimaryKey" and 
then uses a window function to add a row number column to the data frame. 
I've set `target_partitions` to 8 to have a relatively efficient use of 
parallelism. However, when the query comes back, round robin partitioning is 
causing the record batches to be returned in an unsorted order. I have 10,000 
records and the result has records 8192 through 9999 followed by 0 through 
8191. I would expect the result to be fully sorted. This is fixed by setting 
`datafusion.optimizer.enable_round_robin_repartition` to `false`. But I think 
the query plan is making the wrong decision to Repartition the data after the 
Bounded window function. 
   
   I've included a test which reproduces this bug as well as the query plan 
produced by the query.
   
   ### To Reproduce
   
   Run the following test 
   
   ```
   #[cfg(test)]
   mod tests {
       use deltalake::arrow::array::Int32Array;
       use deltalake::datafusion::prelude::SessionContext;
       use rand::distributions::Alphanumeric;
       use rand::Rng;
       use std::sync::Arc;
   
       use deltalake::datafusion::logical_expr::{ident, lit, ExprSchemable};
       use deltalake::{
           arrow::datatypes::DataType as ArrowDataType,
           arrow::{
               array::{BooleanArray, Int64Array, RecordBatch, StringArray},
               datatypes::{DataType, Field, Schema},
           },
           datafusion::{
               datasource::MemTable,
               functions_window::expr_fn::row_number,
               logical_expr::{expr::WildcardOptions, utils::expand_wildcard},
               sql::sqlparser::ast::ExcludeSelectItem,
           },
       };
       use napi::anyhow::Result;
   
       /// Returns a random identifier made of 10 ASCII alphanumeric characters,
       /// drawn from the thread-local random number generator.
       fn generate_random_id() -> String {
           const ID_LENGTH: usize = 10;
           let random_chars = rand::thread_rng()
               .sample_iter(&Alphanumeric)
               .take(ID_LENGTH)
               .map(char::from);
           random_chars.collect()
       }
   
       fn get_session_context() -> SessionContext {
           use std::sync::Arc;
   
           use deltalake::datafusion::{
               execution::{
                   disk_manager::DiskManagerConfig, memory_pool::FairSpillPool,
                   runtime_env::RuntimeEnvBuilder,
               },
               prelude::{SessionConfig, SessionContext},
           };
   
           let memory_pool_size_in_bytes = 100000000;
           let memory_pool = FairSpillPool::new(memory_pool_size_in_bytes);
           let runtime = Arc::new(
               RuntimeEnvBuilder::new()
                   // We disable disk manager to avoid potentially writing 
temporary files to disk, which
                   // would violate conditions of EKM
                   .with_disk_manager(DiskManagerConfig::Disabled)
                   .with_memory_pool(Arc::new(memory_pool))
                   .build()
                   .unwrap(),
           );
           SessionContext::new_with_config_rt(
               SessionConfig::default()
                   .set_bool("datafusion.execution.parquet.pushdown_filters", 
true)
                   .set_bool("datafusion.execution.parquet.reorder_filters", 
true)
                   .set_usize("datafusion.execution.target_partitions", 2)
                   .set_bool("datafusion.execution.keep_partition_by_columns", 
false)
                   
.set_usize("datafusion.execution.parquet.metadata_size_hint", 512 << 10),
               runtime,
           )
       }
   
       /// Builds one `RecordBatch` containing a row for every `i` in `start..end`.
       ///
       /// The arrays are supplied in this order: the zero-padded index (`{:04}`),
       /// the constant `"a"`, the zero-padded index again, a random id, the current
       /// UTC time in milliseconds, and `false`.
       ///
       /// # Panics
       ///
       /// Panics if `RecordBatch::try_new` rejects the arrays for `schema`.
       fn create_record_batch(schema: Arc<Schema>, start: i32, end: i32) -> RecordBatch {
           let mut padded_indices = Vec::new();
           let mut constant_values = Vec::new();
           let mut keys = Vec::new();
           let mut random_ids = Vec::new();
           let mut update_times = Vec::new();
           let mut deleted_flags = Vec::new();

           for row in start..end {
               let padded = format!("{:04}", row);
               padded_indices.push(padded.clone());
               constant_values.push("a");
               keys.push(padded);
               random_ids.push(generate_random_id());
               update_times.push(chrono::Utc::now().timestamp_millis());
               deleted_flags.push(false);
           }

           RecordBatch::try_new(
               schema,
               vec![
                   Arc::new(StringArray::from(padded_indices)),
                   Arc::new(StringArray::from(constant_values)),
                   Arc::new(StringArray::from(keys)),
                   Arc::new(StringArray::from(random_ids)),
                   Arc::new(Int64Array::from(update_times)),
                   Arc::new(BooleanArray::from(deleted_flags)),
               ],
           )
           .unwrap()
       }
   
       #[tokio::test(flavor = "multi_thread")]
       async fn simple_reproduction() -> Result<()> {
           let schema = Arc::new(Schema::new(vec![
               Field::new("1", DataType::Utf8, true),
               Field::new("2", DataType::Utf8, true),
               Field::new("userPrimaryKey", DataType::Utf8, false),
               Field::new("id", DataType::Utf8, false),
               Field::new("lastUpdateTime", DataType::Int64, false),
               Field::new("isDeleted", DataType::Boolean, false),
           ]));
   
           let mut batches: Vec<Vec<RecordBatch>> = vec![];
   
           let mut batch: RecordBatch = create_record_batch(schema.clone(), 0, 
8192);
           batches.push(vec![batch]);
   
           let mut batch: RecordBatch = create_record_batch(schema.clone(), 
8192, 10000);
           batches.push(vec![batch]);
           dbg!(&batches);
   
           let table = MemTable::try_new(schema.clone(), batches).unwrap();
           let ctx = get_session_context();
           ctx.register_table("my_table", Arc::new(table)).unwrap();
   
           let row_number_sub_query = ctx
               .table("my_table")
               .await?
               .sort(vec![ident("userPrimaryKey").sort(true, true)])?
               .window(vec![row_number().alias("dataFusionRowNumber")])?;
   
           let column_names_to_exclude = vec!["dataFusionRowNumber".into()];
           let mut columns_to_select_exprs = expand_wildcard(
               row_number_sub_query.schema(),
               row_number_sub_query.logical_plan(),
               Some(&WildcardOptions {
                   ilike: None,
                   exclude: 
Some(ExcludeSelectItem::Multiple(column_names_to_exclude)),
                   except: None,
                   replace: None,
                   rename: None,
               }),
           )?;
   
           columns_to_select_exprs.push(
               (ident("dataFusionRowNumber") - lit(1))
                   .cast_to(&ArrowDataType::Int32, 
row_number_sub_query.schema())?
                   .alias_qualified(Some("my_table"), "fileRowNumber"),
           );
           let result = row_number_sub_query.select(columns_to_select_exprs)?;
           let batches = result.collect().await?;
           let mut counter = 0;
           for batch in batches {
               let primary_key_column = batch
                   .column_by_name("userPrimaryKey")
                   .unwrap()
                   .as_any()
                   .downcast_ref::<StringArray>()
                   .unwrap();
               let file_row_number_column = batch
                   .column_by_name("fileRowNumber")
                   .unwrap()
                   .as_any()
                   .downcast_ref::<Int32Array>()
                   .unwrap();
               for i in 0..batch.num_rows() {
                   let user_primary_key = 
primary_key_column.value(i).to_string();
                   let file_row_number = file_row_number_column.value(i) as i32;
                   assert_eq!(file_row_number, counter);
                   assert_eq!(user_primary_key, format!("{:04}", counter));
                   counter += 1;
               }
           }
   
           Ok(())
       }
   }
   ```
   
   ### Expected behavior
   
   I expect the data to come back fully sorted from 0000 through 9999 with the 
corresponding row number.
   
   ### Additional context
   
   This is the query plan produced. I think that we should either not be 
repartitioning the data or should use a `SortPreservingMergeExec` such that the 
streams come back in sorted order.
   
   ```
   CoalescePartitionsExec {
       input: ProjectionExec {
           expr: [
               (
                   Column {
                       name: "1",
                       index: 0,
                   },
                   "1",
               ),
               (
                   Column {
                       name: "2",
                       index: 1,
                   },
                   "2",
               ),
               (
                   Column {
                       name: "userPrimaryKey",
                       index: 2,
                   },
                   "userPrimaryKey",
               ),
               (
                   Column {
                       name: "id",
                       index: 3,
                   },
                   "id",
               ),
               (
                   Column {
                       name: "lastUpdateTime",
                       index: 4,
                   },
                   "lastUpdateTime",
               ),
               (
                   Column {
                       name: "isDeleted",
                       index: 5,
                   },
                   "isDeleted",
               ),
               (
                   BinaryExpr {
                       left: CastExpr {
                           expr: Column {
                               name: "dataFusionRowNumber",
                               index: 6,
                           },
                           cast_type: Int32,
                           cast_options: CastOptions {
                               safe: false,
                               format_options: FormatOptions {
                                   safe: true,
                                   null: "",
                                   date_format: None,
                                   datetime_format: None,
                                   timestamp_format: None,
                                   timestamp_tz_format: None,
                                   time_format: None,
                                   duration_format: Pretty,
                               },
                           },
                       },
                       op: Minus,
                       right: Literal {
                           value: Int32(1),
                       },
                       fail_on_overflow: false,
                   },
                   "fileRowNumber",
               ),
           ],
           schema: Schema {
               fields: [
                   Field {
                       name: "1",
                       data_type: Utf8,
                       nullable: true,
                       dict_id: 0,
                       dict_is_ordered: false,
                       metadata: {},
                   },
                   Field {
                       name: "2",
                       data_type: Utf8,
                       nullable: true,
                       dict_id: 0,
                       dict_is_ordered: false,
                       metadata: {},
                   },
                   Field {
                       name: "userPrimaryKey",
                       data_type: Utf8,
                       nullable: false,
                       dict_id: 0,
                       dict_is_ordered: false,
                       metadata: {},
                   },
                   Field {
                       name: "id",
                       data_type: Utf8,
                       nullable: false,
                       dict_id: 0,
                       dict_is_ordered: false,
                       metadata: {},
                   },
                   Field {
                       name: "lastUpdateTime",
                       data_type: Int64,
                       nullable: false,
                       dict_id: 0,
                       dict_is_ordered: false,
                       metadata: {},
                   },
                   Field {
                       name: "isDeleted",
                       data_type: Boolean,
                       nullable: false,
                       dict_id: 0,
                       dict_is_ordered: false,
                       metadata: {},
                   },
                   Field {
                       name: "fileRowNumber",
                       data_type: Int32,
                       nullable: false,
                       dict_id: 0,
                       dict_is_ordered: false,
                       metadata: {},
                   },
               ],
               metadata: {},
           },
           input: RepartitionExec {
               input: BoundedWindowAggExec {
                   input: SortExec {
                       input: DataSourceExec {
                           data_source: FileScanConfig 
{object_store_url=ObjectStoreUrl { url: Url { scheme: "s3", cannot_be_a_base: 
false, username: "", password: None, host: 
Some(Domain("airtable-enterprise-data-tables-us-west-2-development")), port: 
None, path: "/", query: None, fragment: None } }, statistics=Statistics { 
num_rows: Exact(10000), total_byte_size: Exact(521607), column_statistics: 
[ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")), 
min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent }, 
ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("a")), 
min_value: Exact(Utf8("a")), sum_value: Absent, distinct_count: Absent }, 
ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")), 
min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent }, 
ColumnStatistics { null_count: Exact(0), max_value: 
Exact(Utf8("dtr00000009999541")), min_value: Exact(Utf8("dtr00000000000175")), 
sum_value: Absent, 
distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: 
Exact(Int64(10000)), min_value: Exact(Int64(1)), sum_value: Absent, 
distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value: 
Exact(Boolean(false)), min_value: Exact(Boolean(false)), sum_value: Absent, 
distinct_count: Absent }, ColumnStatistics { null_count: Exact(10000), 
max_value: Exact(Int32(NULL)), min_value: Exact(Int32(NULL)), sum_value: 
Absent, distinct_count: Absent }] }, file_groups={1 group: 
[[entRDQybgmzURTXUd/edtcuTVRzNDkbys5t/optimize_42QnfFpBs8/2za67O8nb1.zstd.parquet/partitioning_column=0/XtwrzQJVtMmjo88H.parquet]]},
 projection=[1, 2, userPrimaryKey, id, lastUpdateTime, isDeleted]},
                           cache: PlanProperties {
                               eq_properties: EquivalenceProperties {
                                   eq_group: EquivalenceGroup {
                                       classes: [],
                                   },
                                   oeq_class: OrderingEquivalenceClass {
                                       orderings: [],
                                   },
                                   constants: [],
                                   constraints: Constraints {
                                       inner: [],
                                   },
                                   schema: Schema {
                                       fields: [
                                           Field {
                                               name: "1",
                                               data_type: Utf8,
                                               nullable: true,
                                               dict_id: 0,
                                               dict_is_ordered: false,
                                               metadata: {},
                                           },
                                           Field {
                                               name: "2",
                                               data_type: Utf8,
                                               nullable: true,
                                               dict_id: 0,
                                               dict_is_ordered: false,
                                               metadata: {},
                                           },
                                           Field {
                                               name: "userPrimaryKey",
                                               data_type: Utf8,
                                               nullable: false,
                                               dict_id: 0,
                                               dict_is_ordered: false,
                                               metadata: {},
                                           },
                                           Field {
                                               name: "id",
                                               data_type: Utf8,
                                               nullable: false,
                                               dict_id: 0,
                                               dict_is_ordered: false,
                                               metadata: {},
                                           },
                                           Field {
                                               name: "lastUpdateTime",
                                               data_type: Int64,
                                               nullable: false,
                                               dict_id: 0,
                                               dict_is_ordered: false,
                                               metadata: {},
                                           },
                                           Field {
                                               name: "isDeleted",
                                               data_type: Boolean,
                                               nullable: false,
                                               dict_id: 0,
                                               dict_is_ordered: false,
                                               metadata: {},
                                           },
                                       ],
                                       metadata: {},
                                   },
                               },
                               partitioning: UnknownPartitioning(
                                   1,
                               ),
                               emission_type: Incremental,
                               boundedness: Bounded,
                               output_ordering: None,
                           },
                       },
                       expr: LexOrdering {
                           inner: [
                               PhysicalSortExpr {
                                   expr: Column {
                                       name: "userPrimaryKey",
                                       index: 2,
                                   },
                                   options: SortOptions {
                                       descending: false,
                                       nulls_first: true,
                                   },
                               },
                           ],
                       },
                       metrics_set: ExecutionPlanMetricsSet {
                           inner: Mutex {
                               data: MetricsSet {
                                   metrics: [],
                               },
                           },
                       },
                       preserve_partitioning: false,
                       fetch: None,
                       cache: PlanProperties {
                           eq_properties: EquivalenceProperties {
                               eq_group: EquivalenceGroup {
                                   classes: [],
                               },
                               oeq_class: OrderingEquivalenceClass {
                                   orderings: [
                                       LexOrdering {
                                           inner: [
                                               PhysicalSortExpr {
                                                   expr: Column {
                                                       name: "userPrimaryKey",
                                                       index: 2,
                                                   },
                                                   options: SortOptions {
                                                       descending: false,
                                                       nulls_first: true,
                                                   },
                                               },
                                           ],
                                       },
                                   ],
                               },
                               constants: [],
                               constraints: Constraints {
                                   inner: [],
                               },
                               schema: Schema {
                                   fields: [
                                       Field {
                                           name: "1",
                                           data_type: Utf8,
                                           nullable: true,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {},
                                       },
                                       Field {
                                           name: "2",
                                           data_type: Utf8,
                                           nullable: true,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {},
                                       },
                                       Field {
                                           name: "userPrimaryKey",
                                           data_type: Utf8,
                                           nullable: false,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {},
                                       },
                                       Field {
                                           name: "id",
                                           data_type: Utf8,
                                           nullable: false,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {},
                                       },
                                       Field {
                                           name: "lastUpdateTime",
                                           data_type: Int64,
                                           nullable: false,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {},
                                       },
                                       Field {
                                           name: "isDeleted",
                                           data_type: Boolean,
                                           nullable: false,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {},
                                       },
                                   ],
                                   metadata: {},
                               },
                           },
                           partitioning: UnknownPartitioning(
                               1,
                           ),
                           emission_type: Final,
                           boundedness: Bounded,
                           output_ordering: Some(
                               LexOrdering {
                                   inner: [
                                       PhysicalSortExpr {
                                           expr: Column {
                                               name: "userPrimaryKey",
                                               index: 2,
                                           },
                                           options: SortOptions {
                                               descending: false,
                                               nulls_first: true,
                                           },
                                       },
                                   ],
                               },
                           ),
                       },
                   },
                   window_expr: [
                       StandardWindowExpr {
                           expr: WindowUDFExpr {
                               fun: WindowUDF {
                                   inner: RowNumber {
                                       signature: Signature {
                                           type_signature: Nullary,
                                           volatility: Immutable,
                                       },
                                   },
                               },
                               args: [],
                               name: "dataFusionRowNumber",
                               input_types: [],
                               is_reversed: false,
                               ignore_nulls: false,
                           },
                           partition_by: [],
                           order_by: LexOrdering {
                               inner: [],
                           },
                           window_frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), 
is_causal: false },
                       },
                   ],
                   schema: Schema {
                       fields: [
                           Field {
                               name: "1",
                               data_type: Utf8,
                               nullable: true,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "2",
                               data_type: Utf8,
                               nullable: true,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "userPrimaryKey",
                               data_type: Utf8,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "id",
                               data_type: Utf8,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "lastUpdateTime",
                               data_type: Int64,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "isDeleted",
                               data_type: Boolean,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "dataFusionRowNumber",
                               data_type: UInt64,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                       ],
                       metadata: {},
                   },
                   metrics: ExecutionPlanMetricsSet {
                       inner: Mutex {
                           data: MetricsSet {
                               metrics: [],
                           },
                       },
                   },
                   input_order_mode: Sorted,
                   ordered_partition_by_indices: [],
                   cache: PlanProperties {
                       eq_properties: EquivalenceProperties {
                           eq_group: EquivalenceGroup {
                               classes: [],
                           },
                           oeq_class: OrderingEquivalenceClass {
                               orderings: [
                                   LexOrdering {
                                       inner: [
                                           PhysicalSortExpr {
                                               expr: Column {
                                                   name: "userPrimaryKey",
                                                   index: 2,
                                               },
                                               options: SortOptions {
                                                   descending: false,
                                                   nulls_first: true,
                                               },
                                           },
                                       ],
                                   },
                                   LexOrdering {
                                       inner: [
                                           PhysicalSortExpr {
                                               expr: Column {
                                                   name: "dataFusionRowNumber",
                                                   index: 6,
                                               },
                                               options: SortOptions {
                                                   descending: false,
                                                   nulls_first: false,
                                               },
                                           },
                                       ],
                                   },
                               ],
                           },
                           constants: [],
                           constraints: Constraints {
                               inner: [],
                           },
                           schema: Schema {
                               fields: [
                                   Field {
                                       name: "1",
                                       data_type: Utf8,
                                       nullable: true,
                                       dict_id: 0,
                                       dict_is_ordered: false,
                                       metadata: {},
                                   },
                                   Field {
                                       name: "2",
                                       data_type: Utf8,
                                       nullable: true,
                                       dict_id: 0,
                                       dict_is_ordered: false,
                                       metadata: {},
                                   },
                                   Field {
                                       name: "userPrimaryKey",
                                       data_type: Utf8,
                                       nullable: false,
                                       dict_id: 0,
                                       dict_is_ordered: false,
                                       metadata: {},
                                   },
                                   Field {
                                       name: "id",
                                       data_type: Utf8,
                                       nullable: false,
                                       dict_id: 0,
                                       dict_is_ordered: false,
                                       metadata: {},
                                   },
                                   Field {
                                       name: "lastUpdateTime",
                                       data_type: Int64,
                                       nullable: false,
                                       dict_id: 0,
                                       dict_is_ordered: false,
                                       metadata: {},
                                   },
                                   Field {
                                       name: "isDeleted",
                                       data_type: Boolean,
                                       nullable: false,
                                       dict_id: 0,
                                       dict_is_ordered: false,
                                       metadata: {},
                                   },
                                   Field {
                                       name: "dataFusionRowNumber",
                                       data_type: UInt64,
                                       nullable: false,
                                       dict_id: 0,
                                       dict_is_ordered: false,
                                       metadata: {},
                                   },
                               ],
                               metadata: {},
                           },
                       },
                       partitioning: UnknownPartitioning(
                           1,
                       ),
                       emission_type: Final,
                       boundedness: Bounded,
                       output_ordering: Some(
                           LexOrdering {
                               inner: [
                                   PhysicalSortExpr {
                                       expr: Column {
                                           name: "userPrimaryKey",
                                           index: 2,
                                       },
                                       options: SortOptions {
                                           descending: false,
                                           nulls_first: true,
                                       },
                                   },
                                   PhysicalSortExpr {
                                       expr: Column {
                                           name: "dataFusionRowNumber",
                                           index: 6,
                                       },
                                       options: SortOptions {
                                           descending: false,
                                           nulls_first: false,
                                       },
                                   },
                               ],
                           },
                       ),
                   },
                   can_repartition: false,
               },
               state: OnceCell {
                   value: None,
               },
               metrics: ExecutionPlanMetricsSet {
                   inner: Mutex {
                       data: MetricsSet {
                           metrics: [],
                       },
                   },
               },
               preserve_order: false,
               cache: PlanProperties {
                   eq_properties: EquivalenceProperties {
                       eq_group: EquivalenceGroup {
                           classes: [],
                       },
                       oeq_class: OrderingEquivalenceClass {
                           orderings: [
                               LexOrdering {
                                   inner: [
                                       PhysicalSortExpr {
                                           expr: Column {
                                               name: "userPrimaryKey",
                                               index: 2,
                                           },
                                           options: SortOptions {
                                               descending: false,
                                               nulls_first: true,
                                           },
                                       },
                                   ],
                               },
                               LexOrdering {
                                   inner: [
                                       PhysicalSortExpr {
                                           expr: Column {
                                               name: "dataFusionRowNumber",
                                               index: 6,
                                           },
                                           options: SortOptions {
                                               descending: false,
                                               nulls_first: false,
                                           },
                                       },
                                   ],
                               },
                           ],
                       },
                       constants: [],
                       constraints: Constraints {
                           inner: [],
                       },
                       schema: Schema {
                           fields: [
                               Field {
                                   name: "1",
                                   data_type: Utf8,
                                   nullable: true,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {},
                               },
                               Field {
                                   name: "2",
                                   data_type: Utf8,
                                   nullable: true,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {},
                               },
                               Field {
                                   name: "userPrimaryKey",
                                   data_type: Utf8,
                                   nullable: false,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {},
                               },
                               Field {
                                   name: "id",
                                   data_type: Utf8,
                                   nullable: false,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {},
                               },
                               Field {
                                   name: "lastUpdateTime",
                                   data_type: Int64,
                                   nullable: false,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {},
                               },
                               Field {
                                   name: "isDeleted",
                                   data_type: Boolean,
                                   nullable: false,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {},
                               },
                               Field {
                                   name: "dataFusionRowNumber",
                                   data_type: UInt64,
                                   nullable: false,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {},
                               },
                           ],
                           metadata: {},
                       },
                   },
                   partitioning: RoundRobinBatch(
                       8,
                   ),
                   emission_type: Final,
                   boundedness: Bounded,
                   output_ordering: Some(
                       LexOrdering {
                           inner: [
                               PhysicalSortExpr {
                                   expr: Column {
                                       name: "userPrimaryKey",
                                       index: 2,
                                   },
                                   options: SortOptions {
                                       descending: false,
                                       nulls_first: true,
                                   },
                               },
                               PhysicalSortExpr {
                                   expr: Column {
                                       name: "dataFusionRowNumber",
                                       index: 6,
                                   },
                                   options: SortOptions {
                                       descending: false,
                                       nulls_first: false,
                                   },
                               },
                           ],
                       },
                   ),
               },
           },
           metrics: ExecutionPlanMetricsSet {
               inner: Mutex {
                   data: MetricsSet {
                       metrics: [],
                   },
               },
           },
           cache: PlanProperties {
               eq_properties: EquivalenceProperties {
                   eq_group: EquivalenceGroup {
                       classes: [],
                   },
                   oeq_class: OrderingEquivalenceClass {
                       orderings: [
                           LexOrdering {
                               inner: [
                                   PhysicalSortExpr {
                                       expr: Column {
                                           name: "userPrimaryKey",
                                           index: 2,
                                       },
                                       options: SortOptions {
                                           descending: false,
                                           nulls_first: true,
                                       },
                                   },
                               ],
                           },
                           LexOrdering {
                               inner: [
                                   PhysicalSortExpr {
                                       expr: Column {
                                           name: "fileRowNumber",
                                           index: 6,
                                       },
                                       options: SortOptions {
                                           descending: false,
                                           nulls_first: false,
                                       },
                                   },
                               ],
                           },
                       ],
                   },
                   constants: [],
                   constraints: Constraints {
                       inner: [],
                   },
                   schema: Schema {
                       fields: [
                           Field {
                               name: "1",
                               data_type: Utf8,
                               nullable: true,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "2",
                               data_type: Utf8,
                               nullable: true,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "userPrimaryKey",
                               data_type: Utf8,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "id",
                               data_type: Utf8,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "lastUpdateTime",
                               data_type: Int64,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "isDeleted",
                               data_type: Boolean,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                           Field {
                               name: "fileRowNumber",
                               data_type: Int32,
                               nullable: false,
                               dict_id: 0,
                               dict_is_ordered: false,
                               metadata: {},
                           },
                       ],
                       metadata: {},
                   },
               },
               partitioning: RoundRobinBatch(
                   8,
               ),
               emission_type: Final,
               boundedness: Bounded,
               output_ordering: Some(
                   LexOrdering {
                       inner: [
                           PhysicalSortExpr {
                               expr: Column {
                                   name: "userPrimaryKey",
                                   index: 2,
                               },
                               options: SortOptions {
                                   descending: false,
                                   nulls_first: true,
                               },
                           },
                           PhysicalSortExpr {
                               expr: Column {
                                   name: "fileRowNumber",
                                   index: 6,
                               },
                               options: SortOptions {
                                   descending: false,
                                   nulls_first: false,
                               },
                           },
                       ],
                   },
               ),
           },
       },
       metrics: ExecutionPlanMetricsSet {
           inner: Mutex {
               data: MetricsSet {
                   metrics: [],
               },
           },
       },
       cache: PlanProperties {
           eq_properties: EquivalenceProperties {
               eq_group: EquivalenceGroup {
                   classes: [],
               },
               oeq_class: OrderingEquivalenceClass {
                   orderings: [],
               },
               constants: [],
               constraints: Constraints {
                   inner: [],
               },
               schema: Schema {
                   fields: [
                       Field {
                           name: "1",
                           data_type: Utf8,
                           nullable: true,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {},
                       },
                       Field {
                           name: "2",
                           data_type: Utf8,
                           nullable: true,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {},
                       },
                       Field {
                           name: "userPrimaryKey",
                           data_type: Utf8,
                           nullable: false,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {},
                       },
                       Field {
                           name: "id",
                           data_type: Utf8,
                           nullable: false,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {},
                       },
                       Field {
                           name: "lastUpdateTime",
                           data_type: Int64,
                           nullable: false,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {},
                       },
                       Field {
                           name: "isDeleted",
                           data_type: Boolean,
                           nullable: false,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {},
                       },
                       Field {
                           name: "fileRowNumber",
                           data_type: Int32,
                           nullable: false,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {},
                       },
                   ],
                   metadata: {},
               },
           },
           partitioning: UnknownPartitioning(
               1,
           ),
           emission_type: Final,
           boundedness: Bounded,
           output_ordering: None,
       },
       fetch: None,
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to