feniljain commented on issue #19654:
URL: https://github.com/apache/datafusion/issues/19654#issuecomment-3753239750

   Above results were achievable just from `ListingTable` `offset` pruning. I 
was exploring integrating `limit` and `offset` in `DataSourceExec` level too, I 
have found a few interesting points:
   
   - Currently `DataSourceExec` does not implement `supports_limit_pushdown`, 
which means it will be ignored for pushdown considerations 
[here](https://github.com/apache/datafusion/blob/eadbed51b5c711423bc5f1a4d72fbf1e5be5d975/datafusion/physical-optimizer/src/limit_pushdown.rs#L197)
   - Even if we implement the method returning `true`, upon checking the 
implementation of current `limit` in `DataSourceExec`, I think it will give 
incorrect results.
   
   Expanding on second point:
   We currently pass `fetch`/`limit` value to an `ExecutionPlan` node using: 
`with_fetch`. Current 
[implementation](https://github.com/apache/datafusion/blob/eadbed51b5c711423bc5f1a4d72fbf1e5be5d975/datafusion/datasource/src/source.rs#L324C8-L329)
 of `DataSourceExec`s `with_fetch` simply passes it onto `FileScanConfig`, 
where in `open`, `FileStream` always 
[sets](https://github.com/apache/datafusion/blob/eadbed51b5c711423bc5f1a4d72fbf1e5be5d975/datafusion/datasource/src/file_stream.rs#L83)
 it to its own `limit` which is then used in the `poll_inner` of same stream to 
[prune 
files](https://github.com/apache/datafusion/blob/eadbed51b5c711423bc5f1a4d72fbf1e5be5d975/datafusion/datasource/src/file_stream.rs#L175-L187).
   
   This means for each and every partition, we apply limit separately, I think 
this is an incorrect implementation? So we kinda don't have `limit` working for 
`DataSourceExec`. Do correct me if I am misunderstanding anything here.
   
   Given all claims verify, how do we want to proceed? I can get above PR till 
`ListingTable` merged, and then we can fix `limit` pushdown. After which I can 
work out `offset` pushdown too.
   
   The way I am thinking of fixing this is using an `AtomicCounter` at 
`FileScanConfig` level, which will get updated by each partition's `FileStream` 
and internally in `FileOpener` I would leverage 
[with_limit](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#method.with_limit)
 API of `parquet-rs`. Currently, I have only thought about parquet because that 
would be the primary citizen, we can revisit other sources as move forward 😅


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to