jizezhang commented on issue #18782: URL: https://github.com/apache/datafusion/issues/18782#issuecomment-3568216125
Hi @alamb, may I confirm with you the behavior of one test, https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L267, with respect to integrating with `BatchCoalescer`? I noticed that this test runs two versions of the same query, one with the DataFrame API and one with `SessionContext::sql`. The logical plans resulting from the two are slightly different, in particular the `projection` part:

- With the DataFrame API,
```
[2025-11-22T21:29:47Z DEBUG datafusion_optimizer::utils] Final optimized plan:
Aggregate: groupBy=[[]], aggr=[[count(?table?.flag)]]
  TableScan: ?table? projection=[flag], full_filters=[?table?.flag = Int32(0)]
```
- With `sql`,
```
[2025-11-22T21:31:15Z DEBUG datafusion_optimizer::utils] Final optimized plan:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    TableScan: data projection=[], full_filters=[data.flag = Int32(0)]
```

The `SessionContext::sql` version had an issue when using the arrow `BatchCoalescer` kernel via `LimitedBatchCoalescer`.
The reason (I think) is that the custom table provider `CustomProvider` in the test branches on `projection` when choosing its schema, https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L208-L210, so the resulting physical plans differ in `schema`:

- With the DataFrame API,
```
AggregateExec: mode=Final, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag):Int64]
  CoalescePartitionsExec, schema=[count(?table?.flag)[count]:Int64]
    AggregateExec: mode=Partial, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag)[count]:Int64]
      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[flag:Int32]
        CooperativeExec, schema=[flag:Int32]
          CustomPlan: batch_size=1, schema=[flag:Int32]
```
- With `sql`,
```
ProjectionExec: expr=[count(Int64(1))@0 as count(*)], schema=[count(*):Int64]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1)):Int64]
    CoalescePartitionsExec, schema=[count(Int64(1))[count]:Int64]
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1))[count]:Int64]
        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[]
          CooperativeExec, schema=[]
            CustomPlan: batch_size=1, schema=[]
```

However, the batches returned by the associated custom execution plan `CustomPlan` are always full (not projected): https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L139

Arrow's `BatchCoalescer` checks that the schema provided to the coalescer matches the actual batch schema, https://github.com/apache/arrow-rs/blob/a67d49758b1faee7d42fe3b215e226d6d560f237/arrow-select/src/coalesce.rs#L428, so the `SessionContext::sql` version panics: its plan declares an empty schema, but the batches still carry the `flag` column.
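To make the mismatch concrete, here is a toy, std-only sketch. It does not use the arrow or DataFusion APIs: `ToyBatch`, `projected_fields`, `project_batch`, and `push_checked` are hypothetical names I made up for illustration, and the model only loosely mirrors what the test provider and the coalescer do. It shows a declared schema that follows the projection, a batch that does not, and how projecting the batch makes the two agree.

```rust
// Toy model only: none of these types or functions exist in arrow or DataFusion.

/// Stand-in for a record batch: field names, column data, and a row count.
#[derive(Debug, Clone, PartialEq)]
struct ToyBatch {
    fields: Vec<String>,
    columns: Vec<Vec<i32>>,
    num_rows: usize,
}

/// Declared schema: the full field list without a projection, otherwise only
/// the selected fields (an empty projection yields an empty schema).
fn projected_fields(full: &[String], projection: Option<&[usize]>) -> Vec<String> {
    match projection {
        None => full.to_vec(),
        Some(indices) => indices.iter().map(|&i| full[i].clone()).collect(),
    }
}

/// Apply the same projection to the data, keeping the row count so that a
/// zero-column batch still reports how many rows it represents.
fn project_batch(batch: &ToyBatch, projection: Option<&[usize]>) -> ToyBatch {
    match projection {
        None => batch.clone(),
        Some(indices) => ToyBatch {
            fields: indices.iter().map(|&i| batch.fields[i].clone()).collect(),
            columns: indices.iter().map(|&i| batch.columns[i].clone()).collect(),
            num_rows: batch.num_rows,
        },
    }
}

/// Toy version of the coalescer's check: reject a batch whose fields differ
/// from the declared schema, otherwise accept it and report its row count.
fn push_checked(declared: &[String], batch: &ToyBatch) -> Result<usize, String> {
    if batch.fields.as_slice() != declared {
        return Err(format!(
            "schema mismatch: declared {:?}, got {:?}",
            declared, batch.fields
        ));
    }
    Ok(batch.num_rows)
}
```

With an empty projection (the `sql` case), `push_checked` rejects the unprojected batch and accepts the result of `project_batch`; with `projection = [0]` or no projection (the DataFrame case), the unprojected batch already matches.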
The issue went away when I modified `CustomPlan::execute` to return projected batches, but I wanted to check whether the behavior is expected and whether the test should be updated, or otherwise what approaches to take. Thanks a lot!
