alamb commented on code in PR #15744:
URL: https://github.com/apache/datafusion/pull/15744#discussion_r2069768717


##########
datafusion/core/tests/user_defined/user_defined_plan.rs:
##########
@@ -102,362 +96,10 @@ use 
datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use async_trait::async_trait;
 use futures::{Stream, StreamExt};
 
-/// Execute the specified sql and return the resulting record batches
-/// pretty printed as a String.
-async fn exec_sql(ctx: &SessionContext, sql: &str) -> Result<String> {
-    let df = ctx.sql(sql).await?;
-    let batches = df.collect().await?;
-    pretty_format_batches(&batches)
-        .map_err(|e| arrow_datafusion_err!(e))
-        .map(|d| d.to_string())
-}
-
-/// Create a test table.
-async fn setup_table(ctx: SessionContext) -> Result<SessionContext> {
-    let sql = "
-        CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
-        STORED AS CSV location 'tests/data/customer.csv'
-        OPTIONS('format.has_header' 'false')
-    ";
-
-    let expected = vec!["++", "++"];
-
-    let s = exec_sql(&ctx, sql).await?;
-    let actual = s.lines().collect::<Vec<_>>();
-
-    assert_eq!(expected, actual, "Creating table");
-    Ok(ctx)
-}
-
-async fn setup_table_without_schemas(ctx: SessionContext) -> 
Result<SessionContext> {
-    let sql = "
-        CREATE EXTERNAL TABLE sales
-        STORED AS CSV location 'tests/data/customer.csv'
-        OPTIONS('format.has_header' 'false')
-    ";
-
-    let expected = vec!["++", "++"];
-
-    let s = exec_sql(&ctx, sql).await?;
-    let actual = s.lines().collect::<Vec<_>>();
-
-    assert_eq!(expected, actual, "Creating table");
-    Ok(ctx)
-}
-
-const QUERY: &str =
-    "SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3";
-
-const QUERY1: &str = "SELECT * FROM sales limit 3";
-
-const QUERY2: &str = "SELECT 42, arrow_typeof(42)";
-
-// Run the query using the specified execution context and compare it
-// to the known result
-async fn run_and_compare_query(ctx: SessionContext, description: &str) -> 
Result<()> {
-    let s = exec_sql(&ctx, QUERY).await?;
-    let actual = s.lines().collect::<Vec<_>>().join("\n");
-
-    insta::allow_duplicates! {
-        insta::with_settings!({
-            description => description,
-        }, {
-            insta::assert_snapshot!(actual, @r###"
-            +-------------+---------+
-            | customer_id | revenue |
-            +-------------+---------+
-            | paul        | 300     |
-            | jorge       | 200     |
-            | andy        | 150     |
-            +-------------+---------+
-        "###);
-        });
-    }
-
-    Ok(())
-}
-
-// Run the query using the specified execution context and compare it
-// to the known result
-async fn run_and_compare_query_with_analyzer_rule(
-    ctx: SessionContext,
-    description: &str,
-) -> Result<()> {
-    let s = exec_sql(&ctx, QUERY2).await?;
-    let actual = s.lines().collect::<Vec<_>>().join("\n");
-
-    insta::with_settings!({
-        description => description,
-    }, {
-        insta::assert_snapshot!(actual, @r###"
-        +------------+--------------------------+
-        | UInt64(42) | arrow_typeof(UInt64(42)) |
-        +------------+--------------------------+
-        | 42         | UInt64                   |
-        +------------+--------------------------+
-        "###);
-    });
-
-    Ok(())
-}
-
-// Run the query using the specified execution context and compare it
-// to the known result
-async fn run_and_compare_query_with_auto_schemas(
-    ctx: SessionContext,
-    description: &str,
-) -> Result<()> {
-    let s = exec_sql(&ctx, QUERY1).await?;
-    let actual = s.lines().collect::<Vec<_>>().join("\n");
-
-    insta::with_settings!({
-            description => description,
-        }, {
-            insta::assert_snapshot!(actual, @r###"
-            +----------+----------+
-            | column_1 | column_2 |
-            +----------+----------+
-            | andrew   | 100      |
-            | jorge    | 200      |
-            | andy     | 150      |
-            +----------+----------+
-        "###);
-    });
-
-    Ok(())
-}
-
-#[tokio::test]
-// Run the query using default planners and optimizer
-async fn normal_query_without_schemas() -> Result<()> {
-    let ctx = setup_table_without_schemas(SessionContext::new()).await?;
-    run_and_compare_query_with_auto_schemas(ctx, "Default context").await
-}
-
-#[tokio::test]
-// Run the query using default planners and optimizer
-async fn normal_query() -> Result<()> {
-    let ctx = setup_table(SessionContext::new()).await?;
-    run_and_compare_query(ctx, "Default context").await
-}
-
-#[tokio::test]
-// Run the query using default planners, optimizer and custom analyzer rule
-async fn normal_query_with_analyzer() -> Result<()> {
-    let ctx = SessionContext::new();
-    ctx.add_analyzer_rule(Arc::new(MyAnalyzerRule {}));
-    run_and_compare_query_with_analyzer_rule(ctx, "MyAnalyzerRule").await
-}
-
-#[tokio::test]
-// Run the query using topk optimization
-async fn topk_query() -> Result<()> {
-    // Note the only difference is that the top
-    let ctx = setup_table(make_topk_context()).await?;
-    run_and_compare_query(ctx, "Topk context").await
-}
-
-#[tokio::test]
-// Run EXPLAIN PLAN and show the plan was in fact rewritten
-async fn topk_plan() -> Result<()> {
-    let ctx = setup_table(make_topk_context()).await?;
-
-    let mut expected = ["| logical_plan after topk                             
  | TopK: k=3                                                                   
  |",
-        "|                                                       |   
TableScan: sales projection=[customer_id,revenue]                               
   |"].join("\n");
-
-    let explain_query = format!("EXPLAIN VERBOSE {QUERY}");
-    let actual_output = exec_sql(&ctx, &explain_query).await?;
-
-    // normalize newlines (output on windows uses \r\n)
-    let mut actual_output = actual_output.replace("\r\n", "\n");
-    actual_output.retain(|x| !x.is_ascii_whitespace());
-    expected.retain(|x| !x.is_ascii_whitespace());
-
-    assert!(
-        actual_output.contains(&expected),
-        "Expected output not present in actual output\
-        \nExpected:\
-        \n---------\
-        \n{expected}\
-        \nActual:\
-        \n--------\
-        \n{actual_output}"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-/// Run invariant checks on the logical plan extension [`TopKPlanNode`].
-async fn topk_invariants() -> Result<()> {
-    // Test: pass an InvariantLevel::Always
-    let pass = InvariantMock {
-        should_fail_invariant: false,
-        kind: InvariantLevel::Always,
-    };
-    let ctx = 
setup_table(make_topk_context_with_invariants(Some(pass))).await?;
-    run_and_compare_query(ctx, "Topk context").await?;
-
-    // Test: fail an InvariantLevel::Always
-    let fail = InvariantMock {
-        should_fail_invariant: true,
-        kind: InvariantLevel::Always,
-    };
-    let ctx = 
setup_table(make_topk_context_with_invariants(Some(fail))).await?;
-    matches!(
-        &*run_and_compare_query(ctx, "Topk context")
-            .await
-            .unwrap_err()
-            .message(),
-        "node fails check, such as improper inputs"
-    );
-
-    // Test: pass an InvariantLevel::Executable
-    let pass = InvariantMock {
-        should_fail_invariant: false,
-        kind: InvariantLevel::Executable,
-    };
-    let ctx = 
setup_table(make_topk_context_with_invariants(Some(pass))).await?;
-    run_and_compare_query(ctx, "Topk context").await?;
-
-    // Test: fail an InvariantLevel::Executable
-    let fail = InvariantMock {
-        should_fail_invariant: true,
-        kind: InvariantLevel::Executable,
-    };
-    let ctx = 
setup_table(make_topk_context_with_invariants(Some(fail))).await?;
-    matches!(
-        &*run_and_compare_query(ctx, "Topk context")
-            .await
-            .unwrap_err()
-            .message(),
-        "node fails check, such as improper inputs"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn topk_invariants_after_invalid_mutation() -> Result<()> {
-    // CONTROL
-    // Build a valid topK plan.
-    let config = SessionConfig::new().with_target_partitions(48);
-    let runtime = Arc::new(RuntimeEnv::default());
-    let state = SessionStateBuilder::new()
-        .with_config(config)
-        .with_runtime_env(runtime)
-        .with_default_features()
-        .with_query_planner(Arc::new(TopKQueryPlanner {}))
-        // 1. adds a valid TopKPlanNode
-        .with_optimizer_rule(Arc::new(TopKOptimizerRule {
-            invariant_mock: Some(InvariantMock {
-                should_fail_invariant: false,
-                kind: InvariantLevel::Always,
-            }),
-        }))
-        .with_analyzer_rule(Arc::new(MyAnalyzerRule {}))
-        .build();
-    let ctx = setup_table(SessionContext::new_with_state(state)).await?;
-    run_and_compare_query(ctx, "Topk context").await?;
-
-    // Test
-    // Build a valid topK plan.
-    // Then have an invalid mutation in an optimizer run.
-    let config = SessionConfig::new().with_target_partitions(48);
-    let runtime = Arc::new(RuntimeEnv::default());
-    let state = SessionStateBuilder::new()
-        .with_config(config)
-        .with_runtime_env(runtime)
-        .with_default_features()
-        .with_query_planner(Arc::new(TopKQueryPlanner {}))
-        // 1. adds a valid TopKPlanNode
-        .with_optimizer_rule(Arc::new(TopKOptimizerRule {
-            invariant_mock: Some(InvariantMock {
-                should_fail_invariant: false,
-                kind: InvariantLevel::Always,
-            }),
-        }))
-        // 2. break the TopKPlanNode
-        .with_optimizer_rule(Arc::new(OptimizerMakeExtensionNodeInvalid {}))
-        .with_analyzer_rule(Arc::new(MyAnalyzerRule {}))
-        .build();
-    let ctx = setup_table(SessionContext::new_with_state(state)).await?;
-    matches!(
-        &*run_and_compare_query(ctx, "Topk context")
-            .await
-            .unwrap_err()
-            .message(),
-        "node fails check, such as improper inputs"
-    );
-
-    Ok(())
-}
-
-fn make_topk_context() -> SessionContext {
-    make_topk_context_with_invariants(None)
-}
-
-fn make_topk_context_with_invariants(
-    invariant_mock: Option<InvariantMock>,
-) -> SessionContext {
-    let config = SessionConfig::new().with_target_partitions(48);
-    let runtime = Arc::new(RuntimeEnv::default());
-    let state = SessionStateBuilder::new()
-        .with_config(config)
-        .with_runtime_env(runtime)
-        .with_default_features()
-        .with_query_planner(Arc::new(TopKQueryPlanner {}))
-        .with_optimizer_rule(Arc::new(TopKOptimizerRule { invariant_mock }))
-        .with_analyzer_rule(Arc::new(MyAnalyzerRule {}))
-        .build();
-    SessionContext::new_with_state(state)
-}
-
-#[derive(Debug)]
-struct OptimizerMakeExtensionNodeInvalid;
-
-impl OptimizerRule for OptimizerMakeExtensionNodeInvalid {
-    fn name(&self) -> &str {
-        "OptimizerMakeExtensionNodeInvalid"
-    }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
-    fn supports_rewrite(&self) -> bool {
-        true
-    }
-
-    // Example rewrite pass which impacts validity of the extension node.
-    fn rewrite(
-        &self,
-        plan: LogicalPlan,
-        _config: &dyn OptimizerConfig,
-    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
-        if let LogicalPlan::Extension(Extension { node }) = &plan {
-            if let Some(prev) = node.as_any().downcast_ref::<TopKPlanNode>() {
-                return Ok(Transformed::yes(LogicalPlan::Extension(Extension {
-                    node: Arc::new(TopKPlanNode {
-                        k: prev.k,
-                        input: prev.input.clone(),
-                        expr: prev.expr.clone(),
-                        // In a real use case, this rewriter could have change 
the number of inputs, etc
-                        invariant_mock: Some(InvariantMock {
-                            should_fail_invariant: true,
-                            kind: InvariantLevel::Always,
-                        }),
-                    }),
-                })));
-            }
-        };
-
-        Ok(Transformed::no(plan))
-    }
-}
-
 // ------ The implementation of the TopK code follows -----
 
-#[derive(Debug)]
+#[derive(Debug, Default)]

Review Comment:
   If we are removing all the tests that refer to this structure, I think we 
should remove the rest of the code too rather than marking it as "allow unused".



##########
datafusion/core/tests/user_defined/user_defined_plan.rs:
##########
@@ -102,362 +96,10 @@ use 
datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use async_trait::async_trait;
 use futures::{Stream, StreamExt};
 
-/// Execute the specified sql and return the resulting record batches

Review Comment:
   Can we please remove the tests in some other PR so it is clear what behavior 
the code is changing, if any? I found it hard to find the actual code / 
behavior change in this PR with several unrelated changes mixed together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to