alamb commented on code in PR #17843:
URL: https://github.com/apache/datafusion/pull/17843#discussion_r2589070564


##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.
+pub async fn table_sample() -> Result<()> {
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_query_planner(Arc::new(TableSampleQueryPlanner {}))
+        .build();
+
+    let ctx = SessionContext::new_with_state(state.clone());
+
+    // Register sample data table

Review Comment:
   A minor nit: these comments don't add much value, in my opinion, as it is 
pretty clear that `register_sample_data` registers the sample 
data table.



##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.
+pub async fn table_sample() -> Result<()> {
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_query_planner(Arc::new(TableSampleQueryPlanner {}))
+        .build();
+
+    let ctx = SessionContext::new_with_state(state.clone());
+
+    // Register sample data table
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
+
+    println!("Custom Relation Planner: TABLESAMPLE Support");
+    println!("============================================\n");
+    println!("Note: This shows logical planning for TABLESAMPLE.");
+    println!("Physical execution requires additional implementation.\n");
+
+    // Example 1: Full table without any sampling (baseline)
+    // Shows: Complete dataset with all 10 rows (1-10 with row_1 to row_10)
+    // Expected: 10 rows showing the full sample_data table
+    // Actual:
+    // +---------+---------+

Review Comment:
   I ran this example and indeed this is the result that comes out 👍 
   
   However, it might be valuable to verify the output automatically 
to ensure the comments can't drift from the implementation.
   
   Something like this perhaps:
   
   ```rust
       run_example(
           &ctx,
           "Example 1: Full table (no sampling)",
           "SELECT * FROM sample_data",
   r#"
       // | column1 | column2 |
       // +---------+---------+
       // | 1       | row_1   |
       // | 2       | row_2   |
       // | 3       | row_3   |
       // | 4       | row_4   |
       // | 5       | row_5   |
       // | 6       | row_6   |
       // | 7       | row_7   |
       // | 8       | row_8   |
       // | 9       | row_9   |
       // | 10      | row_10  |
       // +---------+---------+
   "#
       )
       .await?;
   ```
   



##########
datafusion-examples/examples/relation_planner/match_recognize.rs:
##########
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! MATCH_RECOGNIZE-style pattern matching on event streams.
+//!
+//! MATCH_RECOGNIZE is a SQL extension for pattern matching on ordered data,
+//! similar to regular expressions but for relational data. This example shows
+//! how to use custom planners to implement new SQL syntax.
+
+use std::{any::Any, cmp::Ordering, hash::Hasher, sync::Arc};
+
+use arrow::array::{ArrayRef, Float64Array, Int32Array, StringArray};
+use arrow::record_batch::RecordBatch;
+use datafusion::prelude::*;
+use datafusion_common::{DFSchemaRef, DataFusionError, Result};
+use datafusion_expr::{
+    logical_plan::{Extension, InvariantLevel, LogicalPlan},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    Expr, UserDefinedLogicalNode,
+};
+use datafusion_sql::sqlparser::ast::TableFactor;
+
+/// This example demonstrates using custom relation planners to implement
+/// MATCH_RECOGNIZE-style pattern matching on event streams.
+pub async fn match_recognize() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Register sample data tables
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(MatchRecognizePlanner))?;
+
+    println!("Custom Relation Planner: MATCH_RECOGNIZE Pattern Matching");
+    println!("==========================================================\n");
+
+    // Example 1: Basic MATCH_RECOGNIZE with MEASURES and DEFINE clauses
+    // Shows: How to use MATCH_RECOGNIZE to find patterns in event data with 
aggregations
+    // Expected: Logical plan showing MiniMatchRecognize node with SUM and AVG 
measures
+    // Note: This demonstrates the logical planning phase - actual execution 
would require physical implementation
+    // Actual (Logical Plan):

Review Comment:
   Likewise, here I recommend programmatically verifying the actual result rather 
than just having it in comments.



##########
datafusion-examples/README.md:
##########
@@ -86,6 +86,9 @@ cargo run --example dataframe -- dataframe
 - 
[`examples/external_dependency/query_aws_s3.rs`](examples/external_dependency/query_aws_s3.rs):
 Configure `object_store` and run a query against files stored in AWS S3
 - [`examples/data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs): 
Configure `object_store` and run a query against files via HTTP
 - 
[`examples/builtin_functions/regexp.rs`](examples/builtin_functions/regexp.rs): 
Examples of using regular expression functions
+- 
[`examples/relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs):
 Use custom relation planner to implement MATCH_RECOGNIZE pattern matching
+- 
[`examples/relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs):
 Use custom relation planner to implement PIVOT and UNPIVOT operations
+- 
[`examples/relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs):
 Use custom relation planner to implement TABLESAMPLE clause

Review Comment:
   It might help here to highlight what a custom relation planner means -- 
something like 
   
   ```suggestion
   - 
[`examples/relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs):
 Use custom relation planner to extend SQL support for `MATCH_RECOGNIZE` 
pattern matching
   - 
[`examples/relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs):
 Use custom relation planner to extend SQL support for `PIVOT` and `UNPIVOT` 
operations
   - 
[`examples/relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs):
 Use custom relation planner to extend SQL support for `TABLESAMPLE`
   ```



##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -24,19 +24,119 @@ use datafusion_common::{
     not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, 
TableReference,
 };
 use datafusion_expr::builder::subquery_alias;
+use datafusion_expr::planner::{
+    PlannedRelation, RelationPlannerContext, RelationPlanning,
+};
 use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
 use datafusion_expr::{Subquery, SubqueryAlias};
 use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};
 
 mod join;
 
+struct SqlToRelRelationContext<'a, 'b, S: ContextProvider> {
+    planner: &'a SqlToRel<'b, S>,
+    planner_context: &'a mut PlannerContext,
+}
+
+// Implement RelationPlannerContext
+impl<'a, 'b, S: ContextProvider> RelationPlannerContext
+    for SqlToRelRelationContext<'a, 'b, S>
+{
+    fn context_provider(&self) -> &dyn ContextProvider {
+        self.planner.context_provider
+    }
+
+    fn plan(&mut self, relation: TableFactor) -> Result<LogicalPlan> {
+        self.planner.create_relation(relation, self.planner_context)
+    }
+
+    fn sql_to_expr(
+        &mut self,
+        expr: sqlparser::ast::Expr,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        self.planner.sql_to_expr(expr, schema, self.planner_context)
+    }
+
+    fn sql_expr_to_logical_expr(
+        &mut self,
+        expr: sqlparser::ast::Expr,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        self.planner
+            .sql_expr_to_logical_expr(expr, schema, self.planner_context)
+    }
+
+    fn normalize_ident(&self, ident: sqlparser::ast::Ident) -> String {
+        self.planner.ident_normalizer.normalize(ident)
+    }
+
+    fn object_name_to_table_reference(
+        &self,
+        name: sqlparser::ast::ObjectName,
+    ) -> Result<TableReference> {
+        self.planner.object_name_to_table_reference(name)
+    }
+}
+
 impl<S: ContextProvider> SqlToRel<'_, S> {
     /// Create a `LogicalPlan` that scans the named relation
     fn create_relation(
         &self,
         relation: TableFactor,
         planner_context: &mut PlannerContext,
     ) -> Result<LogicalPlan> {
+        let planned_relation =

Review Comment:
   Maybe it is worth mentioning here in a comment that the idea is to try 
planning with a registered extension planner and, if that is not possible, fall 
back to the default planner?



##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.
+pub async fn table_sample() -> Result<()> {
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_query_planner(Arc::new(TableSampleQueryPlanner {}))
+        .build();
+
+    let ctx = SessionContext::new_with_state(state.clone());
+
+    // Register sample data table
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
+
+    println!("Custom Relation Planner: TABLESAMPLE Support");
+    println!("============================================\n");
+    println!("Note: This shows logical planning for TABLESAMPLE.");
+    println!("Physical execution requires additional implementation.\n");

Review Comment:
   I don't understand this comment -- the example includes a physical 
implementation (`SampleExec`), right?



##########
datafusion/core/tests/user_defined/relation_planner.rs:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the RelationPlanner extension point
+
+use std::sync::Arc;
+
+use arrow::array::{Int64Array, RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::catalog::memory::MemTable;
+use datafusion::common::test_util::batches_to_string;
+use datafusion::prelude::*;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
+use datafusion_expr::planner::{
+    PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
+};
+use datafusion_expr::Expr;
+use datafusion_sql::sqlparser::ast::TableFactor;
+use insta::assert_snapshot;
+
+// ============================================================================
+// Test Planners - Example Implementations
+// ============================================================================
+
+// The planners in this section are deliberately minimal, static examples used
+// only for tests. In real applications a `RelationPlanner` would typically
+// construct richer logical plans tailored to external systems or custom
+// semantics rather than hard-coded in-memory tables.
+
+/// Helper to build simple static values-backed virtual tables used by the
+/// example planners below.
+fn plan_static_values_table(
+    relation: TableFactor,
+    table_name: &str,
+    column_name: &str,
+    values: Vec<ScalarValue>,
+) -> Result<RelationPlanning> {
+    match relation {
+        TableFactor::Table { name, alias, .. }
+            if name.to_string().eq_ignore_ascii_case(table_name) =>
+        {
+            let rows = values
+                .into_iter()
+                .map(|v| vec![Expr::Literal(v, None)])
+                .collect::<Vec<_>>();
+
+            let plan = LogicalPlanBuilder::values(rows)?
+                .project(vec![col("column1").alias(column_name)])?
+                .build()?;
+
+            Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
+        }
+        other => Ok(RelationPlanning::Original(other)),
+    }
+}
+
+/// Example planner that provides a virtual `numbers` table with values
+/// 1, 2, 3.
+#[derive(Debug)]
+struct NumbersPlanner;
+
+impl RelationPlanner for NumbersPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        plan_static_values_table(
+            relation,
+            "numbers",
+            "number",
+            vec![
+                ScalarValue::Int64(Some(1)),
+                ScalarValue::Int64(Some(2)),
+                ScalarValue::Int64(Some(3)),
+            ],
+        )
+    }
+}
+
+/// Example planner that provides a virtual `colors` table with three string
+/// values: `red`, `green`, `blue`.
+#[derive(Debug)]
+struct ColorsPlanner;
+
+impl RelationPlanner for ColorsPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        plan_static_values_table(
+            relation,
+            "colors",
+            "color",
+            vec![
+                ScalarValue::Utf8(Some("red".into())),
+                ScalarValue::Utf8(Some("green".into())),
+                ScalarValue::Utf8(Some("blue".into())),
+            ],
+        )
+    }
+}
+
+/// Alternative implementation of `numbers` (returns 100, 200) used to
+/// demonstrate planner precedence (last registered planner wins).
+#[derive(Debug)]
+struct AlternativeNumbersPlanner;
+
+impl RelationPlanner for AlternativeNumbersPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        plan_static_values_table(
+            relation,
+            "numbers",
+            "number",
+            vec![ScalarValue::Int64(Some(100)), ScalarValue::Int64(Some(200))],
+        )
+    }
+}
+
+/// Example planner that intercepts nested joins and samples both sides (limit 
2)
+/// before joining, demonstrating recursive planning with `context.plan()`.
+#[derive(Debug)]
+struct SamplingJoinPlanner;
+
+impl RelationPlanner for SamplingJoinPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        match relation {
+            TableFactor::NestedJoin {
+                table_with_joins,
+                alias,
+                ..
+            } if table_with_joins.joins.len() == 1 => {
+                // Use context.plan() to recursively plan both sides
+                // This ensures other planners (like NumbersPlanner) can 
handle them
+                let left = context.plan(table_with_joins.relation.clone())?;
+                let right = 
context.plan(table_with_joins.joins[0].relation.clone())?;
+
+                // Sample each table to 2 rows
+                let left_sampled =
+                    LogicalPlanBuilder::from(left).limit(0, Some(2))?.build()?;
+
+                let right_sampled =
+                    LogicalPlanBuilder::from(right).limit(0, 
Some(2))?.build()?;
+
+                // Cross join: 2 rows × 2 rows = 4 rows (instead of 3×3=9 
without sampling)
+                let plan = LogicalPlanBuilder::from(left_sampled)
+                    .cross_join(right_sampled)?
+                    .build()?;
+
+                Ok(RelationPlanning::Planned(PlannedRelation::new(plan, 
alias)))
+            }
+            other => Ok(RelationPlanning::Original(other)),
+        }
+    }
+}
+
+/// Example planner that never handles any relation and always delegates by
+/// returning `RelationPlanning::Original`.
+#[derive(Debug)]
+struct PassThroughPlanner;
+
+impl RelationPlanner for PassThroughPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        // Never handles anything - always delegates
+        Ok(RelationPlanning::Original(relation))
+    }
+}
+
+/// Example planner that shows how planners can block specific constructs and
+/// surface custom error messages by rejecting `UNNEST` relations (here framed
+/// as a mock premium feature check).
+#[derive(Debug)]
+struct PremiumFeaturePlanner;
+
+impl RelationPlanner for PremiumFeaturePlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        match relation {
+            TableFactor::UNNEST { .. } => 
Err(datafusion_common::DataFusionError::Plan(
+                "UNNEST is a premium feature! Please upgrade to DataFusion 
Pro™ \

Review Comment:
   "DataFusion Pro" -- if only! Some day maybe



##########
datafusion/expr/src/planner.rs:
##########
@@ -324,6 +334,85 @@ pub enum PlannerResult<T> {
     Original(T),
 }
 
+/// Result of planning a relation with [`RelationPlanner`]
+#[cfg(feature = "sql")]
+#[derive(Debug, Clone)]
+pub struct PlannedRelation {
+    /// The logical plan for the relation
+    pub plan: LogicalPlan,
+    /// Optional table alias for the relation
+    pub alias: Option<TableAlias>,
+}
+
+#[cfg(feature = "sql")]
+impl PlannedRelation {
+    /// Create a new `PlannedRelation` with the given plan and alias
+    pub fn new(plan: LogicalPlan, alias: Option<TableAlias>) -> Self {
+        Self { plan, alias }
+    }
+}
+
+/// Result of attempting to plan a relation with extension planners
+#[cfg(feature = "sql")]
+#[derive(Debug)]
+pub enum RelationPlanning {
+    /// The relation was successfully planned by an extension planner
+    Planned(PlannedRelation),
+    /// No extension planner handled the relation, return it for default 
processing
+    Original(TableFactor),
+}
+
+/// Customize planning SQL table factors to [`LogicalPlan`]s.
+#[cfg(feature = "sql")]
+pub trait RelationPlanner: Debug + Send + Sync {
+    /// Plan a table factor into a [`LogicalPlan`].
+    ///
+    /// Returning `Ok(RelationPlanning::Transformed(planned_relation))` 
short-circuits further planning and uses the
+    /// provided plan. Returning `Ok(RelationPlanning::Original(relation))` 
allows the next registered planner,
+    /// or DataFusion's default logic, to handle the relation.

Review Comment:
   You can have the compiler check this for you if you make the link directly 
like this.
   
   ```
       /// Returning [`RelationPlanning::Planned`] short-circuits further 
