alamb commented on code in PR #4679:
URL: https://github.com/apache/arrow-datafusion/pull/4679#discussion_r1053638385


##########
datafusion/core/src/dataframe.rs:
##########
@@ -505,17 +505,40 @@ impl DataFrame {
         self.plan.schema()
     }
 
-    /// Return the unoptimized logical plan represented by this DataFrame.
+    /// Return the unoptimized logical plan
+    pub fn logical_plan(&self) -> &LogicalPlan {
+        &self.plan
+    }
+
+    /// Return the logical plan represented by this DataFrame without running 
the optimizers

Review Comment:
   👍 
   I wonder if we should call it `into_unoptimized_plan` for consistency 🤔 



##########
datafusion/core/tests/sql/avro.rs:
##########
@@ -121,11 +121,8 @@ async fn avro_single_nan_schema() {
     .await
     .unwrap();
     let sql = "SELECT mycol FROM single_nan";
-    let plan = ctx.create_logical_plan(sql).unwrap();

Review Comment:
   🙈  -- this pattern is very old in the DataFusion codebase -- I think it was 
simply copied around and has never been cleaned up. Thank you 🙏 



##########
datafusion/core/tests/sql/mod.rs:
##########
@@ -1020,61 +1020,20 @@ async fn try_execute_to_batches(
     ctx: &SessionContext,
     sql: &str,
 ) -> Result<Vec<RecordBatch>> {
-    let plan = ctx.create_logical_plan(sql)?;
-    let logical_schema = plan.schema();
+    let dataframe = ctx.sql(sql).await?;
+    let logical_schema = dataframe.schema().clone();
 
-    let plan = ctx.optimize(&plan)?;
-    let optimized_logical_schema = plan.schema();
+    let optimized = ctx.optimize(dataframe.logical_plan())?;
+    let optimized_logical_schema = optimized.schema();
+    let results = dataframe.collect().await?;
 
-    let plan = ctx.create_physical_plan(&plan).await?;
-
-    let task_ctx = ctx.task_ctx();
-    let results = collect(plan, task_ctx).await?;
-
-    assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
+    assert_eq!(&logical_schema, optimized_logical_schema.as_ref());
     Ok(results)
 }
 
 /// Execute query and return results as a Vec of RecordBatches
 async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> 
Vec<RecordBatch> {
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(sql)
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-    let logical_schema = plan.schema();
-
-    // We are not really interested in the direct output of 
optimized_logical_plan
-    // since the physical plan construction already optimizes the given 
logical plan
-    // and we want to avoid double-optimization as a consequence. So we just 
construct
-    // it here to make sure that it doesn't fail at this step and get the 
optimized
-    // schema (to assert later that the logical and optimized schemas are the 
same).
-    let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
-    let optimized_logical_plan = ctx
-        .optimize(&plan)
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-    let optimized_logical_schema = optimized_logical_plan.schema();
-
-    let msg = format!(
-        "Creating physical plan for '{}': {:?}",
-        sql, optimized_logical_plan
-    );
-    let plan = ctx
-        .create_physical_plan(&plan)
-        .await
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-
-    let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
-    let task_ctx = ctx.task_ctx();
-    let results = collect(plan, task_ctx)
-        .await
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-
-    assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());

Review Comment:
   This change appears to have lost the check  logical schema and optimized 
schema are the same. I think that is a valuable check to have in tests



##########
datafusion/core/src/dataframe.rs:
##########
@@ -505,17 +505,40 @@ impl DataFrame {
         self.plan.schema()
     }
 
-    /// Return the unoptimized logical plan represented by this DataFrame.
+    /// Return the unoptimized logical plan
+    pub fn logical_plan(&self) -> &LogicalPlan {
+        &self.plan
+    }
+
+    /// Return the logical plan represented by this DataFrame without running 
the optimizers
+    ///
+    /// Note: This method should not be used outside testing, as it loses the 
snapshot
+    /// of the [`SessionState`] attached to this [`DataFrame`] and 
consequently subsequent
+    /// operations may take place against a different state
     pub fn to_unoptimized_plan(self) -> LogicalPlan {
         self.plan
     }
 
     /// Return the optimized logical plan represented by this DataFrame.
-    pub fn to_logical_plan(self) -> Result<LogicalPlan> {
+    ///
+    /// Note: This method should not be used outside testing, as it loses the 
snapshot
+    /// of the [`SessionState`] attached to this [`DataFrame`] and 
consequently subsequent
+    /// operations may take place against a different state
+    pub fn to_optimized_plan(self) -> Result<LogicalPlan> {

Review Comment:
   Is there any reason to make this consume `self` ? Perhaps it could be
   
   ```suggestion
       pub fn optimized_plan(&self) -> Result<LogicalPlan> {
   ```



##########
datafusion/core/tests/sql/select.rs:
##########
@@ -1275,32 +1265,14 @@ async fn test_prepare_statement() -> Result<()> {
 
     // sql to statement then to prepare logical plan with parameters
     // c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and 
Float64
-    let logical_plan =
-        ctx.create_logical_plan("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 
FROM test WHERE c1 > $2 AND c1 < $1")?;
+    let dataframe =
+        ctx.sql("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE 
c1 > $2 AND c1 < $1").await?;
 
     // prepare logical plan to logical plan without parameters
     let param_values = vec![ScalarValue::Int32(Some(3)), 
ScalarValue::Float64(Some(0.0))];
-    let logical_plan = logical_plan.with_param_values(param_values)?;
+    let dataframe = dataframe.with_param_values(param_values)?;
+    let results = dataframe.collect().await?;
 
-    // logical plan to optimized logical plan
-    let logical_plan = ctx.optimize(&logical_plan)?;
-
-    // optimized logical plan to physical plan
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
-    let task_ctx = ctx.task_ctx();
-    let results = collect_partitioned(physical_plan, task_ctx).await?;
-
-    // note that the order of partitions is not deterministic
-    let mut num_rows = 0;
-    for partition in &results {
-        for batch in partition {
-            num_rows += batch.num_rows();
-        }
-    }
-    assert_eq!(20, num_rows);

Review Comment:
   I agree that the 20 output row check is covered by `assert_batches_eq` 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to