jonmmease opened a new issue, #4673:
URL: https://github.com/apache/arrow-datafusion/issues/4673

   **Describe the bug**
   In a multi-threaded runtime, it seems the behavior of the `ROW_NUMBER` 
window function is not consistent when the input table consists of multiple 
partitions.
   
   In particular, it looks like the input partitions are arbitrarily sorted 
prior to calculating the `ROW_NUMBER`.
   
   **Example**
   
   The input table has a single column (`a`) 
   
   ```
   +---+
   | a |
   +---+
   | 1 |
   | 1 |
   | 1 |
   | 1 |
   | 2 |
   | 2 |
   | 2 |
   | 2 |
   | 3 |
   | 3 |
   | 3 |
   | 3 |
   | 4 |
   | 4 |
   | 4 |
   | 4 |
   +---+
   ```
   
   The query adds a `ROW_NUMBER() as row_num` column and then groups by `a`, 
keeping the minimum `row_num` for each group.
   
   ```sql
   SELECT a, min(row_num) as min_row_num
   from (SELECT a, ROW_NUMBER() OVER () AS "row_num" from tbl)
   GROUP BY a
   ORDER BY a
   ```
   
   The expected result for this query (And the result when the input consists 
of a single record partition) is
   
   ```
   +---+-------------+
   | a | min_row_num |
   +---+-------------+
   | 1 | 1           |
   | 2 | 5           |
   | 3 | 9           |
   | 4 | 13          |
   +---+-------------+
   ```
   
   The issue is that this is not always the result when the input table 
consists of multiple partitions and DataFusion is running in a multi-threaded 
runtime.
   
   Here are some representative results in this case:
   ```
   +---+-------------+
   | a | min_row_num |
   +---+-------------+
   | 1 | 9           |
   | 2 | 13          |
   | 3 | 5           |
   | 4 | 1           |
   +---+-------------+
   ```
   ```
   +---+-------------+
   | a | min_row_num |
   +---+-------------+
   | 1 | 9           |
   | 2 | 1           |
   | 3 | 13          |
   | 4 | 5           |
   +---+-------------+
   ```
   
   It looks like the partitions are randomly ordered before the ROW_NUMBER 
calculation is performed.
   
   **To Reproduce**
   Here is a full Rust example that reproduces the issue.
   
   ```rust
   use std::sync::Arc;
   use datafusion::arrow::array::{ArrayRef, Int64Array};
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::arrow::util::pretty::pretty_format_batches;
   use datafusion::datasource::MemTable;
   use datafusion::prelude::*;
   
   #[tokio::test(flavor = "multi_thread")]
   async fn row_number_over_partitions() {
       let schema = Arc::new(Schema::new(vec![
           Field::new("a", DataType::Int64, true)
       ]));
   
       let make_batch = |a: i64| -> RecordBatch {
           let array = Arc::new(Int64Array::from(vec![a; 4])) as ArrayRef;
           RecordBatch::try_new(schema.clone(), vec![array]).unwrap()
       };
   
       // // Single partition with four batches (Correct result)
       // let batches = vec![vec![make_batch(1), make_batch(2), make_batch(3), 
make_batch(4)]];
   
       // Four partitions, each with one batch (Incorrect result)
       let batches = vec![vec![make_batch(1)], vec![make_batch(2)], 
vec![make_batch(3)], vec![make_batch(4)]];
   
       let flat_batches: Vec<_> = batches.iter().flat_map(|v| 
v.clone()).collect();
       println!("{}", pretty_format_batches(flat_batches.as_slice()).unwrap());
   
       let mem_table = MemTable::try_new(schema, batches).unwrap();
       let ctx = SessionContext::new();
   
       ctx.register_table("tbl", Arc::new(mem_table)).unwrap();
   
       let res = ctx.sql(r#"
       SELECT a, min(row_num) as min_row_num
       from (SELECT a, ROW_NUMBER() OVER () AS "row_num" from tbl)
       GROUP BY a
       ORDER BY a
   "#).await.unwrap().collect().await.unwrap();
   
       let formatted = pretty_format_batches(res.as_slice()).unwrap();
       println!("{}", formatted);
   
       assert_eq!(formatted.to_string(), "\
   +---+-------------+
   | a | min_row_num |
   +---+-------------+
   | 1 | 1           |
   | 2 | 5           |
   | 3 | 9           |
   | 4 | 13          |
   +---+-------------+");
   }
   ```
   
   **Expected behavior**
   I expect the test case above to pass
   
   **Additional context**
   The error is only present when the tokio `multi_thread`  flavor is used. The 
correct results are returned when `flavor = "current_thread"`
   
   I'd be happy to dig in a bit more if someone could point me in the right 
direction!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to