xudong963 commented on code in PR #18868:
URL: https://github.com/apache/datafusion/pull/18868#discussion_r2558605513
##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -46,13 +48,19 @@ use parquet::{
pub struct RowGroupAccessPlanFilter {
/// which row groups should be accessed
access_plan: ParquetAccessPlan,
+ /// which row groups are fully contained within the pruning predicate
+ is_fully_matched: Vec<bool>,
Review Comment:
This field tracks each row group and marks whether it is fully matched by the pruning predicate.
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -407,8 +411,12 @@ impl FileOpener for ParquetOpener {
.add_matched(n_remaining_row_groups);
}
- let mut access_plan = row_groups.build();
+ // Prune by limit if limit is set and limit order is not sensitive
+ if let (Some(limit), false) = (limit, limit_order_sensitive) {
Review Comment:
This is the entry point for limit-based pruning: it applies only when a limit is set and the limit is not order-sensitive.
##########
datafusion/core/tests/parquet/row_group_pruning.rs:
##########
@@ -1636,3 +1722,241 @@ async fn test_bloom_filter_decimal_dict() {
.test_row_group_prune()
.await;
}
+
+// Helper function to create a batch with a single Int32 column.
Review Comment:
The new tests covering this change are added here.
##########
datafusion/core/tests/parquet/mod.rs:
##########
@@ -232,20 +247,41 @@ impl TestOutput {
/// and the appropriate scenario
impl ContextWithParquet {
async fn new(scenario: Scenario, unit: Unit) -> Self {
- Self::with_config(scenario, unit, SessionConfig::new()).await
+ Self::with_config(scenario, unit, SessionConfig::new(), None,
None).await
+ }
+
+ /// Set custom schema and batches for the test
+ pub async fn with_custom_data(
+ scenario: Scenario,
+ unit: Unit,
+ schema: Arc<Schema>,
+ batches: Vec<RecordBatch>,
+ ) -> Self {
+ Self::with_config(
+ scenario,
+ unit,
+ SessionConfig::new(),
+ Some(schema),
+ Some(batches),
+ )
+ .await
}
async fn with_config(
scenario: Scenario,
unit: Unit,
mut config: SessionConfig,
+ custom_schema: Option<Arc<Schema>>,
+ custom_batches: Option<Vec<RecordBatch>>,
) -> Self {
// Use a single partition for deterministic results no matter how many
CPUs the host has
config = config.with_target_partitions(1);
let file = match unit {
Unit::RowGroup(row_per_group) => {
config = config.with_parquet_bloom_filter_pruning(true);
- make_test_file_rg(scenario, row_per_group).await
+ config.options_mut().execution.parquet.pushdown_filters = true;
Review Comment:
I enabled `pushdown_filters` here so that filters and the limit can be pushed
down to the scan.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]