alamb commented on code in PR #10286:
URL: https://github.com/apache/datafusion/pull/10286#discussion_r1583665774

##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {
+    let ctx = SessionContext::new();
+
+    let test_data = datafusion::test_util::parquet_test_data();
+
+    // Configure listing options
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        // This is a workaround for this example since `test_data` contains

Review Comment:
I think you could avoid this using `&format!("file://{test_data}/alltypes_plain.parquet"),` as the filename below (I tried it locally and it seems to work)

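For readers following the thread, here is a minimal sketch of what that suggestion looks like when applied to the snippet above (same imports and `parquet_test_data()` helper as the example; not the exact code from this PR): the listing table is registered against the single file's full `file://` URL, so the `.with_file_extension("alltypes_plain.parquet")` workaround is no longer needed.

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::execution::context::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let test_data = datafusion::test_util::parquet_test_data();

    let file_format = ParquetFormat::default().with_enable_pruning(true);
    let listing_options = ListingOptions::new(Arc::new(file_format));

    // Point the listing table directly at the single file instead of
    // filtering a directory of mixed parquet files by extension.
    ctx.register_listing_table(
        "my_table",
        &format!("file://{test_data}/alltypes_plain.parquet"),
        listing_options,
        None,
        None,
    )
    .await?;

    let df = ctx.sql("SELECT count(*) FROM my_table").await?;
    df.show().await?;
    Ok(())
}
```
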
##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {
+    let ctx = SessionContext::new();
+
+    let test_data = datafusion::test_util::parquet_test_data();
+
+    // Configure listing options
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        // This is a workaround for this example since `test_data` contains
+        // many different parquet different files,
+        // in practice use FileType::PARQUET.get_ext().
+        .with_file_extension("alltypes_plain.parquet");
+
+    // First example were we use an absolute path, which requires no additional setup.
+    let _ = ctx
+        .register_listing_table(
+            "my_table",
+            &format!("file://{test_data}/"),
+            listing_options.clone(),
+            None,
+            None,
+        )
+        .await;
+
+    let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
+    let plan = df.create_physical_plan().await.unwrap();
+
+    // Create empty visitor
+    let mut visitor = ParquetExecVisitor {
+        file_scan_config: None,
+        bytes_scanned: None,
+    };
+
+    // Make sure you execute the plan to collect actual execution statistics.
+    // For example, in this example the `file_scan_config` is known without executing
+    // but the `bytes_scanned` would be None if we did not execute.
+    let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap();
+    while let Some(batch) = batch_stream.next().await {
+        println!("Do something with batch");
+    }
+
+    visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();
+
+    println!("ParquetExecVisitor: {:?}", visitor);
+}
+
+/// Define a struct with fields to hold the information you want to collect
+#[derive(Debug)]
+struct ParquetExecVisitor {
+    file_scan_config: Option<FileScanConfig>,
+    bytes_scanned: Option<MetricValue>,
+}
+
+impl ExecutionPlanVisitor for ParquetExecVisitor {
+    type Error = datafusion_common::DataFusionError;
+
+    /// Based on your needs implement either `pre_visit` or `post_visit`

Review Comment:
```suggestion
    /// This function is called once for every node in the tree.
    /// Based on your needs implement either `pre_visit` (visit each node before its children/inputs)
    /// or `post_visit` (visit each node after its children/inputs)
```

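The quoted hunk stops at the doc comment, so the body of `pre_visit` itself is not shown here. For orientation, an implementation along the following lines (a sketch under the assumption that the visitor downcasts each node to `ParquetExec`, not necessarily the exact code in this PR) illustrates the pre-order walk the suggested comment describes: the method runs once per node before that node's inputs, and returning `Ok(true)` tells the walker to keep descending.

```rust
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanVisitor};

#[derive(Debug)]
struct ParquetExecVisitor {
    file_scan_config: Option<FileScanConfig>,
    bytes_scanned: Option<MetricValue>,
}

impl ExecutionPlanVisitor for ParquetExecVisitor {
    type Error = datafusion_common::DataFusionError;

    /// Called once per node, before that node's children/inputs.
    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Only `ParquetExec` nodes carry the information this visitor collects.
        if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
            self.file_scan_config = Some(parquet_exec.base_config().clone());
            // Metrics are only populated once the plan has been executed.
            self.bytes_scanned = plan
                .metrics()
                .and_then(|metrics| metrics.sum_by_name("bytes_scanned"));
        }
        // Keep visiting the rest of the plan tree.
        Ok(true)
    }
}
```
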
##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {

Review Comment:
I think it helps to make the examples self-standing with a few comments that explain what each one shows, for example:
https://github.com/apache/datafusion/blob/0f2a68ee1676c0d141d2c7cacf4b7c21d0033870/datafusion-examples/examples/csv_sql.rs#L22-L23

So in this one, perhaps we could add something like
```suggestion
/// Example of collecting metrics after execution by visiting the `ExecutionPlan`
#[tokio::main]
async fn main() {
```

##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {
+    let ctx = SessionContext::new();
+
+    let test_data = datafusion::test_util::parquet_test_data();
+
+    // Configure listing options
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        // This is a workaround for this example since `test_data` contains
+        // many different parquet different files,
+        // in practice use FileType::PARQUET.get_ext().
+        .with_file_extension("alltypes_plain.parquet");
+
+    // First example were we use an absolute path, which requires no additional setup.
+    let _ = ctx
+        .register_listing_table(
+            "my_table",
+            &format!("file://{test_data}/"),
+            listing_options.clone(),
+            None,
+            None,
+        )
+        .await;
+
+    let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
+    let plan = df.create_physical_plan().await.unwrap();
+
+    // Create empty visitor
+    let mut visitor = ParquetExecVisitor {
+        file_scan_config: None,
+        bytes_scanned: None,
+    };
+
+    // Make sure you execute the plan to collect actual execution statistics.
+    // For example, in this example the `file_scan_config` is known without executing
+    // but the `bytes_scanned` would be None if we did not execute.
+    let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap();
+    while let Some(batch) = batch_stream.next().await {
+        println!("Do something with batch");
+    }
+
+    visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();
+
+    println!("ParquetExecVisitor: {:?}", visitor);

Review Comment:
The output looks a little messy:
```shell
Do something with batch
ParquetExecVisitor: ParquetExecVisitor { file_scan_config: Some(object_store_url=ObjectStoreUrl { url: Url { scheme: "file", cannot_be_a_base: false, username: "", password: None, host: None, port: None, path: "/", query: None, fragment: None } }, statistics=Statistics { num_rows: Exact(8), total_byte_size: Exact(671), column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[Users/andrewlamb/Software/datafusion2/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col]), bytes_scanned: Some(Count { name: "bytes_scanned", count: Count { value: 671 } }) }
```
Maybe we could print out a nicer version like
```rust
let bytes_scanned = visitor.bytes_scanned.unwrap();
println!("Total parquet bytes scanned {bytes_scanned}");
```
```shell
Do something with batch
Total parquet bytes scanned 671
```

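Building on that suggestion, a small follow-on sketch (hypothetical helper name, using the `visitor.bytes_scanned` field from the example) shows how the raw number could be pulled out of the collected `MetricValue` instead of relying on its `Display` output; the debug output above confirms the metric is stored as a `Count` variant.

```rust
use datafusion::physical_plan::metrics::MetricValue;

/// Hypothetical helper: report the collected metric in a compact form.
fn report_bytes_scanned(bytes_scanned: Option<MetricValue>) {
    match bytes_scanned {
        // `bytes_scanned` is recorded as a counter, e.g.
        // Count { name: "bytes_scanned", count: Count { value: 671 } }
        Some(MetricValue::Count { count, .. }) => {
            println!("Total parquet bytes scanned: {}", count.value())
        }
        Some(other) => println!("Unexpected metric type: {other}"),
        None => println!("Plan was not executed, so no bytes_scanned metric was recorded"),
    }
}
```

Called as `report_bytes_scanned(visitor.bytes_scanned)` after `visit_execution_plan`, this would print `Total parquet bytes scanned: 671` for the query above, matching the cleaned-up output shown in the comment.
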
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]