waruto210 opened a new issue, #12416:
URL: https://github.com/apache/datafusion/issues/12416
### Describe the bug
I'm working on a project based on DataFusion's `ListingTable` and
`ParquetExec`, and I've modified it to support exact filter pushdown for
Parquet tables.
When I execute a statement like `select count(*) from table where Age > 10
limit 10`, the physical plan replaces the table scan node (`TableScanExec`)
with a placeholder. As a result, the query returns the total number of rows in
the table instead of the number of rows that match the filter.
Eventually, I found that `AggregateStatistics` was rewriting the plan based on
statistics, and that `ParquetExec::statistics()` returned the following:
`Rows=Exact(390616), Bytes=Absent, [(Col[0]: Min=Exact(Int16(0)) Max=Exact(Int16(55)) Null=Exact(0))]`.
However, this `ParquetExec` still evaluates a pushed-down predicate, so it can
emit fewer rows than the file-level statistics describe. Its statistics should
therefore be `Inexact` or `Absent`.
Upstream DataFusion's `ListingTable` only reports inexact filter pushdown, so
the planner keeps a `FilterExec` above the scan. Because `FilterExec` reports
inexact statistics, `AggregateStatistics` does not apply the incorrect
rewrite. Even so, filters can still be evaluated inside `ParquetExec`, so
having it return exact statistics is semantically incorrect.
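To make the failure mode concrete, here is a minimal, self-contained model of the decision `AggregateStatistics` makes for `count(*)`: the aggregate is replaced by a constant only when the input row count is `Exact`. The `Precision` enum and the `count_star_from_stats` function are simplified, hypothetical stand-ins written for illustration; they are not DataFusion's actual types or API.

```rust
// Simplified stand-in for DataFusion's `Precision`; not the real type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Hypothetical model of the `count(*)` shortcut: the aggregate is replaced
/// by a constant only when the input row count is known exactly.
fn count_star_from_stats(num_rows: Precision) -> Option<usize> {
    match num_rows {
        Precision::Exact(n) => Some(n),
        Precision::Inexact(_) | Precision::Absent => None,
    }
}

fn main() {
    // What the scan currently reports, although it still filters rows internally.
    let reported = Precision::Exact(390_616);
    // The shortcut fires and answers with the unfiltered total: Some(390616).
    println!("{:?}", count_star_from_stats(reported));
    // With an inexact or absent count, the aggregate is actually executed: None.
    println!("{:?}", count_star_from_stats(Precision::Inexact(390_616)));
    println!("{:?}", count_star_from_stats(Precision::Absent));
}
```

In this model, any scan that filters rows internally must avoid reporting `Exact`, otherwise the shortcut returns a count for rows that the filter would have removed.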
### To Reproduce
Run the following program after adding a debug print of the statistics inside `ParquetExec::statistics()`:
```rust
use std::sync::Arc;

use arrow::util::pretty::print_batches;
use arrow_schema::DataType;
use datafusion::config::{ConfigField, TableParquetOptions};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Keep identifiers case-sensitive so that `Age` matches the Parquet column name.
    ctx.state_ref()
        .write()
        .config_mut()
        .options_mut()
        .sql_parser
        .enable_ident_normalization = false;
    // Use a single target partition to keep the physical plan simple.
    ctx.state_ref()
        .write()
        .config_mut()
        .options_mut()
        .execution
        .target_partitions = 1;

    // Enable Parquet filter pushdown so the predicate is evaluated inside ParquetExec.
    let mut opts = TableParquetOptions::default();
    opts.set("pushdown_filters", "true")?;
    let format = ParquetFormat::new().with_options(opts);
    let options = ListingOptions::new(Arc::new(format))
        .with_table_partition_cols(vec![("A".to_owned(), DataType::Int32)]);
    ctx.register_listing_table("hits", "/path", options, None, None)
        .await?;

    let pp = plan(&ctx, "select count(*) from hits where Age > 10").await?;
    let rb = collect(pp, ctx.task_ctx()).await?;
    print_batches(&rb)?;
    Ok(())
}

/// Builds, optimizes, and prints the logical and physical plans for `sql`.
async fn plan(ctx: &SessionContext, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
    let now = std::time::Instant::now();
    let state = ctx.state();
    let lp = state.create_logical_plan(sql).await?;
    println!("LogicalPlan:\n{:?}", lp);
    let lp = state.optimize(&lp)?;
    println!("Optimized LogicalPlan:\n{:?}", lp);
    // Physical optimizer rules, including AggregateStatistics, run during this call.
    let pp = state.create_physical_plan(&lp).await?;
    println!(
        "Plan duration: {}us. Optimized PhysicalPlan:\n{}",
        now.elapsed().as_micros(),
        DisplayableExecutionPlan::new(&*pp).indent(true)
    );
    Ok(pp)
}
```
The printed statistics report `Rows` as `Exact(n)`, even though `n` can exceed
the number of rows `ParquetExec` actually returns once the pushed-down
predicate is applied. This is semantically incorrect.
### Expected behavior
I suggest adding the following check to the `statistics` method of scan
implementations that can evaluate pushed-down filters internally, such as
`ParquetExec`:
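```rust
fn statistics(&self) -> Result<Statistics> {
    // When the predicate is evaluated inside the scan, fewer rows may be
    // emitted than the file-level statistics describe, so downgrade every
    // `Exact` value to `Inexact`.
    let stats = if self.pushdown_filters() && self.predicate.is_some() {
        self.projected_statistics.clone().into_inexact()
    } else {
        self.projected_statistics.clone()
    };
    Ok(stats)
}
```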
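The proposed condition can be exercised in isolation with the self-contained model below. `Precision`, `Statistics`, and `Scan` are simplified, hypothetical stand-ins (the real DataFusion types hold `ScalarValue`s and per-column statistics); only the branch in `Scan::statistics` mirrors the suggestion above, and a plain `String` stands in for the physical predicate.

```rust
// Simplified stand-ins for DataFusion's `Precision` and `Statistics`; not the real types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(i64),
    Inexact(i64),
    Absent,
}

impl Precision {
    /// Turns an exact value into an estimate; other variants are unchanged.
    fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
struct Statistics {
    num_rows: Precision,
    total_byte_size: Precision,
    min: Precision,
    max: Precision,
}

impl Statistics {
    /// Downgrades every exact value to an estimate.
    fn into_inexact(self) -> Self {
        Statistics {
            num_rows: self.num_rows.to_inexact(),
            total_byte_size: self.total_byte_size.to_inexact(),
            min: self.min.to_inexact(),
            max: self.max.to_inexact(),
        }
    }
}

/// Hypothetical scan node holding the inputs the proposed check looks at.
struct Scan {
    pushdown_filters: bool,
    predicate: Option<String>,
    projected_statistics: Statistics,
}

impl Scan {
    fn statistics(&self) -> Statistics {
        if self.pushdown_filters && self.predicate.is_some() {
            self.projected_statistics.clone().into_inexact()
        } else {
            self.projected_statistics.clone()
        }
    }
}

fn main() {
    // Values taken from the statistics reported in this issue.
    let file_stats = Statistics {
        num_rows: Precision::Exact(390_616),
        total_byte_size: Precision::Absent,
        min: Precision::Exact(0),
        max: Precision::Exact(55),
    };
    let scan = Scan {
        pushdown_filters: true,
        predicate: Some("Age > 10".to_string()),
        projected_statistics: file_stats,
    };
    // Every `Exact` value is reported as `Inexact`; `Absent` stays `Absent`.
    println!("{:?}", scan.statistics());
}
```

When pushdown is disabled or there is no predicate, the statistics pass through unchanged. Downgrading rather than dropping them keeps the values available as estimates for cost-based decisions while preventing rewrites that require exact counts.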
### Additional context
_No response_