RamakrishnaChilaka opened a new issue, #21176:
URL: https://github.com/apache/datafusion/issues/21176

   ### Describe the bug
   
   When executing a SQL query with `LIMIT` against a `MemTable`, the limit is 
silently ignored if the `SELECT` clause projects columns in a different order 
than they appear in the table schema.
   
   The optimized plan correctly shows `Limit: skip=0, fetch=5` and `TableScan: 
fetch=5`, but execution returns all rows instead of the requested limit.
   
   This only occurs when the SELECT reorders columns relative to the schema. 
Selecting columns in schema order works correctly. Adding `ORDER BY` also makes 
it work.
   
   **Environment:**
   - DataFusion version: **53.0.0**
   - Arrow version: 55.x (via DataFusion re-export)
   - Rust version: 1.94.0
   - OS: Linux (Ubuntu 22.04)
   
   ### To Reproduce
   
   **Minimal standalone reproduction (copy-paste into any project with `datafusion = "53"` and `tokio` with the `rt` and `macros` features):**
   
   ```rust
   use datafusion::prelude::*;
   use datafusion::datasource::MemTable;
   use datafusion::arrow::datatypes::{Schema, Field, DataType};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::arrow::array::{StringArray, Float64Array};
   use std::sync::Arc;
   
   #[tokio::test]
   async fn df53_limit_bug_repro() {
       let schema = Arc::new(Schema::new(vec![
           Field::new("col_a", DataType::Utf8, false),
           Field::new("col_b", DataType::Float64, true),
           Field::new("col_c", DataType::Utf8, true),
       ]));
       let batch = RecordBatch::try_new(
           schema.clone(),
           vec![
               Arc::new(StringArray::from(
                   (0..20).map(|i| format!("a-{i}")).collect::<Vec<_>>(),
               )),
               Arc::new(Float64Array::from(
                   (0..20).map(|i| i as f64).collect::<Vec<_>>(),
               )),
               Arc::new(StringArray::from(
                   (0..20).map(|i| format!("c-{i}")).collect::<Vec<_>>(),
               )),
           ],
       )
       .unwrap();
   
       // Case 1: Schema-order SELECT → LIMIT works
        let t1 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
       let ctx1 = SessionContext::new();
       ctx1.register_table("t", Arc::new(t1)).unwrap();
       let r1: usize = ctx1
           .sql("SELECT col_b, col_c FROM t LIMIT 5")
           .await.unwrap().collect().await.unwrap()
           .iter().map(|b| b.num_rows()).sum();
   
       // Case 2: Reverse-order SELECT → LIMIT silently ignored (BUG)
        let t2 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
       let ctx2 = SessionContext::new();
       ctx2.register_table("t", Arc::new(t2)).unwrap();
       let df2 = ctx2.sql("SELECT col_c, col_b FROM t LIMIT 5").await.unwrap();
       let plan = df2.clone().into_optimized_plan().unwrap();
       println!("Optimized plan:\n{plan}");
       let r2: usize = df2.collect().await.unwrap()
           .iter().map(|b| b.num_rows()).sum();
   
       // Case 3: Single column → LIMIT works
        let t3 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
       let ctx3 = SessionContext::new();
       ctx3.register_table("t", Arc::new(t3)).unwrap();
       let r3: usize = ctx3
           .sql("SELECT col_c FROM t LIMIT 5")
           .await.unwrap().collect().await.unwrap()
           .iter().map(|b| b.num_rows()).sum();
   
       // Case 4: Reverse-order + ORDER BY → LIMIT works
       let t4 = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
       let ctx4 = SessionContext::new();
       ctx4.register_table("t", Arc::new(t4)).unwrap();
       let r4: usize = ctx4
           .sql("SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5")
           .await.unwrap().collect().await.unwrap()
           .iter().map(|b| b.num_rows()).sum();
   
       println!("Schema-order (col_b, col_c) LIMIT 5: {r1} rows");
       println!("Reverse-order (col_c, col_b) LIMIT 5: {r2} rows");
       println!("Single column (col_c) LIMIT 5: {r3} rows");
       println!("Reverse + ORDER BY LIMIT 5: {r4} rows");
   
       assert_eq!(r1, 5, "schema-order should return 5");
       assert_eq!(r2, 5, "reverse-order should return 5");  // FAILS: returns 20
       assert_eq!(r3, 5, "single column should return 5");
       assert_eq!(r4, 5, "reverse + ORDER BY should return 5");
   }
   ```
   
   **Actual output (DataFusion 53.0.0):**
   
   ```
    Optimized plan:
   Projection: t.col_c, t.col_b
     Limit: skip=0, fetch=5
       TableScan: t projection=[col_b, col_c], fetch=5
   
   Schema-order (col_b, col_c) LIMIT 5: 5 rows ✓
   Reverse-order (col_c, col_b) LIMIT 5: 20 rows ✗ BUG
   Single column (col_c) LIMIT 5: 5 rows ✓
   Reverse + ORDER BY LIMIT 5: 5 rows ✓
   ```
   
   Note: the optimized plan is **correct** — `fetch=5` is present at both the 
`Limit` and `TableScan` nodes. But execution returns all 20 rows.
   
   **Summary of affected patterns:**
   
   | Query | Actual | Expected | Status |
   |---|---|---|---|
   | `SELECT col_b, col_c FROM t LIMIT 5` (schema order) | 5 | 5 | ✓ |
   | `SELECT col_c, col_b FROM t LIMIT 5` (reverse order) | **20** | 5 | ✗ BUG |
   | `SELECT col_c FROM t LIMIT 5` (single column) | 5 | 5 | ✓ |
   | `SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5` | 5 | 5 | ✓ |
   
   The `DataFrame::limit()` API is also affected:
   ```rust
   ctx.table("t").await.unwrap()
       .select_columns(&["col_c", "col_b"]).unwrap()
       .limit(0, Some(5)).unwrap()
       .collect().await.unwrap()  // returns 20 rows, not 5
   ```
   
   ### Expected behavior
   
   
   `SELECT col_c, col_b FROM t LIMIT 5` should return exactly 5 rows, 
regardless of whether the SELECT column order matches the table schema column 
order.
   
   The optimized plan correctly shows `fetch=5` propagated into the 
`TableScan`, so the limit semantics are correctly planned — the issue is in 
physical execution.
   
   
   ### Additional context
   
   **Probable root cause:**
   
   The `LIMIT` fetch-pushdown optimization propagates `fetch` into `MemTable`'s 
`TableScan`. The `TableScan` plan shows `projection=[col_b, col_c]` (ascending 
schema index order) regardless of SELECT order, and an outer `Projection` node 
handles the column reorder.
   
   The issue appears to be that when `MemTable` applies the `fetch` parameter 
with a non-identity projection (indices not in ascending order relative to the 
output), the limit is lost or applied to the wrong stream.
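
   To make the "non-identity projection" condition concrete, here is a std-only sketch (the helper names and the identity check are mine, not DataFusion APIs): the scan emits columns in ascending schema order, and the outer `Projection` maps each SELECT position to an index into the scan output; the limit only appears to be lost when that mapping is not the identity permutation.
   
    ```rust
    // Hypothetical illustration (these helpers are not DataFusion APIs).
    // The scan emits columns in ascending schema order; the outer
    // Projection maps each SELECT position to an index into the scan
    // output. The limit only seems to be lost when that mapping is not
    // the identity permutation.
    fn projection_onto_scan(scan_cols: &[&str], select_cols: &[&str]) -> Vec<usize> {
        select_cols
            .iter()
            .map(|c| scan_cols.iter().position(|s| s == c).expect("column in scan"))
            .collect()
    }
    
    fn is_identity(mapping: &[usize]) -> bool {
        mapping.iter().enumerate().all(|(i, &j)| i == j)
    }
    
    fn main() {
        // Scan output is always [col_b, col_c] (ascending schema order).
        let scan = ["col_b", "col_c"];
        // SELECT col_b, col_c -> mapping [0, 1] (identity): LIMIT works.
        assert!(is_identity(&projection_onto_scan(&scan, &["col_b", "col_c"])));
        // SELECT col_c, col_b -> mapping [1, 0] (permutation): LIMIT ignored.
        assert!(!is_identity(&projection_onto_scan(&scan, &["col_c", "col_b"])));
    }
    ```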
   
   **Workaround:**
   
   Reorder the `RecordBatch` columns to match the SQL `SELECT` column order 
before creating the `MemTable`. This makes DataFusion's projection an identity 
operation, preventing the buggy code path:
   
   ```rust
   // Before creating MemTable, reorder batch columns to match SELECT order
   let reordered_schema = Arc::new(Schema::new(vec![
       Field::new("col_c", DataType::Utf8, true),
       Field::new("col_b", DataType::Float64, true),
   ]));
   let reordered_batch = RecordBatch::try_new(reordered_schema.clone(), vec![
       batch.column_by_name("col_c").unwrap().clone(),
       batch.column_by_name("col_b").unwrap().clone(),
   ]).unwrap();
    let table = MemTable::try_new(reordered_schema, vec![vec![reordered_batch]]).unwrap();
   // Now SELECT col_c, col_b FROM t LIMIT 5 works correctly
   ```
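
   If the SELECT order is not known when the table is built, the same idea can be generalized: compute the permutation by name, then reorder with Arrow's `RecordBatch::project` and `Schema::project` before constructing the `MemTable`. A minimal std-only sketch of the index computation (the helper name is hypothetical):
   
    ```rust
    /// Hypothetical helper (name is mine): compute the column permutation
    /// that maps the table schema order onto the desired SELECT order by
    /// name. Returns None if any requested column is missing.
    fn select_order_indices(schema_cols: &[&str], select_cols: &[&str]) -> Option<Vec<usize>> {
        select_cols
            .iter()
            .map(|c| schema_cols.iter().position(|s| s == c))
            .collect()
    }
    
    fn main() {
        let schema = ["col_a", "col_b", "col_c"];
        // SELECT col_c, col_b -> project the batch with indices [2, 1].
        assert_eq!(select_order_indices(&schema, &["col_c", "col_b"]), Some(vec![2, 1]));
        assert_eq!(select_order_indices(&schema, &["missing"]), None);
    }
    ```
   
   Feeding the resulting indices to Arrow's `RecordBatch::project` (with the matching `Schema::project`) produces a batch in SELECT order, so DataFusion's projection stays an identity operation.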
   
   **Discovered in:**
   
   A distributed search engine where shard-level `RecordBatch`es have a fixed 
schema (`_id, score, data_col1, data_col2, ...`) but user SQL projects columns 
in arbitrary order. Multi-column LIMIT queries returned `N × num_shards` rows 
instead of `N` rows on a 3-node cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

