andygrove commented on issue #1907: URL: https://github.com/apache/datafusion-ballista/issues/1907#issuecomment-4821004737
## Root cause found: a correctness bug, not (just) a decode slowdown

The earlier "same rows/task but ~8× slower decode" reading was a misdiagnosis. The real cause is that **each scan task reads the entire table instead of its own file group**, so later-wave tasks do ~N× the work *and* produce ~N× inflated results. The slowdown is a side effect of the over-read.

### What actually happens

DataFusion 54 changed `DataSourceExec` to hand file groups to partition streams from a **shared work-queue** (`open_with_args` + a per-execution `SharedWorkSource`; `create_sibling_state` in `FileScanConfig`). The queue is only divided across partitions when **all partitions of one plan instance are polled concurrently**.

Ballista executes **one partition per task on its own decoded plan instance** (`ShuffleWriterExec`/`SortShuffleWriterExec` call `child.execute(input_partition)` for a single partition). A single partition polled in isolation drains the *entire* work-queue → it scans the whole table.

- Wave 1 is dispatched as one `LaunchMultiTask` batch sharing one decoded plan instance; its partitions run concurrently and cooperatively split the file groups → table read once → fast and correct.
- Each later task is dispatched individually as a slot frees → its own plan instance → drains all file groups alone → whole-table scan → ~N× CPU and N× rows.

### Correctness impact

q1 SF10, 16 partitions, `prefer_hash_join=false`:

| executor slots | waves | `count_order` (A/F) | correct? |
|---:|---:|---:|:--:|
| 16 | 1 | 14,804,077 | ✅ |
| 8 | 2 | 118,432,616 | ❌ (exactly 8×) |

Total rows counted on `-c8` = 473,140,872 = 8 × the table. Averages still match (8×sum / 8×count cancels), which is why it can pass spot checks; the benchmark harness only validates results at SF1.

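To make the work-queue mechanism from "What actually happens" concrete, here is a minimal std-only toy model. It is not DataFusion code: `PlanInstance`, `poll_all`, and `poll_alone` are hypothetical names, each file group is reduced to a row count, and concurrent polling is approximated by round-robin turns. It only illustrates why a partition polled alone ends up with every group.

```rust
use std::collections::VecDeque;

/// Toy model of one plan instance: a work queue of file groups shared by
/// all of its partition streams. Each group is just a row count here.
/// (Hypothetical type for illustration, not a DataFusion API.)
struct PlanInstance {
    queue: VecDeque<u64>,
}

impl PlanInstance {
    fn new(rows_per_group: &[u64]) -> Self {
        Self {
            queue: rows_per_group.iter().copied().collect(),
        }
    }

    /// All partitions of this instance polled together. Modelled as
    /// round-robin turns, each turn taking one group off the shared queue.
    /// Returns the rows read by each partition.
    fn poll_all(&mut self, partitions: usize) -> Vec<u64> {
        assert!(partitions > 0, "need at least one partition");
        let mut rows = vec![0u64; partitions];
        let mut turn = 0;
        while let Some(n) = self.queue.pop_front() {
            rows[turn % partitions] += n;
            turn += 1;
        }
        rows
    }

    /// A single partition polled in isolation. No sibling takes work, so
    /// it drains the whole queue and reads every group.
    fn poll_alone(&mut self) -> u64 {
        let mut rows = 0;
        while let Some(n) = self.queue.pop_front() {
            rows += n;
        }
        rows
    }
}
```

With four groups of 10, 20, 30, and 40 rows, `poll_all(4)` on one instance splits them one per partition, while `poll_alone()` on a fresh instance returns all 100 rows. Every lone task repeats that full read, which is the over-count seen in the table above.
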
### Minimal standalone reproducer (pure DataFusion 54, no Ballista)

Register a multi-file parquet table with `target_partitions = N`, build a physical plan, then:

```rust
// TEST 1: poll all N partitions concurrently on ONE instance
//   -> each reads ~1/N of the rows; sum == table row count. (correct)
// TEST 2: poll a single partition alone on its own instance
//   -> that one partition reads the ENTIRE table. (the bug)
let stream = plan.execute(p, ctx.task_ctx())?; // for a lone p, reads everything
```

Observed on 16 files (TPC-H lineitem SF10):

- TEST 1 → 16/16 partitions each ~3.7M rows, sum 59,142,609.
- TEST 2 → `execute(0)`, `execute(8)`, `execute(15)` each return 59,142,609 (the whole table).

This is intrinsic to `FileScanConfig` under DF54 (`scheduling_type()` is hardcoded `Cooperative`; `repartition_file_scans=false` does not change it). It worked on DF53, where `DataSourceExec::execute(p)` read file group `p` statically.

### Fix that works

In the executor, restrict each task's `DataSourceExec` to the file group for its `partition_id` before execution (other group slots emptied, partition count preserved), so the lone `execute(partition_id)` reads only its group. With this change, q1 `-c8` is correct (A/F 14,804,077) and back to ~0.50 s, and q1–q14 all return canonical TPC-H row counts.

Guard: skip when `partition_id >= file_groups.len()` (an operator between the scan and the stage output changed the partition count).

PR to follow.
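The restriction described under "Fix that works" can be sketched on plain vectors. This is an assumption-laden illustration, not the code from the upcoming PR: `FileGroups` and `restrict_to_partition` are hypothetical names, and file groups are modelled as lists of path strings rather than DataFusion's own types.

```rust
/// Hypothetical stand-in for a scan's file groups: one list of file
/// paths per output partition. (Not the DataFusion type.)
type FileGroups = Vec<Vec<String>>;

/// Keep only the group at `partition_id` and empty every other slot, so
/// the number of partitions is unchanged. Returns `None` when
/// `partition_id` is out of range (the guard case), meaning the caller
/// should leave the scan untouched.
fn restrict_to_partition(groups: &FileGroups, partition_id: usize) -> Option<FileGroups> {
    if partition_id >= groups.len() {
        return None;
    }
    Some(
        groups
            .iter()
            .enumerate()
            .map(|(i, group)| {
                if i == partition_id {
                    group.clone()
                } else {
                    Vec::new()
                }
            })
            .collect(),
    )
}
```

For three groups holding `a.parquet`, `b.parquet`, and `c.parquet`, `restrict_to_partition(&groups, 1)` yields three slots with only the middle one populated, and `restrict_to_partition(&groups, 3)` returns `None`. Emptying the other slots instead of dropping them keeps the partition index valid for the later `execute(partition_id)` call.
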
