alamb commented on code in PR #4679:
URL: https://github.com/apache/arrow-datafusion/pull/4679#discussion_r1053638385
##########
datafusion/core/src/dataframe.rs:
##########
@@ -505,17 +505,40 @@ impl DataFrame {
self.plan.schema()
}
- /// Return the unoptimized logical plan represented by this DataFrame.
+ /// Return the unoptimized logical plan
+ pub fn logical_plan(&self) -> &LogicalPlan {
+ &self.plan
+ }
+
+ /// Return the logical plan represented by this DataFrame without running
the optimizers
Review Comment:
👍
I wonder if we should call it `into_unoptimized_plan` for consistency with the Rust `into_*` naming convention for methods that consume `self` 🤔
##########
datafusion/core/tests/sql/avro.rs:
##########
@@ -121,11 +121,8 @@ async fn avro_single_nan_schema() {
.await
.unwrap();
let sql = "SELECT mycol FROM single_nan";
- let plan = ctx.create_logical_plan(sql).unwrap();
Review Comment:
🙈 -- this pattern is very old in the DataFusion codebase -- I think it was
simply copied around and has never been cleaned up. Thank you 🙏
##########
datafusion/core/tests/sql/mod.rs:
##########
@@ -1020,61 +1020,20 @@ async fn try_execute_to_batches(
ctx: &SessionContext,
sql: &str,
) -> Result<Vec<RecordBatch>> {
- let plan = ctx.create_logical_plan(sql)?;
- let logical_schema = plan.schema();
+ let dataframe = ctx.sql(sql).await?;
+ let logical_schema = dataframe.schema().clone();
- let plan = ctx.optimize(&plan)?;
- let optimized_logical_schema = plan.schema();
+ let optimized = ctx.optimize(dataframe.logical_plan())?;
+ let optimized_logical_schema = optimized.schema();
+ let results = dataframe.collect().await?;
- let plan = ctx.create_physical_plan(&plan).await?;
-
- let task_ctx = ctx.task_ctx();
- let results = collect(plan, task_ctx).await?;
-
- assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
+ assert_eq!(&logical_schema, optimized_logical_schema.as_ref());
Ok(results)
}
/// Execute query and return results as a Vec of RecordBatches
async fn execute_to_batches(ctx: &SessionContext, sql: &str) ->
Vec<RecordBatch> {
- let msg = format!("Creating logical plan for '{}'", sql);
- let plan = ctx
- .create_logical_plan(sql)
- .map_err(|e| format!("{:?} at {}", e, msg))
- .unwrap();
- let logical_schema = plan.schema();
-
- // We are not really interested in the direct output of
optimized_logical_plan
- // since the physical plan construction already optimizes the given
logical plan
- // and we want to avoid double-optimization as a consequence. So we just
construct
- // it here to make sure that it doesn't fail at this step and get the
optimized
- // schema (to assert later that the logical and optimized schemas are the
same).
- let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
- let optimized_logical_plan = ctx
- .optimize(&plan)
- .map_err(|e| format!("{:?} at {}", e, msg))
- .unwrap();
- let optimized_logical_schema = optimized_logical_plan.schema();
-
- let msg = format!(
- "Creating physical plan for '{}': {:?}",
- sql, optimized_logical_plan
- );
- let plan = ctx
- .create_physical_plan(&plan)
- .await
- .map_err(|e| format!("{:?} at {}", e, msg))
- .unwrap();
-
- let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
- let task_ctx = ctx.task_ctx();
- let results = collect(plan, task_ctx)
- .await
- .map_err(|e| format!("{:?} at {}", e, msg))
- .unwrap();
-
- assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
Review Comment:
This change appears to have lost the check that the logical schema and the optimized
schema are the same. I think that is a valuable check to have in tests.
##########
datafusion/core/src/dataframe.rs:
##########
@@ -505,17 +505,40 @@ impl DataFrame {
self.plan.schema()
}
- /// Return the unoptimized logical plan represented by this DataFrame.
+ /// Return the unoptimized logical plan
+ pub fn logical_plan(&self) -> &LogicalPlan {
+ &self.plan
+ }
+
+ /// Return the logical plan represented by this DataFrame without running
the optimizers
+ ///
+ /// Note: This method should not be used outside testing, as it loses the
snapshot
+ /// of the [`SessionState`] attached to this [`DataFrame`] and
consequently subsequent
+ /// operations may take place against a different state
pub fn to_unoptimized_plan(self) -> LogicalPlan {
self.plan
}
/// Return the optimized logical plan represented by this DataFrame.
- pub fn to_logical_plan(self) -> Result<LogicalPlan> {
+ ///
+ /// Note: This method should not be used outside testing, as it loses the
snapshot
+ /// of the [`SessionState`] attached to this [`DataFrame`] and
consequently subsequent
+ /// operations may take place against a different state
+ pub fn to_optimized_plan(self) -> Result<LogicalPlan> {
Review Comment:
Is there any reason to make this consume `self`? Perhaps it could take `&self` instead:
```suggestion
pub fn optimized_plan(&self) -> Result<LogicalPlan> {
```
##########
datafusion/core/tests/sql/select.rs:
##########
@@ -1275,32 +1265,14 @@ async fn test_prepare_statement() -> Result<()> {
// sql to statement then to prepare logical plan with parameters
// c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and
Float64
- let logical_plan =
- ctx.create_logical_plan("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2
FROM test WHERE c1 > $2 AND c1 < $1")?;
+ let dataframe =
+ ctx.sql("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE
c1 > $2 AND c1 < $1").await?;
// prepare logical plan to logical plan without parameters
let param_values = vec![ScalarValue::Int32(Some(3)),
ScalarValue::Float64(Some(0.0))];
- let logical_plan = logical_plan.with_param_values(param_values)?;
+ let dataframe = dataframe.with_param_values(param_values)?;
+ let results = dataframe.collect().await?;
- // logical plan to optimized logical plan
- let logical_plan = ctx.optimize(&logical_plan)?;
-
- // optimized logical plan to physical plan
- let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
- let task_ctx = ctx.task_ctx();
- let results = collect_partitioned(physical_plan, task_ctx).await?;
-
- // note that the order of partitions is not deterministic
- let mut num_rows = 0;
- for partition in &results {
- for batch in partition {
- num_rows += batch.num_rows();
- }
- }
- assert_eq!(20, num_rows);
Review Comment:
I agree that the 20-row output check is covered by `assert_batches_eq` 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]