alamb commented on code in PR #15561:
URL: https://github.com/apache/datafusion/pull/15561#discussion_r2027656995


##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1445,119 +1465,26 @@ mod tests {
         // batch1: c1(string)
         let batch1 = string_batch();
 
-        // c1 != 'bar'
-        let filter = col("c1").not_eq(lit("bar"));
+        // c1 == 'aaa', should prune via stats

Review Comment:
   why is this change needed?



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -537,11 +513,10 @@ impl FileSource for ParquetSource {
             .expect("projected_statistics must be set");
         // When filters are pushed down, we have no way of knowing the exact 
statistics.
         // Note that pruning predicate is also a kind of filter pushdown.
-        // (bloom filters use `pruning_predicate` too)
-        if self.pruning_predicate().is_some()
-            || self.page_pruning_predicate().is_some()
-            || (self.predicate().is_some() && self.pushdown_filters())
-        {
+        // (bloom filters use `pruning_predicate` too).
+        // Because filter pushdown may happen dynamically as long as there is 
a predicate
+        // if we have *any* predicate applied, we can't guarantee the 
statistics are exact.

Review Comment:
   I think this is the correct check and makes sense to me



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -295,3 +312,89 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by 
incrementing
+/// the `predicate_creation_errors` counter.
+pub fn build_pruning_predicate(

Review Comment:
   does this need to be `pub`?



##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -625,7 +625,7 @@ physical_plan
 01)CoalesceBatchesExec: target_batch_size=8192
 02)--FilterExec: column1@0 LIKE f%
 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-04)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]},
 projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, 
pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g 
AND f <= column1_max@1, required_guarantees=[]
+04)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]},
 projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%

Review Comment:
   this is basically my only concern with the entire PR -- that the explain 
plans now do not show pruning predicates / literal guarantees anymore.
   
   This will make it harder to understand when optimizations are happening or 
not (because a particular predicate can't be converted for example)
   
   Would it be possible to add this part of the explain back in (by trying to 
create a pruning predicate for only the explain plan, when needed)?
   
   I realize that approach has the downside that the pruning predicate used for 
each file may be different than what is shown in the explain plan



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1445,119 +1465,26 @@ mod tests {
         // batch1: c1(string)
         let batch1 = string_batch();
 
-        // c1 != 'bar'
-        let filter = col("c1").not_eq(lit("bar"));
+        // c1 == 'aaa', should prune via stats
+        let filter = col("c1").eq(lit("aaa"));
 
         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_pushdown_predicate()
             .round_trip(vec![batch1])
             .await;
 
-        // should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
+        let explain = rt.explain.unwrap();
 
-        // convert to explain plan form
-        let display = displayable(rt.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
+        // check that there was a pruning predicate -> row groups got pruned
+        assert_contains!(&explain, "predicate=c1@0 = aaa");
 
-        assert_contains!(
-            &display,
-            "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != 
bar OR bar != c1_max@1)"
-        );
-
-        assert_contains!(&display, r#"predicate=c1@0 != bar"#);
+        // there's a single row group, but we can check that it matched
+        // if no pruning was done this would be 0 instead of 1
+        assert_contains!(&explain, "row_groups_matched_statistics=1");
 
-        assert_contains!(&display, "projection=[c1]");
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_display_deterministic() {

Review Comment:
   I think the guarantees were in a HashSet and printed to the explain plan so 
they could change from run to run



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -137,71 +139,109 @@ mod tests {
             self.round_trip(batches).await.batches
         }
 
-        /// run the test, returning the `RoundTripResult`
-        async fn round_trip(self, batches: Vec<RecordBatch>) -> 
RoundTripResult {
-            let Self {
-                projection,
-                schema,
-                predicate,
-                pushdown_predicate,
-                page_index_predicate,
-            } = self;
-
-            let file_schema = match schema {
-                Some(schema) => schema,
-                None => Arc::new(
-                    Schema::try_merge(
-                        batches.iter().map(|b| b.schema().as_ref().clone()),
-                    )
-                    .unwrap(),
-                ),
-            };
-            // If testing with page_index_predicate, write parquet
-            // files with multiple pages
-            let multi_page = page_index_predicate;
-            let (meta, _files) = store_parquet(batches, 
multi_page).await.unwrap();
-            let file_group = meta.into_iter().map(Into::into).collect();
-
+        fn build_file_source(&self, file_schema: SchemaRef) -> 
Arc<ParquetSource> {
             // set up predicate (this is normally done by a layer higher up)
-            let predicate = predicate.map(|p| logical2physical(&p, 
&file_schema));
+            let predicate = self
+                .predicate
+                .as_ref()
+                .map(|p| logical2physical(p, &file_schema));
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
                 source = source.with_predicate(Arc::clone(&file_schema), 
predicate);
             }
 
-            if pushdown_predicate {
+            if self.pushdown_predicate {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
             }
 
-            if page_index_predicate {
+            if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
             }
 
+            Arc::new(source)
+        }
+
+        fn build_parquet_exec(
+            &self,
+            file_schema: SchemaRef,
+            file_group: FileGroup,
+            source: Arc<ParquetSource>,
+        ) -> Arc<DataSourceExec> {
             let base_config = FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
                 file_schema,
-                Arc::new(source.clone()),
+                source,
             )
             .with_file_group(file_group)
-            .with_projection(projection)
+            .with_projection(self.projection.clone())
             .build();
+            DataSourceExec::from_data_source(base_config)
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        async fn round_trip(&self, batches: Vec<RecordBatch>) -> 
RoundTripResult {
+            let file_schema = match &self.schema {
+                Some(schema) => schema,
+                None => &Arc::new(
+                    Schema::try_merge(
+                        batches.iter().map(|b| b.schema().as_ref().clone()),
+                    )
+                    .unwrap(),
+                ),
+            };
+            let file_schema = Arc::clone(file_schema);
+            // If testing with page_index_predicate, write parquet
+            // files with multiple pages
+            let multi_page = self.page_index_predicate;
+            let (meta, _files) = store_parquet(batches, 
multi_page).await.unwrap();
+            let file_group: FileGroup = 
meta.into_iter().map(Into::into).collect();
+
+            // build a ParquetExec to return the results
+            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_exec = self.build_parquet_exec(
+                file_schema.clone(),
+                file_group.clone(),
+                Arc::clone(&parquet_source),
+            );
+
+            let analyze_exec = Arc::new(AnalyzeExec::new(
+                false,
+                false,
+                // use a new ParquetSource to avoid sharing execution metrics
+                self.build_parquet_exec(
+                    file_schema.clone(),
+                    file_group.clone(),
+                    self.build_file_source(file_schema.clone()),
+                ),
+                Arc::new(Schema::new(vec![
+                    Field::new("plan_type", DataType::Utf8, true),
+                    Field::new("plan", DataType::Utf8, true),
+                ])),
+            ));

Review Comment:
   I agree this sounds good to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to