Andrew Lamb created ARROW-12235:
-----------------------------------

             Summary: [Rust][DataFusion] LIMIT returns incorrect results when used with several small partitions
                 Key: ARROW-12235
                 URL: https://issues.apache.org/jira/browse/ARROW-12235
             Project: Apache Arrow
          Issue Type: Bug
          Components: Rust - DataFusion
            Reporter: Andrew Lamb
            Assignee: Andrew Lamb


While running some queries locally, I noticed that `LIMIT` was not behaving
correctly. In my case, a query with `LIMIT 10` always returned zero rows.

After some investigation I found a self-contained reproducer. If you add the
following test to `rust/datafusion/src/execution/context.rs`, it fails.


{code}

    /// Return a RecordBatch with a single Int32 column "i" holding the values (0..sz)
    fn make_partition(sz: i32) -> RecordBatch {
        let values = (0..sz).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values)) as ArrayRef;

        RecordBatch::try_new(schema, vec![arr]).unwrap()
    }

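    // `create_ctx` and `plan_and_collect` are the existing helpers in the test
    // module of context.rs; `TempDir` comes from the `tempfile` crate.
    #[tokio::test]
    async fn limit_multi_partitions() -> Result<()> {
        let tmp_dir = TempDir::new()?;
        let mut ctx = create_ctx(&tmp_dir, 1)?;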

        let partitions = vec![
            vec![make_partition(0)],
            vec![make_partition(1)],
            vec![make_partition(2)],
            vec![make_partition(3)],
            vec![make_partition(4)],
            vec![make_partition(5)],
        ];
        let schema = partitions[0][0].schema();
        let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap());

        ctx.register_table("t", provider).unwrap();

        // select all rows: the partitions hold 0 + 1 + 2 + 3 + 4 + 5 = 15 rows
        let results = plan_and_collect(&mut ctx, "SELECT i FROM t")
            .await
            .unwrap();

        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, 15);

        for limit in 1..10 {
            let query = format!("SELECT i FROM t limit {}", limit);
            let results = plan_and_collect(&mut ctx, &query)
                .await
                .unwrap();

            let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
            assert_eq!(num_rows, limit, "mismatch with query {}", query);
        }

        Ok(())
    }

{code}
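The test expects `LIMIT n` to return min(n, 15) rows no matter how the 15 rows are spread over the six partitions (sizes 0 through 5, including one empty partition). As a plain-Rust sketch of those expected semantics, and not DataFusion's actual operators, the following models a two-phase limit: every partition is truncated to at most `n` rows, then the partitions are concatenated in order and truncated again to `n` rows. The `Partition` alias and the `local_limit` / `limit_two_phase` functions are hypothetical names used only for illustration, and rows are modeled as `Vec<i32>` rather than Arrow `RecordBatch`es.

```rust
/// Hypothetical model of one partition: its rows, in order.
type Partition = Vec<i32>;

/// Phase 1 (per partition): keep at most `limit` rows from one partition.
fn local_limit(partition: &[i32], limit: usize) -> Partition {
    partition.iter().take(limit).copied().collect()
}

/// Two-phase LIMIT (illustrative model, not DataFusion code): truncate every
/// partition independently, then concatenate the partitions in order and
/// truncate the merged stream to `limit` rows. Empty partitions simply
/// contribute no rows.
fn limit_two_phase(partitions: &[Partition], limit: usize) -> Partition {
    partitions
        .iter()
        .flat_map(|p| local_limit(p, limit)) // phase 1: local limit, merged in order
        .take(limit) // phase 2: global limit
        .collect()
}
```

With the reproducer's layout, `(0..6).map(|sz| (0..sz).collect::<Partition>())`, this model returns exactly `limit` rows for every limit from 1 to 9 and all 15 rows for any larger limit. The per-partition step only saves work; the final `take(limit)` alone determines the row count, so splitting the data into many small partitions should never change the result.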



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
