alamb commented on code in PR #19316: URL: https://github.com/apache/datafusion/pull/19316#discussion_r3003216350
########## datafusion/core/src/execution/plan_observer.rs: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Provides callbacks to keep track of planned and executed queries. + +use std::collections::HashMap; +use std::io::Write; +use std::sync::Arc; +use std::time::Duration; +use std::{fmt::Debug, fs}; + +use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; +use datafusion_common::error::Result; +use datafusion_expr::LogicalPlan; +use datafusion_physical_plan::ExecutionPlan; +use parking_lot::Mutex; + +#[cfg(feature = "sql")] +use datafusion_sql::unparser::Unparser; + +/// Provides callbacks to keep track of planned and executed queries. +pub trait PlanObserver: Send + Sync + 'static + Debug { + /// Called after the physical plan has been created but before it has been executed. + /// Receives the logical and physical plans, as well as a unique identifier. + fn plan_created( + &self, + id: &str, + logical_plan: &LogicalPlan, + physical_plan: &Arc<dyn ExecutionPlan>, + ) -> Result<()>; + + /// Called after the physical plan has been executed. + /// Receives the identifier, the EXPLAIN ANALYZE output, and the duration. 
+ fn plan_executed( + &self, + id: &str, + explain_result: RecordBatch, + duration: Duration, + ) -> Result<()>; +} + +/// Default implementation of [`PlanObserver`]. +/// Outputs the result in the following format: +/// ```txt +/// QUERY: <sql query> (requires the `sql` feature; outputs `(unavailable)` otherwise) +/// DURATION: <query duration in milliseconds> +/// EXPLAIN: <explain analyze output> +/// ``` +#[derive(Debug)] +pub struct DefaultPlanObserver { + output: String, + min_duration_ms: usize, + /// stores a SQL representation of the logical plan, if the `sql` feature is enabled. Review Comment: Codex points out that on the error path this map never gets cleaned up -- so a bunch of errors will potentially cause this map to grow without bound. Maybe something we could clean up as a follow on PR (file a ticket to fix, etc) ########## datafusion/core/src/execution/plan_observer.rs: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Provides callbacks to keep track of planned and executed queries. 
+ +use std::collections::HashMap; +use std::io::Write; +use std::sync::Arc; +use std::time::Duration; +use std::{fmt::Debug, fs}; + +use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; +use datafusion_common::error::Result; +use datafusion_expr::LogicalPlan; +use datafusion_physical_plan::ExecutionPlan; +use parking_lot::Mutex; + +#[cfg(feature = "sql")] +use datafusion_sql::unparser::Unparser; + +/// Provides callbacks to keep track of planned and executed queries. +pub trait PlanObserver: Send + Sync + 'static + Debug { + /// Called after the physical plan has been created but before it has been executed. + /// Receives the logical and physical plans, as well as a unique identifier. + fn plan_created( + &self, + id: &str, + logical_plan: &LogicalPlan, + physical_plan: &Arc<dyn ExecutionPlan>, + ) -> Result<()>; + + /// Called after the physical plan has been executed. + /// Receives the identifier, the EXPLAIN ANALYZE output, and the duration. + fn plan_executed( + &self, + id: &str, + explain_result: RecordBatch, + duration: Duration, + ) -> Result<()>; +} + +/// Default implementation of [`PlanObserver`]. +/// Outputs the result in the following format: +/// ```txt +/// QUERY: <sql query> (requires the `sql` feature; outputs `(unavailable)` otherwise) +/// DURATION: <query duration in milliseconds> +/// EXPLAIN: <explain analyze output> +/// ``` +#[derive(Debug)] +pub struct DefaultPlanObserver { + output: String, + min_duration_ms: usize, + /// stores a SQL representation of the logical plan, if the `sql` feature is enabled. + queries: Arc<Mutex<HashMap<String, String>>>, +} + +impl DefaultPlanObserver { + /// Creates a new `DefaultPlanObserver`. + /// * `output`: where to write the output. + /// Possible values: + /// - `log::error` + /// - `log::warn` + /// - `log::info` + /// - `log::debug` + /// - `log::trace` + /// - a file path: creates the file if it does not exist, or appends to it if it does. 
Review Comment: I think it would make more sense to have an explicit enum or something here, especially as this can do file I/O. I worry that if someone accidentally passes in `log:error` (one `:`), that will write to a file named `log:error`. What do you think about using something like ```rust enum Output { LogError, LogWarn, ... LogToFile(String), } ``` ? ########## datafusion/core/src/execution/session_state.rs: ########## @@ -1503,6 +1516,7 @@ impl SessionStateBuilder { function_factory, cache_factory, prepared_plans: HashMap::new(), + plan_observer: Some(Arc::new(DefaultPlanObserver::default())), Review Comment: This should only be set when `auto_explain` is enabled, right? ########## datafusion/core/src/physical_planner.rs: ########## @@ -274,7 +277,21 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { .create_initial_plan(logical_plan, session_state) .await?; - self.optimize_physical_plan(plan, session_state, |_, _| {}) + let mut plan = self.optimize_physical_plan(plan, session_state, |_, _| {})?; + + // setup the auto explain mode if necessary Review Comment: I wonder if this would be cleaner to add to `handle_explain_or_analyze` (or put it in its own method)? ########## datafusion/core/src/execution/plan_observer.rs: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one Review Comment: I think it does ########## datafusion/core/src/execution/plan_observer.rs: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Provides callbacks to keep track of planned and executed queries. + +use std::collections::HashMap; +use std::io::Write; +use std::sync::Arc; +use std::time::Duration; +use std::{fmt::Debug, fs}; + +use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; +use datafusion_common::error::Result; +use datafusion_expr::LogicalPlan; +use datafusion_physical_plan::ExecutionPlan; +use parking_lot::Mutex; + +#[cfg(feature = "sql")] +use datafusion_sql::unparser::Unparser; + +/// Provides callbacks to keep track of planned and executed queries. +pub trait PlanObserver: Send + Sync + 'static + Debug { + /// Called after the physical plan has been created but before it has been executed. + /// Receives the logical and physical plans, as well as a unique identifier. + fn plan_created( + &self, + id: &str, + logical_plan: &LogicalPlan, + physical_plan: &Arc<dyn ExecutionPlan>, + ) -> Result<()>; + + /// Called after the physical plan has been executed. + /// Receives the identifier, the EXPLAIN ANALYZE output, and the duration. + fn plan_executed( + &self, + id: &str, + explain_result: RecordBatch, Review Comment: is this result the actual output batches? Or is it the annotated explain plan? ########## datafusion/physical-plan/src/analyze.rs: ########## @@ -54,6 +55,25 @@ pub struct AnalyzeExec { /// The output schema for RecordBatches of this exec node schema: SchemaRef, cache: Arc<PlanProperties>, + /// If Some, passes the output of the analyze once it completes, as well as the duration. 
+ callback: Option<AnalyzeCallback>, + /// If true, returns the inner batches instead of the analyze result. + /// Can be used together with the `callback` to keep track of the analyze result without + /// having to return it as the plan's output. + return_inner: bool, +} + +/// Optionally used by the `AnalyzeExec` operator to callback it with the result. Review Comment: We won't be able to easily add new things to this callback. It might also be useful to point out somewhere that setting `return_inner` is effectively going to buffer the entire query output into RAM (which could be quite large). I wonder if it would make sense to define a trait ```rust trait AnalyzeObserver { ... } ``` And then instead of ```rust /// If Some, passes the output of the analyze once it completes, as well as the duration. callback: Option<AnalyzeCallback>, ``` Do something like ```rust /// If Some, passes the output of the analyze once it completes, as well as the duration. callback: Option<Arc<dyn AnalyzeObserver>>, ``` That way, if we want to add more methods (like adding something which sees the plan metrics), it would be easier to add without an API change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
