RamakrishnaChilaka opened a new issue, #21176:
URL: https://github.com/apache/datafusion/issues/21176
### Describe the bug
When executing a SQL query with `LIMIT` against a `MemTable`, the limit is
silently ignored if the `SELECT` clause projects columns in a different order
than they appear in the table schema.
The optimized plan correctly shows `Limit: skip=0, fetch=5` and `TableScan:
fetch=5`, but execution returns all rows instead of the requested limit.
The bug occurs only when the SELECT list reorders columns relative to the schema. Selecting columns in schema order returns the correct row count, and adding an `ORDER BY` clause also restores the limit.
**Environment:**
- DataFusion version: **53.0.0**
- Arrow version: 55.x (via DataFusion re-export)
- Rust version: 1.94.0
- OS: Linux (Ubuntu 22.04)
### To Reproduce
**Minimal standalone reproduction (copy-paste into any project with
`datafusion = "53"`):**
```rust
use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{StringArray, Float64Array};
use std::sync::Arc;

#[tokio::test]
async fn df53_limit_bug_repro() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_a", DataType::Utf8, false),
        Field::new("col_b", DataType::Float64, true),
        Field::new("col_c", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(
                (0..20).map(|i| format!("a-{i}")).collect::<Vec<_>>(),
            )),
            Arc::new(Float64Array::from(
                (0..20).map(|i| i as f64).collect::<Vec<_>>(),
            )),
            Arc::new(StringArray::from(
                (0..20).map(|i| format!("c-{i}")).collect::<Vec<_>>(),
            )),
        ],
    )
    .unwrap();

    // Case 1: Schema-order SELECT → LIMIT works
    let t1 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx1 = SessionContext::new();
    ctx1.register_table("t", Arc::new(t1)).unwrap();
    let r1: usize = ctx1
        .sql("SELECT col_b, col_c FROM t LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 2: Reverse-order SELECT → LIMIT silently ignored (BUG)
    let t2 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx2 = SessionContext::new();
    ctx2.register_table("t", Arc::new(t2)).unwrap();
    let df2 = ctx2.sql("SELECT col_c, col_b FROM t LIMIT 5").await.unwrap();
    let plan = df2.clone().into_optimized_plan().unwrap();
    println!("Optimized plan:\n{plan}");
    let r2: usize = df2.collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 3: Single column → LIMIT works
    let t3 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx3 = SessionContext::new();
    ctx3.register_table("t", Arc::new(t3)).unwrap();
    let r3: usize = ctx3
        .sql("SELECT col_c FROM t LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 4: Reverse-order + ORDER BY → LIMIT works
    let t4 = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
    let ctx4 = SessionContext::new();
    ctx4.register_table("t", Arc::new(t4)).unwrap();
    let r4: usize = ctx4
        .sql("SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    println!("Schema-order (col_b, col_c) LIMIT 5: {r1} rows");
    println!("Reverse-order (col_c, col_b) LIMIT 5: {r2} rows");
    println!("Single column (col_c) LIMIT 5: {r3} rows");
    println!("Reverse + ORDER BY LIMIT 5: {r4} rows");

    assert_eq!(r1, 5, "schema-order should return 5");
    assert_eq!(r2, 5, "reverse-order should return 5"); // FAILS: returns 20
    assert_eq!(r3, 5, "single column should return 5");
    assert_eq!(r4, 5, "reverse + ORDER BY should return 5");
}
```
**Actual output (DataFusion 53.0.0):**
```
Optimized plan for reverse-order SELECT:
Projection: t.col_c, t.col_b
  Limit: skip=0, fetch=5
    TableScan: t projection=[col_b, col_c], fetch=5
Schema-order (col_b, col_c) LIMIT 5: 5 rows ✓
Reverse-order (col_c, col_b) LIMIT 5: 20 rows ✗ BUG
Single column (col_c) LIMIT 5: 5 rows ✓
Reverse + ORDER BY LIMIT 5: 5 rows ✓
```
Note: the optimized plan is **correct** — `fetch=5` is present at both the
`Limit` and `TableScan` nodes. But execution returns all 20 rows.
**Summary of affected patterns:**
| Query | Actual | Expected | Status |
|---|---|---|---|
| `SELECT col_b, col_c FROM t LIMIT 5` (schema order) | 5 | 5 | ✓ |
| `SELECT col_c, col_b FROM t LIMIT 5` (reverse order) | **20** | 5 | ✗ BUG |
| `SELECT col_c FROM t LIMIT 5` (single column) | 5 | 5 | ✓ |
| `SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5` | 5 | 5 | ✓ |
The `DataFrame::limit()` API is also affected:
```rust
ctx.table("t").await.unwrap()
    .select_columns(&["col_c", "col_b"]).unwrap()
    .limit(0, Some(5)).unwrap()
    .collect().await.unwrap(); // returns 20 rows, not 5
```
### Expected behavior
`SELECT col_c, col_b FROM t LIMIT 5` should return exactly 5 rows,
regardless of whether the SELECT column order matches the table schema column
order.
The optimized plan correctly shows `fetch=5` propagated into the
`TableScan`, so the limit semantics are correctly planned — the issue is in
physical execution.
### Additional context
**Probable root cause:**
The `LIMIT` fetch-pushdown optimization propagates `fetch` into `MemTable`'s
`TableScan`. The `TableScan` plan shows `projection=[col_b, col_c]` (ascending
schema index order) regardless of SELECT order, and an outer `Projection` node
handles the column reorder.
The issue appears to be that when `MemTable` applies the `fetch` parameter
with a non-identity projection (indices not in ascending order relative to the
output), the limit is lost or applied to the wrong stream.
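
Independent of which code path drops it, the fetch accounting itself never depends on column order. A minimal sketch of the expected semantics in plain Rust (no DataFusion types; `rows_to_take` is a hypothetical helper, not DataFusion code):

```rust
/// Hypothetical helper: given the row counts of incoming batches and a
/// `fetch` limit, return how many rows to take from each batch. Column
/// projection order plays no role in this accounting, which is why the
/// reverse-order behavior reads as a bug rather than a semantic question.
fn rows_to_take(batch_sizes: &[usize], fetch: usize) -> Vec<usize> {
    let mut remaining = fetch;
    batch_sizes
        .iter()
        .map(|&n| {
            let take = n.min(remaining);
            remaining -= take;
            take
        })
        .collect()
}

fn main() {
    // One 20-row batch with fetch=5 should yield exactly 5 rows,
    // whether the SELECT list is (col_b, col_c) or (col_c, col_b).
    assert_eq!(rows_to_take(&[20], 5), vec![5]);
    // Fetch spanning multiple batches: 8 + 8 + 8 rows, fetch=20.
    assert_eq!(rows_to_take(&[8, 8, 8], 20), vec![8, 8, 4]);
}
```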
**Workaround:**
Reorder the `RecordBatch` columns to match the SQL `SELECT` column order
before creating the `MemTable`. This makes DataFusion's projection an identity
operation, preventing the buggy code path:
```rust
// Before creating the MemTable, reorder batch columns to match SELECT order
let reordered_schema = Arc::new(Schema::new(vec![
    Field::new("col_c", DataType::Utf8, true),
    Field::new("col_b", DataType::Float64, true),
]));
let reordered_batch = RecordBatch::try_new(
    reordered_schema.clone(),
    vec![
        batch.column_by_name("col_c").unwrap().clone(),
        batch.column_by_name("col_b").unwrap().clone(),
    ],
)
.unwrap();
let table = MemTable::try_new(reordered_schema, vec![vec![reordered_batch]]).unwrap();
// Now SELECT col_c, col_b FROM t LIMIT 5 works correctly
```
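
For tables with many columns, the same workaround can be generalized by computing the column permutation from names first. A sketch in plain Rust over name slices rather than Arrow types (`permutation` is a hypothetical helper; the resulting index list would be applied to the batch's columns and schema fields):

```rust
use std::collections::HashMap;

/// Hypothetical helper: for each name in `want`, find that column's
/// index in `have`. Applying the returned permutation to a RecordBatch's
/// columns (and schema fields) before building the MemTable makes
/// DataFusion's projection an identity, avoiding the buggy code path.
fn permutation(have: &[&str], want: &[&str]) -> Option<Vec<usize>> {
    let index: HashMap<&str, usize> =
        have.iter().enumerate().map(|(i, &n)| (n, i)).collect();
    want.iter().map(|n| index.get(n).copied()).collect()
}

fn main() {
    let have = ["col_a", "col_b", "col_c"];
    // SELECT col_c, col_b → take columns 2, then 1 from the batch.
    assert_eq!(permutation(&have, &["col_c", "col_b"]), Some(vec![2, 1]));
    // Unknown column → None instead of a panic.
    assert_eq!(permutation(&have, &["col_x"]), None);
}
```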
**Discovered in:**
A distributed search engine where shard-level `RecordBatch`es have a fixed
schema (`_id, score, data_col1, data_col2, ...`) but user SQL projects columns
in arbitrary order. Multi-column LIMIT queries returned `N × num_shards` rows
instead of `N` rows on a 3-node cluster.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]