planning and uses the
       /// provided plan. Returning [`RelationPlanning::Original`] allows the 
next registered planner,
       /// or DataFusion's default logic, to handle the relation.
   ```
   
   While this is slightly less precise, I don't think the `Ok` or parameter 
names add much additional information



##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.
+pub async fn table_sample() -> Result<()> {
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_query_planner(Arc::new(TableSampleQueryPlanner {}))
+        .build();
+
+    let ctx = SessionContext::new_with_state(state.clone());
+
+    // Register sample data table
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
+
+    println!("Custom Relation Planner: TABLESAMPLE Support");
+    println!("============================================\n");
+    println!("Note: This shows logical planning for TABLESAMPLE.");
+    println!("Physical execution requires additional implementation.\n");
+
+    // Example 1: Full table without any sampling (baseline)
+    // Shows: Complete dataset with all 10 rows (1-10 with row_1 to row_10)
+    // Expected: 10 rows showing the full sample_data table
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // | 6       | row_6   |
+    // | 7       | row_7   |
+    // | 8       | row_8   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 1: Full table (no sampling)",
+        "SELECT * FROM sample_data",
+    )
+    .await?;
+
+    // Example 2: TABLESAMPLE with BERNOULLI sampling at 30% probability
+    // Shows: Random sampling where each row has 30% chance of being selected
+    // Expected: ~3 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 4       | row_4   |
+    // | 6       | row_6   |
+    // | 9       | row_9   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 2: TABLESAMPLE with percentage",
+        "SELECT * FROM sample_data TABLESAMPLE BERNOULLI(30 PERCENT)",
+    )
+    .await?;
+
+    // Example 3: TABLESAMPLE with fractional sampling (50% of data)
+    // Shows: Random sampling using decimal fraction instead of percentage
+    // Expected: ~5 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 3: TABLESAMPLE with fraction",
+        "SELECT * FROM sample_data TABLESAMPLE (0.5)",
+    )
+    .await?;
+
+    // Example 4: TABLESAMPLE with REPEATABLE seed for reproducible results
+    // Shows: Deterministic sampling using a fixed seed for consistent results
+    // Expected: Same rows selected each time due to fixed seed (42)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 5       | row_5   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 4: TABLESAMPLE with REPEATABLE seed",
+        "SELECT * FROM sample_data TABLESAMPLE (0.3) REPEATABLE(42)",
+    )
+    .await?;
+
+    // Example 5: TABLESAMPLE with exact row count limit
+    // Shows: Sampling by limiting to a specific number of rows (not 
probabilistic)
+    // Expected: Exactly 3 rows (first 3 rows from the dataset)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 5: TABLESAMPLE with row count",
+        "SELECT * FROM sample_data TABLESAMPLE (3 ROWS)",
+    )
+    .await?;
+
+    // Example 6: TABLESAMPLE combined with WHERE clause filtering
+    // Shows: How sampling works with other query operations like filtering
+    // Expected: 3 rows where column1 > 2 (from the 5-row sample)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 6: TABLESAMPLE with WHERE clause",
+        r#"SELECT * FROM sample_data 
+           TABLESAMPLE (5 ROWS) 
+           WHERE column1 > 2"#,
+    )
+    .await?;
+
+    // Example 7: JOIN between two independently sampled tables
+    // Shows: How sampling works in complex queries with multiple table 
references
+    // Expected: Rows where both sampled tables have matching column1 values
+    // Actual:
+    // +---------+---------+---------+---------+
+    // | column1 | column1 | column2 | column2 |
+    // +---------+---------+---------+---------+
+    // | 2       | 2       | row_2   | row_2   |
+    // | 8       | 8       | row_8   | row_8   |
+    // | 10      | 10      | row_10  | row_10  |
+    // +---------+---------+---------+---------+
+    run_example(
+        &ctx,
+        "Example 7: JOIN between two different TABLESAMPLE tables",
+        r#"SELECT t1.column1, t2.column1, t1.column2, t2.column2 
+           FROM sample_data t1 TABLESAMPLE (0.7) 
+           JOIN sample_data t2 TABLESAMPLE (0.7) 
+           ON t1.column1 = t2.column1"#,
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// Register sample data table for the examples
+fn register_sample_data(ctx: &SessionContext) -> Result<()> {
+    // Create sample_data table with 10 rows: column1 (1-10), column2 (row_1 
to row_10)
+    let column1: ArrayRef = 
Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));
+    let column2: ArrayRef = Arc::new(StringArray::from(
+        (1..=10)
+            .map(|i| format!("row_{i}"))
+            .collect::<Vec<String>>(),
+    ));
+    let batch =
+        RecordBatch::try_from_iter(vec![("column1", column1), ("column2", 
column2)])?;
+    ctx.register_batch("sample_data", batch)?;
+
+    Ok(())
+}
+
+async fn run_example(ctx: &SessionContext, title: &str, sql: &str) -> 
Result<()> {
+    println!("{title}:\n{sql}\n");
+    let df = ctx.sql(sql).await?;
+    println!("Logical Plan:\n{}\n", df.logical_plan().display_indent());
+    df.show().await?;
+    Ok(())
+}
+
+/// Hashable and comparable f64 for sampling bounds
+#[derive(Debug, Clone, Copy, PartialOrd)]
+struct Bound(f64);
+
+impl PartialEq for Bound {
+    fn eq(&self, other: &Self) -> bool {
+        (self.0 - other.0).abs() < f64::EPSILON
+    }
+}
+
+impl Eq for Bound {}
+
+impl Hash for Bound {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        // Hash the bits of the f64
+        self.0.to_bits().hash(state);
+    }
+}
+
+impl From<f64> for Bound {
+    fn from(value: f64) -> Self {
+        Self(value)
+    }
+}
+impl From<Bound> for f64 {
+    fn from(value: Bound) -> Self {
+        value.0
+    }
+}
+
+impl AsRef<f64> for Bound {
+    fn as_ref(&self) -> &f64 {
+        &self.0
+    }
+}
+
+#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
+struct TableSamplePlanNode {
+    inner_plan: LogicalPlan,
+
+    lower_bound: Bound,
+    upper_bound: Bound,
+    with_replacement: bool,
+    seed: u64,
+}
+
+impl TableSamplePlanNode {
+    pub fn new(
+        input: LogicalPlan,
+        fraction: f64,
+        with_replacement: Option<bool>,
+        seed: Option<u64>,
+    ) -> Self {
+        TableSamplePlanNode {
+            inner_plan: input,
+            lower_bound: Bound::from(0.0),
+            upper_bound: Bound::from(fraction),
+            with_replacement: with_replacement.unwrap_or(false),
+            seed: seed.unwrap_or_else(rand::random),
+        }
+    }
+
+    pub fn into_plan(self) -> LogicalPlan {
+        LogicalPlan::Extension(Extension {
+            node: Arc::new(self),
+        })
+    }
+}
+
+impl UserDefinedLogicalNodeCore for TableSamplePlanNode {
+    fn name(&self) -> &str {
+        "TableSample"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.inner_plan]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        self.inner_plan.schema()
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut Formatter) -> fmt::Result {
+        f.write_fmt(format_args!(
+            "Sample: {:?} {:?} {:?}",
+            self.lower_bound, self.upper_bound, self.seed
+        ))
+    }
+
+    fn with_exprs_and_inputs(
+        &self,
+        _exprs: Vec<Expr>,
+        inputs: Vec<LogicalPlan>,
+    ) -> Result<Self> {
+        let input = inputs
+            .first()
+            .ok_or(DataFusionError::Plan("Should have input".into()))?;
+        Ok(Self {
+            inner_plan: input.clone(),
+            lower_bound: self.lower_bound,
+            upper_bound: self.upper_bound,
+            with_replacement: self.with_replacement,
+            seed: self.seed,
+        })
+    }
+}
+
+/// Execution planner with `SampleExec` for `TableSamplePlanNode`
+struct TableSampleExtensionPlanner {}
+
+impl TableSampleExtensionPlanner {
+    fn build_execution_plan(
+        &self,
+        specific_node: &TableSamplePlanNode,
+        physical_input: &Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(SampleExec {
+            input: Arc::clone(physical_input),
+            lower_bound: 0.0,
+            upper_bound: specific_node.upper_bound.into(),
+            with_replacement: specific_node.with_replacement,
+            seed: specific_node.seed,
+            metrics: Default::default(),
+            cache: SampleExec::compute_properties(physical_input),
+        }))
+    }
+}
+
+#[async_trait]
+impl ExtensionPlanner for TableSampleExtensionPlanner {
+    /// Create a physical plan for an extension node
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(specific_node) = 
node.as_any().downcast_ref::<TableSamplePlanNode>() {
+            println!("Extension planner plan_extension: {:?}", 
&logical_inputs);
+            assert_eq!(logical_inputs.len(), 1, "Inconsistent number of 
inputs");
+            assert_eq!(physical_inputs.len(), 1, "Inconsistent number of 
inputs");
+
+            let exec_plan =
+                self.build_execution_plan(specific_node, &physical_inputs[0])?;
+            Ok(Some(exec_plan))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Query planner supporting a `TableSampleExtensionPlanner`
+#[derive(Debug)]
+struct TableSampleQueryPlanner {}
+
+#[async_trait]
+impl QueryPlanner for TableSampleQueryPlanner {
+    /// Given a `LogicalPlan` created from above, create an
+    /// `ExecutionPlan` suitable for execution
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Additional extension for table sample node
+        let physical_planner =
+            DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
+                TableSampleExtensionPlanner {},
+            )]);
+        // Delegate most work of physical planning to the default physical 
planner
+        physical_planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+
+/// Physical plan implementation
+trait Sampler: Send + Sync {
+    fn sample(&mut self, batch: &RecordBatch) -> Result<RecordBatch>;
+}
+
+struct BernoulliSampler {
+    lower_bound: f64,
+    upper_bound: f64,
+    rng: StdRng,
+}
+
+impl BernoulliSampler {
+    fn new(lower_bound: f64, upper_bound: f64, seed: u64) -> Self {
+        Self {
+            lower_bound,
+            upper_bound,
+            rng: StdRng::seed_from_u64(seed),
+        }
+    }
+}
+
+impl Sampler for BernoulliSampler {
+    fn sample(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
+        if self.upper_bound <= self.lower_bound {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+
+        let mut indices = Vec::new();
+
+        for i in 0..batch.num_rows() {
+            let rnd: f64 = self.rng.random();
+
+            if rnd >= self.lower_bound && rnd < self.upper_bound {
+                indices.push(i as u32);
+            }
+        }
+
+        if indices.is_empty() {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+        let indices = UInt32Array::from(indices);
+        compute::take_record_batch(batch, &indices).map_err(|e| e.into())
+    }
+}
+
+struct PoissonSampler {
+    ratio: f64,
+    poisson: Poisson<f64>,
+    rng: StdRng,
+}
+
+impl PoissonSampler {
+    fn try_new(ratio: f64, seed: u64) -> Result<Self> {
+        let poisson = Poisson::new(ratio).map_err(|e| 
plan_datafusion_err!("{}", e))?;
+        Ok(Self {
+            ratio,
+            poisson,
+            rng: StdRng::seed_from_u64(seed),
+        })
+    }
+}
+
+impl Sampler for PoissonSampler {
+    fn sample(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
+        if self.ratio <= 0.0 {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+
+        let mut indices = Vec::new();
+
+        for i in 0..batch.num_rows() {
+            let k = self.poisson.sample(&mut self.rng) as i32;
+            for _ in 0..k {
+                indices.push(i as u32);
+            }
+        }
+
+        if indices.is_empty() {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+
+        let indices = UInt32Array::from(indices);
+        compute::take_record_batch(batch, &indices).map_err(|e| e.into())
+    }
+}
+
+/// SampleExec samples rows from its input based on a sampling method.
+/// This is used to implement SQL `SAMPLE` clause.
+#[derive(Debug, Clone)]
+pub struct SampleExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// The lower bound of the sampling ratio
+    lower_bound: f64,
+    /// The upper bound of the sampling ratio
+    upper_bound: f64,
+    /// Whether to sample with replacement
+    with_replacement: bool,
+    /// Random seed for reproducible sampling
+    seed: u64,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Properties equivalence properties, partitioning, etc.
+    cache: PlanProperties,
+}
+
+impl SampleExec {
+    /// Create a new SampleExec with a custom sampling method
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        lower_bound: f64,
+        upper_bound: f64,
+        with_replacement: bool,
+        seed: u64,
+    ) -> Result<Self> {
+        if lower_bound < 0.0 || upper_bound > 1.0 || lower_bound > upper_bound 
{
+            return internal_err!(
+                "Sampling bounds must be between 0.0 and 1.0, and lower_bound 
<= upper_bound, got [{}, {}]",
+                lower_bound, upper_bound
+            );
+        }
+
+        let cache = Self::compute_properties(&input);
+
+        Ok(Self {
+            input,
+            lower_bound,
+            upper_bound,
+            with_replacement,
+            seed,
+            metrics: ExecutionPlanMetricsSet::new(),
+            cache,
+        })
+    }
+
+    fn create_sampler(&self, partition: usize) -> Result<Box<dyn Sampler>> {
+        if self.with_replacement {
+            Ok(Box::new(PoissonSampler::try_new(
+                self.upper_bound - self.lower_bound,
+                self.seed + partition as u64,
+            )?))
+        } else {
+            Ok(Box::new(BernoulliSampler::new(
+                self.lower_bound,
+                self.upper_bound,
+                self.seed + partition as u64,
+            )))
+        }
+    }
+
+    /// Whether to sample with replacement
+    #[allow(dead_code)]
+    pub fn with_replacement(&self) -> bool {
+        self.with_replacement
+    }
+
+    /// The lower bound of the sampling ratio
+    #[allow(dead_code)]
+    pub fn lower_bound(&self) -> f64 {
+        self.lower_bound
+    }
+
+    /// The upper bound of the sampling ratio
+    #[allow(dead_code)]
+    pub fn upper_bound(&self) -> f64 {
+        self.upper_bound
+    }
+
+    /// The random seed
+    #[allow(dead_code)]
+    pub fn seed(&self) -> u64 {
+        self.seed
+    }
+
+    /// The input plan
+    #[allow(dead_code)]
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
+    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
+        input
+            .properties()
+            .clone()
+            .with_eq_properties(EquivalenceProperties::new(input.schema()))
+    }
+}
+
+impl DisplayAs for SampleExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "SampleExec: lower_bound={}, upper_bound={}, 
with_replacement={}, seed={}",
+                    self.lower_bound, self.upper_bound, self.with_replacement, 
self.seed
+                )
+            }
+            DisplayFormatType::TreeRender => {
+                write!(
+                    f,
+                    "SampleExec: lower_bound={}, upper_bound={}, 
with_replacement={}, seed={}",
+                    self.lower_bound, self.upper_bound, self.with_replacement, 
self.seed
+                )
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for SampleExec {
+    fn name(&self) -> &'static str {
+        "SampleExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![false] // Sampling does not maintain input order
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(SampleExec::try_new(
+            Arc::clone(&children[0]),
+            self.lower_bound,
+            self.upper_bound,
+            self.with_replacement,
+            self.seed,
+        )?))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input_stream = self.input.execute(partition, context)?;
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
+        Ok(Box::pin(SampleExecStream {
+            input: input_stream,
+            sampler: self.create_sampler(partition)?,
+            baseline_metrics,
+        }))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn partition_statistics(&self, partition: Option<usize>) -> 
Result<Statistics> {
+        let input_stats = self.input.partition_statistics(partition)?;
+
+        // Apply sampling ratio to statistics
+        let mut stats = input_stats;
+        let ratio = self.upper_bound - self.lower_bound;
+
+        stats.num_rows = stats
+            .num_rows
+            .map(|nr| (nr as f64 * ratio) as usize)
+            .to_inexact();
+        stats.total_byte_size = stats
+            .total_byte_size
+            .map(|tb| (tb as f64 * ratio) as usize)
+            .to_inexact();
+
+        Ok(stats)
+    }
+}
+
+/// Stream for the SampleExec operator
+struct SampleExecStream {
+    /// The input stream
+    input: SendableRecordBatchStream,
+    /// The sampling method
+    sampler: Box<dyn Sampler>,
+    /// Runtime metrics recording
+    baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for SampleExecStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        match ready!(self.input.poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                let start = self.baseline_metrics.elapsed_compute().clone();
+                let result = self.sampler.sample(&batch);
+                let result = result.record_output(&self.baseline_metrics);
+                let _timer = start.timer();

Review Comment:
   I think the Sample example would be nice to leave in, as I can see people
   copying and modifying it to add sampling to their applications (which has
   been requested several times).
   



##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.

Review Comment:
   To help people find the relevant code (rather than all the supporting
   bits), maybe you could also add a link to `TableSamplePlanner` in this
   comment.
   
   Something like
   
   > In this example, the [`TableSamplePlanner`] is used to provide custom
   > planning for queries with a TABLESAMPLE clause.



##########
datafusion-examples/examples/relation_planner/pivot_unpivot.rs:
##########
@@ -0,0 +1,543 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! PIVOT and UNPIVOT operations for reshaping data.
+//!
+//! PIVOT transforms rows into columns (wide format), while UNPIVOT does the
+//! reverse, transforming columns into rows (long format). This example shows
+//! how to use custom planners to implement these SQL clauses by rewriting them
+//! into equivalent standard SQL operations:
+//!
+//! - PIVOT is rewritten to GROUP BY with CASE expressions
+//! - UNPIVOT is rewritten to UNION ALL of projections
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, StringArray};
+use arrow::record_batch::RecordBatch;
+use datafusion::prelude::*;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{
+    case, lit,
+    logical_plan::builder::LogicalPlanBuilder,
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    Expr,
+};
+use datafusion_sql::sqlparser::ast::TableFactor;
+
+/// This example demonstrates using custom relation planners to implement
+/// PIVOT and UNPIVOT operations for reshaping data.
+pub async fn pivot_unpivot() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Register sample data tables
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(PivotUnpivotPlanner))?;
+
+    println!("Custom Relation Planner: PIVOT and UNPIVOT Operations");
+    println!("======================================================\n");
+
+    // Example 1: Basic PIVOT to transform monthly sales data from rows to 
columns

Review Comment:
   Likewise here, it would be great to programmatically verify the expected
   output shown in these comments, to ensure it stays in sync with what the
   example actually prints.



##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.
+pub async fn table_sample() -> Result<()> {
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_query_planner(Arc::new(TableSampleQueryPlanner {}))
+        .build();
+
+    let ctx = SessionContext::new_with_state(state.clone());
+
+    // Register sample data table
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
+
+    println!("Custom Relation Planner: TABLESAMPLE Support");
+    println!("============================================\n");
+    println!("Note: This shows logical planning for TABLESAMPLE.");
+    println!("Physical execution requires additional implementation.\n");
+
+    // Example 1: Full table without any sampling (baseline)
+    // Shows: Complete dataset with all 10 rows (1-10 with row_1 to row_10)
+    // Expected: 10 rows showing the full sample_data table
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // | 6       | row_6   |
+    // | 7       | row_7   |
+    // | 8       | row_8   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 1: Full table (no sampling)",
+        "SELECT * FROM sample_data",
+    )
+    .await?;
+
+    // Example 2: TABLESAMPLE with BERNOULLI sampling at 30% probability
+    // Shows: Random sampling where each row has 30% chance of being selected
+    // Expected: ~3 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 4       | row_4   |
+    // | 6       | row_6   |
+    // | 9       | row_9   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 2: TABLESAMPLE with percentage",
+        "SELECT * FROM sample_data TABLESAMPLE BERNOULLI(30 PERCENT)",
+    )
+    .await?;
+
+    // Example 3: TABLESAMPLE with fractional sampling (50% of data)
+    // Shows: Random sampling using decimal fraction instead of percentage
+    // Expected: ~5 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 3: TABLESAMPLE with fraction",
+        "SELECT * FROM sample_data TABLESAMPLE (0.5)",
+    )
+    .await?;
+
+    // Example 4: TABLESAMPLE with REPEATABLE seed for reproducible results
+    // Shows: Deterministic sampling using a fixed seed for consistent results
+    // Expected: Same rows selected each time due to fixed seed (42)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 5       | row_5   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 4: TABLESAMPLE with REPEATABLE seed",
+        "SELECT * FROM sample_data TABLESAMPLE (0.3) REPEATABLE(42)",
+    )
+    .await?;
+
+    // Example 5: TABLESAMPLE with exact row count limit
+    // Shows: Sampling by limiting to a specific number of rows (not 
probabilistic)
+    // Expected: Exactly 3 rows (first 3 rows from the dataset)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 5: TABLESAMPLE with row count",
+        "SELECT * FROM sample_data TABLESAMPLE (3 ROWS)",
+    )
+    .await?;
+
+    // Example 6: TABLESAMPLE combined with WHERE clause filtering
+    // Shows: How sampling works with other query operations like filtering
+    // Expected: 3 rows where column1 > 2 (from the 5-row sample)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 6: TABLESAMPLE with WHERE clause",
+        r#"SELECT * FROM sample_data 
+           TABLESAMPLE (5 ROWS) 
+           WHERE column1 > 2"#,
+    )
+    .await?;
+
+    // Example 7: JOIN between two independently sampled tables
+    // Shows: How sampling works in complex queries with multiple table 
references
+    // Expected: Rows where both sampled tables have matching column1 values
+    // Actual:
+    // +---------+---------+---------+---------+
+    // | column1 | column1 | column2 | column2 |
+    // +---------+---------+---------+---------+
+    // | 2       | 2       | row_2   | row_2   |
+    // | 8       | 8       | row_8   | row_8   |
+    // | 10      | 10      | row_10  | row_10  |
+    // +---------+---------+---------+---------+
+    run_example(
+        &ctx,
+        "Example 7: JOIN between two different TABLESAMPLE tables",
+        r#"SELECT t1.column1, t2.column1, t1.column2, t2.column2 
+           FROM sample_data t1 TABLESAMPLE (0.7) 
+           JOIN sample_data t2 TABLESAMPLE (0.7) 
+           ON t1.column1 = t2.column1"#,
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// Register sample data table for the examples
+fn register_sample_data(ctx: &SessionContext) -> Result<()> {
+    // Create sample_data table with 10 rows: column1 (1-10), column2 (row_1 
to row_10)
+    let column1: ArrayRef = 
Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));
+    let column2: ArrayRef = Arc::new(StringArray::from(
+        (1..=10)
+            .map(|i| format!("row_{i}"))
+            .collect::<Vec<String>>(),
+    ));
+    let batch =
+        RecordBatch::try_from_iter(vec![("column1", column1), ("column2", 
column2)])?;
+    ctx.register_batch("sample_data", batch)?;
+
+    Ok(())
+}
+
+async fn run_example(ctx: &SessionContext, title: &str, sql: &str) -> 
Result<()> {
+    println!("{title}:\n{sql}\n");
+    let df = ctx.sql(sql).await?;
+    println!("Logical Plan:\n{}\n", df.logical_plan().display_indent());
+    df.show().await?;
+    Ok(())
+}
+
+/// Hashable and comparable f64 for sampling bounds
+#[derive(Debug, Clone, Copy, PartialOrd)]
+struct Bound(f64);
+
+impl PartialEq for Bound {
+    fn eq(&self, other: &Self) -> bool {
+        (self.0 - other.0).abs() < f64::EPSILON
+    }
+}
+
+impl Eq for Bound {}
+
+impl Hash for Bound {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        // Hash the bits of the f64
+        self.0.to_bits().hash(state);
+    }
+}
+
+impl From<f64> for Bound {
+    fn from(value: f64) -> Self {
+        Self(value)
+    }
+}
+impl From<Bound> for f64 {
+    fn from(value: Bound) -> Self {
+        value.0
+    }
+}
+
+impl AsRef<f64> for Bound {
+    fn as_ref(&self) -> &f64 {
+        &self.0
+    }
+}
+
+#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
+struct TableSamplePlanNode {
+    inner_plan: LogicalPlan,
+
+    lower_bound: Bound,
+    upper_bound: Bound,
+    with_replacement: bool,
+    seed: u64,
+}
+
+impl TableSamplePlanNode {
+    pub fn new(
+        input: LogicalPlan,
+        fraction: f64,
+        with_replacement: Option<bool>,
+        seed: Option<u64>,
+    ) -> Self {
+        TableSamplePlanNode {
+            inner_plan: input,
+            lower_bound: Bound::from(0.0),
+            upper_bound: Bound::from(fraction),
+            with_replacement: with_replacement.unwrap_or(false),
+            seed: seed.unwrap_or_else(rand::random),
+        }
+    }
+
+    pub fn into_plan(self) -> LogicalPlan {
+        LogicalPlan::Extension(Extension {
+            node: Arc::new(self),
+        })
+    }
+}
+
+impl UserDefinedLogicalNodeCore for TableSamplePlanNode {
+    fn name(&self) -> &str {
+        "TableSample"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.inner_plan]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        self.inner_plan.schema()
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut Formatter) -> fmt::Result {
+        f.write_fmt(format_args!(
+            "Sample: {:?} {:?} {:?}",
+            self.lower_bound, self.upper_bound, self.seed
+        ))
+    }
+
+    fn with_exprs_and_inputs(
+        &self,
+        _exprs: Vec<Expr>,
+        inputs: Vec<LogicalPlan>,
+    ) -> Result<Self> {
+        let input = inputs
+            .first()
+            .ok_or(DataFusionError::Plan("Should have input".into()))?;
+        Ok(Self {
+            inner_plan: input.clone(),
+            lower_bound: self.lower_bound,
+            upper_bound: self.upper_bound,
+            with_replacement: self.with_replacement,
+            seed: self.seed,
+        })
+    }
+}
+
+/// Execution planner with `SampleExec` for `TableSamplePlanNode`
+struct TableSampleExtensionPlanner {}
+
+impl TableSampleExtensionPlanner {
+    fn build_execution_plan(
+        &self,
+        specific_node: &TableSamplePlanNode,
+        physical_input: &Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(SampleExec {
+            input: Arc::clone(physical_input),
+            lower_bound: 0.0,
+            upper_bound: specific_node.upper_bound.into(),
+            with_replacement: specific_node.with_replacement,
+            seed: specific_node.seed,
+            metrics: Default::default(),
+            cache: SampleExec::compute_properties(physical_input),
+        }))
+    }
+}
+
+#[async_trait]
+impl ExtensionPlanner for TableSampleExtensionPlanner {
+    /// Create a physical plan for an extension node
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(specific_node) = 
node.as_any().downcast_ref::<TableSamplePlanNode>() {
+            println!("Extension planner plan_extension: {:?}", 
&logical_inputs);
+            assert_eq!(logical_inputs.len(), 1, "Inconsistent number of 
inputs");
+            assert_eq!(physical_inputs.len(), 1, "Inconsistent number of 
inputs");
+
+            let exec_plan =
+                self.build_execution_plan(specific_node, &physical_inputs[0])?;
+            Ok(Some(exec_plan))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Query planner supporting a `TableSampleExtensionPlanner`
+#[derive(Debug)]
+struct TableSampleQueryPlanner {}
+
+#[async_trait]
+impl QueryPlanner for TableSampleQueryPlanner {
+    /// Given a `LogicalPlan` created from above, create an
+    /// `ExecutionPlan` suitable for execution
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Additional extension for table sample node
+        let physical_planner =
+            DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
+                TableSampleExtensionPlanner {},
+            )]);
+        // Delegate most work of physical planning to the default physical 
planner
+        physical_planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+
+/// Physical plan implementation
+trait Sampler: Send + Sync {
+    fn sample(&mut self, batch: &RecordBatch) -> Result<RecordBatch>;
+}
+
+struct BernoulliSampler {
+    lower_bound: f64,
+    upper_bound: f64,
+    rng: StdRng,
+}
+
+impl BernoulliSampler {
+    fn new(lower_bound: f64, upper_bound: f64, seed: u64) -> Self {
+        Self {
+            lower_bound,
+            upper_bound,
+            rng: StdRng::seed_from_u64(seed),
+        }
+    }
+}
+
+impl Sampler for BernoulliSampler {

Review Comment:
   FYI @theirix -- is this sampler implementation adequate for your sampling needs?



##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.
+pub async fn table_sample() -> Result<()> {
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_query_planner(Arc::new(TableSampleQueryPlanner {}))
+        .build();
+
+    let ctx = SessionContext::new_with_state(state.clone());
+
+    // Register sample data table
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
+
+    println!("Custom Relation Planner: TABLESAMPLE Support");
+    println!("============================================\n");
+    println!("Note: This shows logical planning for TABLESAMPLE.");
+    println!("Physical execution requires additional implementation.\n");
+
+    // Example 1: Full table without any sampling (baseline)
+    // Shows: Complete dataset with all 10 rows (1-10 with row_1 to row_10)
+    // Expected: 10 rows showing the full sample_data table
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // | 6       | row_6   |
+    // | 7       | row_7   |
+    // | 8       | row_8   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 1: Full table (no sampling)",
+        "SELECT * FROM sample_data",
+    )
+    .await?;
+
+    // Example 2: TABLESAMPLE with BERNOULLI sampling at 30% probability
+    // Shows: Random sampling where each row has 30% chance of being selected
+    // Expected: ~3 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 4       | row_4   |
+    // | 6       | row_6   |
+    // | 9       | row_9   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 2: TABLESAMPLE with percentage",
+        "SELECT * FROM sample_data TABLESAMPLE BERNOULLI(30 PERCENT)",
+    )
+    .await?;
+
+    // Example 3: TABLESAMPLE with fractional sampling (50% of data)
+    // Shows: Random sampling using decimal fraction instead of percentage
+    // Expected: ~5 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 3: TABLESAMPLE with fraction",
+        "SELECT * FROM sample_data TABLESAMPLE (0.5)",
+    )
+    .await?;
+
+    // Example 4: TABLESAMPLE with REPEATABLE seed for reproducible results
+    // Shows: Deterministic sampling using a fixed seed for consistent results
+    // Expected: Same rows selected each time due to fixed seed (42)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 5       | row_5   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 4: TABLESAMPLE with REPEATABLE seed",
+        "SELECT * FROM sample_data TABLESAMPLE (0.3) REPEATABLE(42)",
+    )
+    .await?;
+
+    // Example 5: TABLESAMPLE with exact row count limit
+    // Shows: Sampling by limiting to a specific number of rows (not 
probabilistic)
+    // Expected: Exactly 3 rows (first 3 rows from the dataset)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 5: TABLESAMPLE with row count",
+        "SELECT * FROM sample_data TABLESAMPLE (3 ROWS)",
+    )
+    .await?;
+
+    // Example 6: TABLESAMPLE combined with WHERE clause filtering
+    // Shows: How sampling works with other query operations like filtering
+    // Expected: 3 rows where column1 > 2 (from the 5-row sample)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 6: TABLESAMPLE with WHERE clause",
+        r#"SELECT * FROM sample_data 
+           TABLESAMPLE (5 ROWS) 
+           WHERE column1 > 2"#,
+    )
+    .await?;
+
+    // Example 7: JOIN between two independently sampled tables
+    // Shows: How sampling works in complex queries with multiple table 
references
+    // Expected: Rows where both sampled tables have matching column1 values
+    // Actual:
+    // +---------+---------+---------+---------+
+    // | column1 | column1 | column2 | column2 |
+    // +---------+---------+---------+---------+
+    // | 2       | 2       | row_2   | row_2   |
+    // | 8       | 8       | row_8   | row_8   |
+    // | 10      | 10      | row_10  | row_10  |
+    // +---------+---------+---------+---------+
+    run_example(
+        &ctx,
+        "Example 7: JOIN between two different TABLESAMPLE tables",
+        r#"SELECT t1.column1, t2.column1, t1.column2, t2.column2 
+           FROM sample_data t1 TABLESAMPLE (0.7) 
+           JOIN sample_data t2 TABLESAMPLE (0.7) 
+           ON t1.column1 = t2.column1"#,
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// Register sample data table for the examples
+fn register_sample_data(ctx: &SessionContext) -> Result<()> {
+    // Create sample_data table with 10 rows: column1 (1-10), column2 (row_1 
to row_10)
+    let column1: ArrayRef = 
Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));
+    let column2: ArrayRef = Arc::new(StringArray::from(
+        (1..=10)
+            .map(|i| format!("row_{i}"))
+            .collect::<Vec<String>>(),
+    ));
+    let batch =
+        RecordBatch::try_from_iter(vec![("column1", column1), ("column2", 
column2)])?;
+    ctx.register_batch("sample_data", batch)?;
+
+    Ok(())
+}
+
+async fn run_example(ctx: &SessionContext, title: &str, sql: &str) -> 
Result<()> {
+    println!("{title}:\n{sql}\n");
+    let df = ctx.sql(sql).await?;
+    println!("Logical Plan:\n{}\n", df.logical_plan().display_indent());
+    df.show().await?;
+    Ok(())
+}
+
+/// Hashable and comparable f64 for sampling bounds
+#[derive(Debug, Clone, Copy, PartialOrd)]
+struct Bound(f64);
+
+impl PartialEq for Bound {
+    fn eq(&self, other: &Self) -> bool {
+        (self.0 - other.0).abs() < f64::EPSILON
+    }
+}
+
+impl Eq for Bound {}
+
+impl Hash for Bound {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        // Hash the bits of the f64
+        self.0.to_bits().hash(state);
+    }
+}
+
+impl From<f64> for Bound {
+    fn from(value: f64) -> Self {
+        Self(value)
+    }
+}
+impl From<Bound> for f64 {
+    fn from(value: Bound) -> Self {
+        value.0
+    }
+}
+
+impl AsRef<f64> for Bound {
+    fn as_ref(&self) -> &f64 {
+        &self.0
+    }
+}
+
+#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
+struct TableSamplePlanNode {
+    inner_plan: LogicalPlan,
+
+    lower_bound: Bound,
+    upper_bound: Bound,
+    with_replacement: bool,
+    seed: u64,
+}
+
+impl TableSamplePlanNode {
+    pub fn new(
+        input: LogicalPlan,
+        fraction: f64,
+        with_replacement: Option<bool>,
+        seed: Option<u64>,
+    ) -> Self {
+        TableSamplePlanNode {
+            inner_plan: input,
+            lower_bound: Bound::from(0.0),
+            upper_bound: Bound::from(fraction),
+            with_replacement: with_replacement.unwrap_or(false),
+            seed: seed.unwrap_or_else(rand::random),
+        }
+    }
+
+    pub fn into_plan(self) -> LogicalPlan {
+        LogicalPlan::Extension(Extension {
+            node: Arc::new(self),
+        })
+    }
+}
+
+impl UserDefinedLogicalNodeCore for TableSamplePlanNode {
+    fn name(&self) -> &str {
+        "TableSample"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.inner_plan]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        self.inner_plan.schema()
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut Formatter) -> fmt::Result {
+        f.write_fmt(format_args!(
+            "Sample: {:?} {:?} {:?}",
+            self.lower_bound, self.upper_bound, self.seed
+        ))
+    }
+
+    fn with_exprs_and_inputs(
+        &self,
+        _exprs: Vec<Expr>,
+        inputs: Vec<LogicalPlan>,
+    ) -> Result<Self> {
+        let input = inputs
+            .first()
+            .ok_or(DataFusionError::Plan("Should have input".into()))?;
+        Ok(Self {
+            inner_plan: input.clone(),
+            lower_bound: self.lower_bound,
+            upper_bound: self.upper_bound,
+            with_replacement: self.with_replacement,
+            seed: self.seed,
+        })
+    }
+}
+
+/// Execution planner with `SampleExec` for `TableSamplePlanNode`
+struct TableSampleExtensionPlanner {}
+
+impl TableSampleExtensionPlanner {
+    fn build_execution_plan(
+        &self,
+        specific_node: &TableSamplePlanNode,
+        physical_input: &Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(SampleExec {
+            input: Arc::clone(physical_input),
+            lower_bound: 0.0,
+            upper_bound: specific_node.upper_bound.into(),
+            with_replacement: specific_node.with_replacement,
+            seed: specific_node.seed,
+            metrics: Default::default(),
+            cache: SampleExec::compute_properties(physical_input),
+        }))
+    }
+}
+
+#[async_trait]
+impl ExtensionPlanner for TableSampleExtensionPlanner {
+    /// Create a physical plan for an extension node
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(specific_node) = 
node.as_any().downcast_ref::<TableSamplePlanNode>() {
+            println!("Extension planner plan_extension: {:?}", 
&logical_inputs);
+            assert_eq!(logical_inputs.len(), 1, "Inconsistent number of 
inputs");
+            assert_eq!(physical_inputs.len(), 1, "Inconsistent number of 
inputs");
+
+            let exec_plan =
+                self.build_execution_plan(specific_node, &physical_inputs[0])?;
+            Ok(Some(exec_plan))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Query planner supporting a `TableSampleExtensionPlanner`
+#[derive(Debug)]
+struct TableSampleQueryPlanner {}
+
+#[async_trait]
+impl QueryPlanner for TableSampleQueryPlanner {
+    /// Given a `LogicalPlan` created from above, create an
+    /// `ExecutionPlan` suitable for execution
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Additional extension for table sample node
+        let physical_planner =
+            DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
+                TableSampleExtensionPlanner {},
+            )]);
+        // Delegate most work of physical planning to the default physical 
planner
+        physical_planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+
+/// Physical plan implementation
+trait Sampler: Send + Sync {
+    fn sample(&mut self, batch: &RecordBatch) -> Result<RecordBatch>;
+}
+
+struct BernoulliSampler {
+    lower_bound: f64,
+    upper_bound: f64,
+    rng: StdRng,
+}
+
+impl BernoulliSampler {
+    fn new(lower_bound: f64, upper_bound: f64, seed: u64) -> Self {
+        Self {
+            lower_bound,
+            upper_bound,
+            rng: StdRng::seed_from_u64(seed),
+        }
+    }
+}
+
+impl Sampler for BernoulliSampler {
+    fn sample(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
+        if self.upper_bound <= self.lower_bound {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+
+        let mut indices = Vec::new();
+
+        for i in 0..batch.num_rows() {
+            let rnd: f64 = self.rng.random();
+
+            if rnd >= self.lower_bound && rnd < self.upper_bound {
+                indices.push(i as u32);
+            }
+        }
+
+        if indices.is_empty() {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+        let indices = UInt32Array::from(indices);
+        compute::take_record_batch(batch, &indices).map_err(|e| e.into())
+    }
+}
+
+struct PoissonSampler {
+    ratio: f64,
+    poisson: Poisson<f64>,
+    rng: StdRng,
+}
+
+impl PoissonSampler {
+    fn try_new(ratio: f64, seed: u64) -> Result<Self> {
+        let poisson = Poisson::new(ratio).map_err(|e| 
plan_datafusion_err!("{}", e))?;
+        Ok(Self {
+            ratio,
+            poisson,
+            rng: StdRng::seed_from_u64(seed),
+        })
+    }
+}
+
+impl Sampler for PoissonSampler {
+    fn sample(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
+        if self.ratio <= 0.0 {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+
+        let mut indices = Vec::new();
+
+        for i in 0..batch.num_rows() {
+            let k = self.poisson.sample(&mut self.rng) as i32;
+            for _ in 0..k {
+                indices.push(i as u32);
+            }
+        }
+
+        if indices.is_empty() {
+            return Ok(RecordBatch::new_empty(batch.schema()));
+        }
+
+        let indices = UInt32Array::from(indices);
+        compute::take_record_batch(batch, &indices).map_err(|e| e.into())
+    }
+}
+
+/// SampleExec samples rows from its input based on a sampling method.
+/// This is used to implement SQL `SAMPLE` clause.
+#[derive(Debug, Clone)]
+pub struct SampleExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// The lower bound of the sampling ratio
+    lower_bound: f64,
+    /// The upper bound of the sampling ratio
+    upper_bound: f64,
+    /// Whether to sample with replacement
+    with_replacement: bool,
+    /// Random seed for reproducible sampling
+    seed: u64,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Properties equivalence properties, partitioning, etc.
+    cache: PlanProperties,
+}
+
+impl SampleExec {
+    /// Create a new SampleExec with a custom sampling method
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        lower_bound: f64,
+        upper_bound: f64,
+        with_replacement: bool,
+        seed: u64,
+    ) -> Result<Self> {
+        if lower_bound < 0.0 || upper_bound > 1.0 || lower_bound > upper_bound 
{
+            return internal_err!(
+                "Sampling bounds must be between 0.0 and 1.0, and lower_bound 
<= upper_bound, got [{}, {}]",
+                lower_bound, upper_bound
+            );
+        }
+
+        let cache = Self::compute_properties(&input);
+
+        Ok(Self {
+            input,
+            lower_bound,
+            upper_bound,
+            with_replacement,
+            seed,
+            metrics: ExecutionPlanMetricsSet::new(),
+            cache,
+        })
+    }
+
+    fn create_sampler(&self, partition: usize) -> Result<Box<dyn Sampler>> {
+        if self.with_replacement {
+            Ok(Box::new(PoissonSampler::try_new(
+                self.upper_bound - self.lower_bound,
+                self.seed + partition as u64,
+            )?))
+        } else {
+            Ok(Box::new(BernoulliSampler::new(
+                self.lower_bound,
+                self.upper_bound,
+                self.seed + partition as u64,
+            )))
+        }
+    }
+
+    /// Whether to sample with replacement
+    #[allow(dead_code)]

Review Comment:
   Is there any reason to keep this dead code?



##########
datafusion/expr/src/planner.rs:
##########
@@ -324,6 +334,85 @@ pub enum PlannerResult<T> {
     Original(T),
 }
 
+/// Result of planning a relation with [`RelationPlanner`]
+#[cfg(feature = "sql")]
+#[derive(Debug, Clone)]
+pub struct PlannedRelation {
+    /// The logical plan for the relation
+    pub plan: LogicalPlan,
+    /// Optional table alias for the relation
+    pub alias: Option<TableAlias>,
+}
+
+#[cfg(feature = "sql")]
+impl PlannedRelation {
+    /// Create a new `PlannedRelation` with the given plan and alias
+    pub fn new(plan: LogicalPlan, alias: Option<TableAlias>) -> Self {
+        Self { plan, alias }
+    }
+}
+
+/// Result of attempting to plan a relation with extension planners
+#[cfg(feature = "sql")]
+#[derive(Debug)]
+pub enum RelationPlanning {
+    /// The relation was successfully planned by an extension planner
+    Planned(PlannedRelation),
+    /// No extension planner handled the relation, return it for default 
processing
+    Original(TableFactor),
+}
+
+/// Customize planning SQL table factors to [`LogicalPlan`]s.
+#[cfg(feature = "sql")]
+pub trait RelationPlanner: Debug + Send + Sync {
+    /// Plan a table factor into a [`LogicalPlan`].
+    ///
+    /// Returning `Ok(RelationPlanning::Planned(planned_relation))` 
short-circuits further planning and uses the
+    /// provided plan. Returning `Ok(RelationPlanning::Original(relation))` 
allows the next registered planner,
+    /// or DataFusion's default logic, to handle the relation.
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning>;
+}
+
+/// Provides utilities for relation planners to interact with DataFusion's SQL
+/// planner.
+///
+/// This trait provides SQL planning utilities specific to relation planning,
+/// such as converting SQL expressions to logical expressions and normalizing
+/// identifiers. It uses composition to provide access to session context via

Review Comment:
   Could you be clearer about what "composition" means in this context? I am 
not sure what it refers to here.



##########
datafusion-examples/examples/relation_planner/table_sample.rs:
##########
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! SQL TABLESAMPLE clause support.
+//!
+//! TABLESAMPLE allows sampling a fraction or number of rows from a table:
+//!   - `SELECT * FROM table TABLESAMPLE BERNOULLI(10)` - 10% sample
+//!   - `SELECT * FROM table TABLESAMPLE (100 ROWS)` - 100 rows
+//!   - `SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)` - 
Reproducible
+
+use std::{
+    any::Any,
+    fmt::{self, Debug, Formatter},
+    hash::{Hash, Hasher},
+    ops::{Add, Div, Mul, Sub},
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
+    compute,
+};
+use arrow_schema::SchemaRef;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand_distr::{Distribution, Poisson};
+use tonic::async_trait;
+
+use datafusion::{
+    execution::{
+        context::QueryPlanner, RecordBatchStream, SendableRecordBatchStream,
+        SessionState, SessionStateBuilder, TaskContext,
+    },
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, 
RecordOutput},
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    },
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
+    prelude::*,
+};
+use datafusion_common::{
+    internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchemaRef,
+    DataFusionError, Result, Statistics,
+};
+use datafusion_expr::{
+    logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
+};
+use datafusion_sql::sqlparser::ast::{
+    self, TableFactor, TableSampleMethod, TableSampleUnit,
+};
+
+/// This example demonstrates using custom relation planners to implement
+/// SQL TABLESAMPLE clause support.
+pub async fn table_sample() -> Result<()> {
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_query_planner(Arc::new(TableSampleQueryPlanner {}))
+        .build();
+
+    let ctx = SessionContext::new_with_state(state.clone());
+
+    // Register sample data table
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
+
+    println!("Custom Relation Planner: TABLESAMPLE Support");
+    println!("============================================\n");
+    println!("Note: This shows logical planning for TABLESAMPLE.");
+    println!("Physical execution requires additional implementation.\n");
+
+    // Example 1: Full table without any sampling (baseline)
+    // Shows: Complete dataset with all 10 rows (1-10 with row_1 to row_10)
+    // Expected: 10 rows showing the full sample_data table
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // | 6       | row_6   |
+    // | 7       | row_7   |
+    // | 8       | row_8   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 1: Full table (no sampling)",
+        "SELECT * FROM sample_data",
+    )
+    .await?;
+
+    // Example 2: TABLESAMPLE with BERNOULLI sampling at 30% probability
+    // Shows: Random sampling where each row has 30% chance of being selected
+    // Expected: ~3 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 4       | row_4   |
+    // | 6       | row_6   |
+    // | 9       | row_9   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 2: TABLESAMPLE with percentage",
+        "SELECT * FROM sample_data TABLESAMPLE BERNOULLI(30 PERCENT)",
+    )
+    .await?;
+
+    // Example 3: TABLESAMPLE with fractional sampling (50% of data)
+    // Shows: Random sampling using decimal fraction instead of percentage
+    // Expected: ~5 rows (varies due to randomness) from the 10-row dataset
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 3: TABLESAMPLE with fraction",
+        "SELECT * FROM sample_data TABLESAMPLE (0.5)",
+    )
+    .await?;
+
+    // Example 4: TABLESAMPLE with REPEATABLE seed for reproducible results
+    // Shows: Deterministic sampling using a fixed seed for consistent results
+    // Expected: Same rows selected each time due to fixed seed (42)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 5       | row_5   |
+    // | 9       | row_9   |
+    // | 10      | row_10  |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 4: TABLESAMPLE with REPEATABLE seed",
+        "SELECT * FROM sample_data TABLESAMPLE (0.3) REPEATABLE(42)",
+    )
+    .await?;
+
+    // Example 5: TABLESAMPLE with exact row count limit
+    // Shows: Sampling by limiting to a specific number of rows (not 
probabilistic)
+    // Expected: Exactly 3 rows (first 3 rows from the dataset)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 1       | row_1   |
+    // | 2       | row_2   |
+    // | 3       | row_3   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 5: TABLESAMPLE with row count",
+        "SELECT * FROM sample_data TABLESAMPLE (3 ROWS)",
+    )
+    .await?;
+
+    // Example 6: TABLESAMPLE combined with WHERE clause filtering
+    // Shows: How sampling works with other query operations like filtering
+    // Expected: 3 rows where column1 > 2 (from the 5-row sample)
+    // Actual:
+    // +---------+---------+
+    // | column1 | column2 |
+    // +---------+---------+
+    // | 3       | row_3   |
+    // | 4       | row_4   |
+    // | 5       | row_5   |
+    // +---------+---------+
+    run_example(
+        &ctx,
+        "Example 6: TABLESAMPLE with WHERE clause",
+        r#"SELECT * FROM sample_data 
+           TABLESAMPLE (5 ROWS) 
+           WHERE column1 > 2"#,
+    )
+    .await?;
+
+    // Example 7: JOIN between two independently sampled tables
+    // Shows: How sampling works in complex queries with multiple table 
references
+    // Expected: Rows where both sampled tables have matching column1 values
+    // Actual:
+    // +---------+---------+---------+---------+
+    // | column1 | column1 | column2 | column2 |
+    // +---------+---------+---------+---------+
+    // | 2       | 2       | row_2   | row_2   |
+    // | 8       | 8       | row_8   | row_8   |
+    // | 10      | 10      | row_10  | row_10  |
+    // +---------+---------+---------+---------+
+    run_example(
+        &ctx,
+        "Example 7: JOIN between two different TABLESAMPLE tables",
+        r#"SELECT t1.column1, t2.column1, t1.column2, t2.column2 
+           FROM sample_data t1 TABLESAMPLE (0.7) 
+           JOIN sample_data t2 TABLESAMPLE (0.7) 
+           ON t1.column1 = t2.column1"#,
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// Register sample data table for the examples
+fn register_sample_data(ctx: &SessionContext) -> Result<()> {
+    // Create sample_data table with 10 rows: column1 (1-10), column2 (row_1 
to row_10)
+    let column1: ArrayRef = 
Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));
+    let column2: ArrayRef = Arc::new(StringArray::from(
+        (1..=10)
+            .map(|i| format!("row_{i}"))
+            .collect::<Vec<String>>(),
+    ));
+    let batch =
+        RecordBatch::try_from_iter(vec![("column1", column1), ("column2", 
column2)])?;
+    ctx.register_batch("sample_data", batch)?;
+
+    Ok(())
+}
+
+async fn run_example(ctx: &SessionContext, title: &str, sql: &str) -> 
Result<()> {
+    println!("{title}:\n{sql}\n");
+    let df = ctx.sql(sql).await?;
+    println!("Logical Plan:\n{}\n", df.logical_plan().display_indent());
+    df.show().await?;
+    Ok(())
+}
+
+/// Hashable and comparable f64 for sampling bounds
+#[derive(Debug, Clone, Copy, PartialOrd)]
+struct Bound(f64);
+
+impl PartialEq for Bound {
+    fn eq(&self, other: &Self) -> bool {
+        (self.0 - other.0).abs() < f64::EPSILON
+    }
+}
+
+impl Eq for Bound {}
+
+impl Hash for Bound {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        // Hash the bits of the f64
+        self.0.to_bits().hash(state);
+    }
+}
+
+impl From<f64> for Bound {
+    fn from(value: f64) -> Self {
+        Self(value)
+    }
+}
+impl From<Bound> for f64 {
+    fn from(value: Bound) -> Self {
+        value.0
+    }
+}
+
+impl AsRef<f64> for Bound {
+    fn as_ref(&self) -> &f64 {
+        &self.0
+    }
+}
+
+#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
+struct TableSamplePlanNode {
+    inner_plan: LogicalPlan,
+
+    lower_bound: Bound,
+    upper_bound: Bound,
+    with_replacement: bool,
+    seed: u64,
+}
+
+impl TableSamplePlanNode {
+    pub fn new(
+        input: LogicalPlan,
+        fraction: f64,
+        with_replacement: Option<bool>,
+        seed: Option<u64>,
+    ) -> Self {
+        TableSamplePlanNode {
+            inner_plan: input,
+            lower_bound: Bound::from(0.0),
+            upper_bound: Bound::from(fraction),
+            with_replacement: with_replacement.unwrap_or(false),
+            seed: seed.unwrap_or_else(rand::random),
+        }
+    }
+
+    pub fn into_plan(self) -> LogicalPlan {
+        LogicalPlan::Extension(Extension {
+            node: Arc::new(self),
+        })
+    }
+}
+
+impl UserDefinedLogicalNodeCore for TableSamplePlanNode {
+    fn name(&self) -> &str {
+        "TableSample"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.inner_plan]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        self.inner_plan.schema()
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut Formatter) -> fmt::Result {
+        f.write_fmt(format_args!(
+            "Sample: {:?} {:?} {:?}",
+            self.lower_bound, self.upper_bound, self.seed
+        ))
+    }
+
+    fn with_exprs_and_inputs(
+        &self,
+        _exprs: Vec<Expr>,
+        inputs: Vec<LogicalPlan>,
+    ) -> Result<Self> {
+        let input = inputs
+            .first()
+            .ok_or(DataFusionError::Plan("Should have input".into()))?;
+        Ok(Self {
+            inner_plan: input.clone(),
+            lower_bound: self.lower_bound,
+            upper_bound: self.upper_bound,
+            with_replacement: self.with_replacement,
+            seed: self.seed,
+        })
+    }
+}
+
+/// Execution planner with `SampleExec` for `TableSamplePlanNode`
+struct TableSampleExtensionPlanner {}
+
+impl TableSampleExtensionPlanner {
+    fn build_execution_plan(
+        &self,
+        specific_node: &TableSamplePlanNode,
+        physical_input: &Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(SampleExec {
+            input: Arc::clone(physical_input),
+            lower_bound: 0.0,
+            upper_bound: specific_node.upper_bound.into(),
+            with_replacement: specific_node.with_replacement,
+            seed: specific_node.seed,
+            metrics: Default::default(),
+            cache: SampleExec::compute_properties(physical_input),
+        }))
+    }
+}
+
+#[async_trait]
+impl ExtensionPlanner for TableSampleExtensionPlanner {
+    /// Create a physical plan for an extension node
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(specific_node) = 
node.as_any().downcast_ref::<TableSamplePlanNode>() {
+            println!("Extension planner plan_extension: {:?}", 
&logical_inputs);
+            assert_eq!(logical_inputs.len(), 1, "Inconsistent number of 
inputs");
+            assert_eq!(physical_inputs.len(), 1, "Inconsistent number of 
inputs");
+
+            let exec_plan =
+                self.build_execution_plan(specific_node, &physical_inputs[0])?;
+            Ok(Some(exec_plan))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Query planner supporting a `TableSampleExtensionPlanner`
+#[derive(Debug)]
+struct TableSampleQueryPlanner {}

Review Comment:
   This is not caused by this PR, but the amount of ceremony required for a 
custom logical node is somewhat embarrassing.
   
   It seems like we could just register extension planners and skip this 
QueryPlanner stage 🤔 
   
   (no change required / suggested for this PR)



##########
datafusion-examples/examples/relation_planner/pivot_unpivot.rs:
##########
@@ -0,0 +1,543 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates using custom relation planners to implement
+//! PIVOT and UNPIVOT operations for reshaping data.
+//!
+//! PIVOT transforms rows into columns (wide format), while UNPIVOT does the
+//! reverse, transforming columns into rows (long format). This example shows
+//! how to use custom planners to implement these SQL clauses by rewriting them
+//! into equivalent standard SQL operations:
+//!
+//! - PIVOT is rewritten to GROUP BY with CASE expressions
+//! - UNPIVOT is rewritten to UNION ALL of projections
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, StringArray};
+use arrow::record_batch::RecordBatch;
+use datafusion::prelude::*;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{
+    case, lit,
+    logical_plan::builder::LogicalPlanBuilder,
+    planner::{
+        PlannedRelation, RelationPlanner, RelationPlannerContext, 
RelationPlanning,
+    },
+    Expr,
+};
+use datafusion_sql::sqlparser::ast::TableFactor;
+
+/// This example demonstrates using custom relation planners to implement
+/// PIVOT and UNPIVOT operations for reshaping data.
+pub async fn pivot_unpivot() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Register sample data tables
+    register_sample_data(&ctx)?;
+
+    // Register custom planner
+    ctx.register_relation_planner(Arc::new(PivotUnpivotPlanner))?;
+
+    println!("Custom Relation Planner: PIVOT and UNPIVOT Operations");
+    println!("======================================================\n");
+
+    // Example 1: Basic PIVOT to transform monthly sales data from rows to 
columns
+    // Shows: How to pivot sales data so each quarter becomes a column
+    // The PIVOT is rewritten to: SELECT region, SUM(CASE WHEN quarter = 'Q1' 
THEN amount END) as Q1,
+    //                             SUM(CASE WHEN quarter = 'Q2' THEN amount 
END) as Q2
+    //                             FROM quarterly_sales GROUP BY region
+    // Expected Output:
+    // +--------+------+------+
+    // | region | Q1   | Q2   |
+    // +--------+------+------+
+    // | North  | 1000 | 1500 |
+    // | South  | 1200 | 1300 |
+    // +--------+------+------+
+    run_example(
+        &ctx,
+        "Example 1: Basic PIVOT - Transform quarters from rows to columns",
+        r#"SELECT * FROM quarterly_sales
+           PIVOT (
+             SUM(amount)
+             FOR quarter IN ('Q1', 'Q2')
+           ) AS pivoted"#,
+    )
+    .await?;
+
+    // Example 2: PIVOT with multiple aggregate functions
+    // Shows: How to apply multiple aggregations (SUM and AVG) during pivot
+    // Expected: Logical plan showing MiniPivot with both SUM and AVG 
aggregates
+    // Actual (Logical Plan):
+    // Projection: pivoted.region, pivoted.Q1, pivoted.Q2
+    //   SubqueryAlias: pivoted
+    //     MiniPivot aggregate=[SUM(amount), AVG(amount)] 
value_column=[quarter] values=["Q1", "Q2"]
+    //       Values: (Utf8("North"), Utf8("Q1"), Int64(1000)), (Utf8("North"), 
Utf8("Q2"), Int64(1500)), ...
+    run_example(
+        &ctx,
+        "Example 2: PIVOT with multiple aggregates (SUM and AVG)",
+        r#"SELECT * FROM quarterly_sales
+           PIVOT (
+             SUM(amount), AVG(amount)
+             FOR quarter IN ('Q1', 'Q2')
+           ) AS pivoted"#,
+    )
+    .await?;
+
+    // Example 3: PIVOT with additional grouping columns
+    // Shows: How pivot works when there are multiple non-pivot columns
+    // The region and product both appear in GROUP BY
+    // Expected Output:
+    // +--------+-----------+------+------+
+    // | region | product   | Q1   | Q2   |
+    // +--------+-----------+------+------+
+    // | North  | ProductA  | 500  |      |
+    // | North  | ProductB  | 500  |      |
+    // | South  | ProductA  |      | 650  |
+    // +--------+-----------+------+------+
+    run_example(
+        &ctx,
+        "Example 3: PIVOT with multiple grouping columns",
+        r#"SELECT * FROM product_sales
+           PIVOT (
+             SUM(amount)
+             FOR quarter IN ('Q1', 'Q2')
+           ) AS pivoted"#,
+    )
+    .await?;
+
+    // Example 4: Basic UNPIVOT to transform columns back into rows

Review Comment:
   Per your other comments, I am not sure how important it is to show all these 
corner cases working: I don't think they add much to the example, but they do 
help give confidence to anyone who might copy/paste this code into their 
system.



##########
datafusion/core/tests/user_defined/relation_planner.rs:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the RelationPlanner extension point
+
+use std::sync::Arc;
+
+use arrow::array::{Int64Array, RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::catalog::memory::MemTable;
+use datafusion::common::test_util::batches_to_string;
+use datafusion::prelude::*;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
+use datafusion_expr::planner::{
+    PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
+};
+use datafusion_expr::Expr;
+use datafusion_sql::sqlparser::ast::TableFactor;
+use insta::assert_snapshot;
+
+// ============================================================================
+// Test Planners - Example Implementations
+// ============================================================================
+
+// The planners in this section are deliberately minimal, static examples used

Review Comment:
   It might be worth linking to the examples (under 
`datafusion-examples/examples/relation_planner/`) as well.



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -139,6 +139,8 @@ pub struct SessionState {
     analyzer: Analyzer,
     /// Provides support for customizing the SQL planner, e.g. to add support 
for custom operators like `->>` or `?`
     expr_planners: Vec<Arc<dyn ExprPlanner>>,
+    #[cfg(feature = "sql")]

Review Comment:
   I like how this follows the same model as expr planners and type planners
   
   I do think it might be worth considering (as a follow-on PR) moving all the 
SQL-related extensions into their own struct (as we now have `type_planners`, 
`expr_planners` and `relation_planners`).



##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -24,19 +24,119 @@ use datafusion_common::{
     not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, 
TableReference,
 };
 use datafusion_expr::builder::subquery_alias;
+use datafusion_expr::planner::{
+    PlannedRelation, RelationPlannerContext, RelationPlanning,
+};
 use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
 use datafusion_expr::{Subquery, SubqueryAlias};
 use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};
 
 mod join;
 
+struct SqlToRelRelationContext<'a, 'b, S: ContextProvider> {

Review Comment:
   > I’ve been thinking about this. In addition to providing a code example, it 
would be valuable to add documentation for this extension if you want broader 
adoption.
   
   I agree this is a very nice framework and documenting it more is a good idea 
(I'll leave notes in the review)



##########
datafusion/core/tests/user_defined/relation_planner.rs:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the RelationPlanner extension point
+
+use std::sync::Arc;
+
+use arrow::array::{Int64Array, RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::catalog::memory::MemTable;
+use datafusion::common::test_util::batches_to_string;
+use datafusion::prelude::*;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
+use datafusion_expr::planner::{
+    PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
+};
+use datafusion_expr::Expr;
+use datafusion_sql::sqlparser::ast::TableFactor;
+use insta::assert_snapshot;
+
+// ============================================================================
+// Test Planners - Example Implementations
+// ============================================================================
+
+// The planners in this section are deliberately minimal, static examples used
+// only for tests. In real applications a `RelationPlanner` would typically
+// construct richer logical plans tailored to external systems or custom
+// semantics rather than hard-coded in-memory tables.
+
+/// Helper to build simple static values-backed virtual tables used by the
+/// example planners below.
+fn plan_static_values_table(
+    relation: TableFactor,
+    table_name: &str,
+    column_name: &str,
+    values: Vec<ScalarValue>,
+) -> Result<RelationPlanning> {
+    match relation {
+        TableFactor::Table { name, alias, .. }
+            if name.to_string().eq_ignore_ascii_case(table_name) =>
+        {
+            let rows = values
+                .into_iter()
+                .map(|v| vec![Expr::Literal(v, None)])
+                .collect::<Vec<_>>();
+
+            let plan = LogicalPlanBuilder::values(rows)?
+                .project(vec![col("column1").alias(column_name)])?
+                .build()?;
+
+            Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
+        }
+        other => Ok(RelationPlanning::Original(other)),
+    }
+}
+
+/// Example planner that provides a virtual `numbers` table with values
+/// 1, 2, 3.
+#[derive(Debug)]
+struct NumbersPlanner;
+
+impl RelationPlanner for NumbersPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        plan_static_values_table(
+            relation,
+            "numbers",
+            "number",
+            vec![
+                ScalarValue::Int64(Some(1)),
+                ScalarValue::Int64(Some(2)),
+                ScalarValue::Int64(Some(3)),
+            ],
+        )
+    }
+}
+
+/// Example planner that provides a virtual `colors` table with three string
+/// values: `red`, `green`, `blue`.
+#[derive(Debug)]
+struct ColorsPlanner;
+
+impl RelationPlanner for ColorsPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        plan_static_values_table(
+            relation,
+            "colors",
+            "color",
+            vec![
+                ScalarValue::Utf8(Some("red".into())),
+                ScalarValue::Utf8(Some("green".into())),
+                ScalarValue::Utf8(Some("blue".into())),
+            ],
+        )
+    }
+}
+
+/// Alternative implementation of `numbers` (returns 100, 200) used to
+/// demonstrate planner precedence (last registered planner wins).
+#[derive(Debug)]
+struct AlternativeNumbersPlanner;
+
+impl RelationPlanner for AlternativeNumbersPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        plan_static_values_table(
+            relation,
+            "numbers",
+            "number",
+            vec![ScalarValue::Int64(Some(100)), ScalarValue::Int64(Some(200))],
+        )
+    }
+}
+
+/// Example planner that intercepts nested joins and samples both sides (limit 2)
+/// before joining, demonstrating recursive planning with `context.plan()`.
+#[derive(Debug)]
+struct SamplingJoinPlanner;
+
+impl RelationPlanner for SamplingJoinPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        match relation {
+            TableFactor::NestedJoin {
+                table_with_joins,
+                alias,
+                ..
+            } if table_with_joins.joins.len() == 1 => {
+                // Use context.plan() to recursively plan both sides
+                // This ensures other planners (like NumbersPlanner) can 
handle them
+                let left = context.plan(table_with_joins.relation.clone())?;
+                let right = 
context.plan(table_with_joins.joins[0].relation.clone())?;
+
+                // Sample each table to 2 rows
+                let left_sampled =
+                    LogicalPlanBuilder::from(left).limit(0, Some(2))?.build()?;
+
+                let right_sampled =
+                    LogicalPlanBuilder::from(right).limit(0, 
Some(2))?.build()?;
+
+                // Cross join: 2 rows × 2 rows = 4 rows (instead of 3×3=9 
without sampling)
+                let plan = LogicalPlanBuilder::from(left_sampled)
+                    .cross_join(right_sampled)?
+                    .build()?;
+
+                Ok(RelationPlanning::Planned(PlannedRelation::new(plan, 
alias)))
+            }
+            other => Ok(RelationPlanning::Original(other)),
+        }
+    }
+}
+
+/// Example planner that never handles any relation and always delegates by
+/// returning `RelationPlanning::Original`.
+#[derive(Debug)]
+struct PassThroughPlanner;
+
+impl RelationPlanner for PassThroughPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        // Never handles anything - always delegates
+        Ok(RelationPlanning::Original(relation))
+    }
+}
+
+/// Example planner that shows how planners can block specific constructs and
+/// surface custom error messages by rejecting `UNNEST` relations (here framed
+/// as a mock premium feature check).
+#[derive(Debug)]
+struct PremiumFeaturePlanner;
+
+impl RelationPlanner for PremiumFeaturePlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        _context: &mut dyn RelationPlannerContext,
+    ) -> Result<RelationPlanning> {
+        match relation {
+            TableFactor::UNNEST { .. } => 
Err(datafusion_common::DataFusionError::Plan(
+                "UNNEST is a premium feature! Please upgrade to DataFusion 
Pro™ \
+                     to unlock advanced array operations."
+                    .to_string(),
+            )),
+            other => Ok(RelationPlanning::Original(other)),
+        }
+    }
+}
+
+// ============================================================================
+// Test Helpers - SQL Execution
+// ============================================================================
+
+/// Execute SQL and return results with better error messages.
+async fn execute_sql(ctx: &SessionContext, sql: &str) -> 
Result<Vec<RecordBatch>> {
+    let df = ctx.sql(sql).await?;
+    df.collect().await
+}
+
+/// Execute SQL and convert to string format for snapshot comparison.
+async fn execute_sql_to_string(ctx: &SessionContext, sql: &str) -> String {
+    let batches = execute_sql(ctx, sql)
+        .await
+        .expect("SQL execution should succeed");
+    batches_to_string(&batches)
+}
+
+// ============================================================================
+// Test Helpers - Context Builders
+// ============================================================================
+
+/// Create a SessionContext with a catalog table containing Int64 and Utf8 columns.
+///
+/// Creates a table with the specified name and sample data for fallback/integration tests.
+fn create_context_with_catalog_table(
+    table_name: &str,
+    id_values: Vec<i64>,
+    name_values: Vec<&str>,
+) -> SessionContext {
+    let ctx = SessionContext::new();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("name", DataType::Utf8, false),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int64Array::from(id_values)),
+            Arc::new(StringArray::from(name_values)),
+        ],
+    )
+    .unwrap();
+
+    let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
+    ctx.register_table(table_name, Arc::new(table)).unwrap();
+
+    ctx
+}
+
+/// Create a SessionContext with a simple single-column Int64 table.
+///
+/// Useful for basic tests that need a real catalog table.
+fn create_context_with_simple_table(
+    table_name: &str,
+    values: Vec<i64>,
+) -> SessionContext {
+    let ctx = SessionContext::new();
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "value",
+        DataType::Int64,
+        true,
+    )]));
+
+    let batch =
+        RecordBatch::try_new(schema.clone(), 
vec![Arc::new(Int64Array::from(values))])
+            .unwrap();
+
+    let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
+    ctx.register_table(table_name, Arc::new(table)).unwrap();
+
+    ctx
+}
+
+// ============================================================================
+// TESTS: Ordered from Basic to Complex
+// ============================================================================
+
+/// Comprehensive test suite for RelationPlanner extension point.
+/// Tests are ordered from simplest smoke test to most complex scenarios.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Small extension trait to make test setup read fluently.
+    trait TestSessionExt {
+        fn with_planner<P: RelationPlanner + 'static>(self, planner: P) -> 
Self;
+    }
+
+    impl TestSessionExt for SessionContext {
+        fn with_planner<P: RelationPlanner + 'static>(self, planner: P) -> 
Self {
+            self.register_relation_planner(Arc::new(planner)).unwrap();
+            self
+        }
+    }
+
+    /// Session context with only the `NumbersPlanner` registered.
+    fn ctx_with_numbers() -> SessionContext {
+        SessionContext::new().with_planner(NumbersPlanner)
+    }
+
+    /// Session context with virtual tables (`numbers`, `colors`) and the
+    /// `SamplingJoinPlanner` registered for nested joins.
+    fn ctx_with_virtual_tables_and_sampling() -> SessionContext {
+        SessionContext::new()
+            .with_planner(NumbersPlanner)
+            .with_planner(ColorsPlanner)
+            .with_planner(SamplingJoinPlanner)
+    }
+
+    // Basic smoke test: virtual table can be queried like a regular table.
+    #[tokio::test]
+    async fn virtual_table_basic_select() {
+        let ctx = ctx_with_numbers();
+
+        let result = execute_sql_to_string(&ctx, "SELECT * FROM 
numbers").await;
+
+        assert_snapshot!(result, @r"
+        +--------+
+        | number |
+        +--------+
+        | 1      |
+        | 2      |
+        | 3      |
+        +--------+");
+    }
+
+    // Virtual table supports standard SQL operations (projection, filter, aggregation).

Review Comment:
   🤯 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to