alamb commented on code in PR #21398: URL: https://github.com/apache/datafusion/pull/21398#discussion_r3060564753
########## docs/source/library-user-guide/custom-table-providers.md: ########## @@ -19,568 +19,938 @@ # Custom Table Provider -Like other areas of DataFusion, you extend DataFusion's functionality by implementing a trait. The [`TableProvider`] and associated traits allow you to implement a custom table provider, i.e. use DataFusion's other functionality with your custom data source. - -This section describes how to create a [`TableProvider`] and how to configure DataFusion to use it for reading. +One of DataFusion's greatest strengths is its extensibility. If your data lives +in a custom format, behind an API, or in a system that DataFusion does not +natively support, you can teach DataFusion to read it by implementing a +**custom table provider**. This post walks through the three layers you need to +understand to design a table provider and where planning and execution work should happen. For details on how table constraints such as primary keys or unique constraints are handled, see [Table Constraint Enforcement](table-constraints.md). -## Table Provider and Scan - -The [`TableProvider::scan`] method reads data from the table and is likely the most important. It returns an [`ExecutionPlan`] that DataFusion will use to read the actual data during execution of the query. The [`TableProvider::insert_into`] method is used to `INSERT` data into the table. - -### Scan - -As mentioned, [`TableProvider::scan`] returns an execution plan, and in particular a `Result<Arc<dyn ExecutionPlan>>`. The core of this is returning something that can be dynamically dispatched to an `ExecutionPlan`. And as per the general DataFusion idea, we'll need to implement it. 
- -[`tableprovider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html -[`tableprovider::scan`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#tymethod.scan -[`tableprovider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#tymethod.insert_into -[`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html - -#### Execution Plan - -The `ExecutionPlan` trait at its core is a way to get a stream of batches. The aptly-named `execute` method returns a `Result<SendableRecordBatchStream>`, which should be a stream of `RecordBatch`es that can be sent across threads, and has a schema that matches the data to be contained in those batches. - -There are many different types of `SendableRecordBatchStream` implemented in DataFusion -- you can use a pre existing one, such as `MemoryStream` (if your `RecordBatch`es are all in memory) or implement your own custom logic, depending on your usecase. 
- -Looking at the full example below: - -```rust -use std::any::Any; -use std::sync::{Arc, Mutex}; -use std::collections::{BTreeMap, HashMap}; -use datafusion::common::Result; -use datafusion::common::tree_node::TreeNodeRecursion; -use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::physical_plan::expressions::PhysicalSortExpr; -use datafusion::physical_plan::{ - ExecutionPlan, SendableRecordBatchStream, DisplayAs, DisplayFormatType, - Statistics, PlanProperties, PhysicalExpr -}; -use datafusion::execution::context::TaskContext; -use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; -use datafusion::physical_plan::memory::MemoryStream; -use datafusion::arrow::record_batch::RecordBatch; - -/// A User, with an id and a bank account -#[derive(Clone, Debug)] -struct User { - id: u8, - bank_account: u64, -} - -/// A custom datasource, used to represent a datastore with a single index -#[derive(Clone, Debug)] -pub struct CustomDataSource { - inner: Arc<Mutex<CustomDataSourceInner>>, -} - -#[derive(Debug)] -struct CustomDataSourceInner { - data: HashMap<u8, User>, - bank_account_index: BTreeMap<u64, u8>, -} +The majority of this content was originally posted in the blog +[Writing Custom Table Providers in Apache DataFusion](https://datafusion.apache.org/blog/2026/03/31/writing-table-providers/). + +## The Three Layers + +When DataFusion executes a query against a table, three abstractions collaborate +to produce results: + +1. **[TableProvider]** -- Describes the table (schema, capabilities) and + produces an execution plan when queried. This is part of the **Logical Plan**. +2. **[ExecutionPlan]** -- Describes _how_ to compute the result: partitioning, + ordering, and child plan relationships. This is part of the **Physical Plan**. +3. **[SendableRecordBatchStream]** -- The async stream that _actually does the + work_, yielding `RecordBatch`es one at a time. 
+ +Think of these as a funnel: `TableProvider::scan()` is called once during +planning to create an `ExecutionPlan`, then `ExecutionPlan::execute()` is called +once per partition to create a stream, and those streams are where rows are +actually produced during execution. + +[tableprovider]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html +[executionplan]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html +[sendablerecordbatchstream]: https://docs.rs/datafusion/latest/datafusion/execution/type.SendableRecordBatchStream.html +[memtable]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html +[streamtable]: https://docs.rs/datafusion/latest/datafusion/datasource/stream/struct.StreamTable.html +[listingtable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html +[viewtable]: https://docs.rs/datafusion/latest/datafusion/datasource/view/struct.ViewTable.html +[planproperties]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.PlanProperties.html +[streamingtableexec]: https://docs.rs/datafusion/latest/datafusion/physical_plan/streaming/struct.StreamingTableExec.html +[datasourceexec]: https://docs.rs/datafusion/latest/datafusion/datasource/source/struct.DataSourceExec.html + +## Background: Logical and Physical Planning + +Before diving into the three layers, it helps to understand how DataFusion +processes a query. 
There are several phases between a SQL string (or DataFrame +call) and streaming results: + +```text +SQL / DataFrame API + → Logical Plan (abstract: what to compute) + → Logical Optimization (rewrite rules that preserve semantics) + → Physical Plan (concrete: how to compute it) + → Physical Optimization (hardware- and data-aware rewrites) + → Execution (streaming RecordBatches) +``` -#[derive(Debug)] -struct CustomExec { - db: CustomDataSource, - projected_schema: SchemaRef, -} +### Logical Planning + +A **logical plan** describes _what_ the query computes without specifying _how_. +It is a tree of relational operators -- `TableScan`, `Filter`, `Projection`, +`Aggregate`, `Join`, `Sort`, `Limit`, and so on. The logical optimizer rewrites +this tree to reduce work while preserving the query's meaning. Some logical +optimizations include: + +- **Predicate pushdown** -- moves filters as close to the data source as + possible, so fewer rows flow through the rest of the plan. +- **Projection pruning** -- eliminates columns that are never referenced + downstream, reducing memory and I/O. +- **Expression simplification** -- rewrites expressions like `1 = 1` or + `x AND true` into simpler forms. +- **Subquery decorrelation** -- converts correlated `IN` / `EXISTS` subqueries + into more efficient semi-joins. +- **Limit pushdown** -- pushes `LIMIT` earlier in the plan so operators + produce less data. + +### Physical Planning + +The **physical planner** converts the optimized logical plan into an +`ExecutionPlan` tree -- the concrete plan that will actually run. This is where +decisions like "use a hash join vs. a sort-merge join" or "how many partitions +to scan" are made. The physical optimizer then refines this tree further with rewrites such as: + +- **Distribution enforcement** -- inserts `RepartitionExec` nodes so that data + is partitioned correctly for joins and aggregations. 
+- **Sort enforcement** -- inserts `SortExec` nodes where ordering is required, + and removes them where the data is already sorted. +- **Join selection** -- picks the most efficient join strategy based on + statistics and table sizes. +- **Aggregate optimization** -- combines partial and final aggregation stages, + and can use exact statistics to skip scanning entirely. + +### Why This Matters for Table Providers + +Your `TableProvider` sits at the boundary between logical and physical planning. +During logical optimization, DataFusion determines which filters and projections +_could_ be pushed down to the source. When `scan()` is called during physical +planning, those hints are passed to you. By implementing capabilities like +`supports_filters_pushdown`, you influence what the optimizer can do -- and the +metadata you declare in your `ExecutionPlan` (partitioning, ordering) directly +affects which physical optimizations apply. + +## Choosing the Right Starting Point + +Not every custom data source requires implementing all three layers from +scratch. DataFusion provides building blocks that let you plug in at whatever +level makes sense: + +| If your data is... 
| Start with | You implement | +| -------------------------------------------------- | ------------------------------------------------------------------------- | ------------------------------ | +| Already in `RecordBatch`es in memory | [MemTable] | Nothing -- just construct it | +| An async stream of batches | [StreamTable] | A stream factory | +| A logical transformation of other tables | [ViewTable] wrapping a logical plan | The logical plan | +| A variant of an existing file format | [ListingTable] with a custom [FileFormat] wrapping an existing one | A thin `FileFormat` wrapper | +| Files in a custom format on disk or object storage | [ListingTable] with a custom [FileFormat], [FileSource], and [FileOpener] | The format, source, and opener | +| A custom source needing full control | `TableProvider` + `ExecutionPlan` + stream | All three layers | + +[fileformat]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/trait.FileFormat.html +[filesource]: https://docs.rs/datafusion-datasource/latest/datafusion_datasource/file/trait.FileSource.html +[fileopener]: https://docs.rs/datafusion-datasource/latest/datafusion_datasource/file_stream/trait.FileOpener.html + +If your data is file-based, `ListingTable` handles file discovery, partition +column inference, and plan construction -- you only need to implement +`FileFormat`, `FileSource`, and `FileOpener` to describe how to read your +files. See the [custom_file_format example] for a minimal wrapping approach, +or [ParquetSource] and [ParquetOpener] for a full custom implementation to +use as a reference. 
+ +[custom_file_format example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/custom_file_format.rs +[parquetsource]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html +[parquetopener]: https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/opener.rs + +The rest of this post focuses on the full `TableProvider` + `ExecutionPlan` + +stream path, which gives you complete control and applies to any data source. + +## Layer 1: TableProvider + +A [TableProvider] represents a queryable data source. For a minimal read-only +table, you need four methods: + +```rust,ignore

Review Comment:

it would be great (as a follow-on PR) to update this example so it was actually compiled/tested (and thus did not drift out of sync with the code)

In other words, change this to `rust` and then ensure `cargo test -p datafusion --doc` passes.

It currently fails like this:

```diff
--- a/docs/source/library-user-guide/custom-table-providers.md
+++ b/docs/source/library-user-guide/custom-table-providers.md
@@ -157,7 +157,7 @@ stream path, which gives you complete control and applies to any data source.
 A [TableProvider] represents a queryable data source. For a minimal read-only
 table, you need four methods:
 
-```rust,ignore
+```rust
 impl TableProvider for MyTable {
     fn as_any(&self) -> &dyn Any { self }
 
diff --git a/testing b/testing
index 7df2b70baf..0d60ccae40 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
```

```text
(venv) andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion3$ cargo test -p datafusion --doc -- library_user_guide_custom_table_providers
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.41s
   Doc-tests datafusion

running 10 tests
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1365) ... ignored
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1441) ... ignored
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1498) ... ignored
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1512) ... ignored
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1551) ... ignored
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 2058) ... ignored
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1285) ... FAILED
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1913) ... ok
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1631) ... ok
test datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1772) ... ok

failures:

---- datafusion/core/src/lib.rs - library_user_guide_custom_table_providers (line 1285) stdout ----
error[E0405]: cannot find trait `TableProvider` in this scope
    --> datafusion/core/src/lib.rs:1286:6
     |
1286 | impl TableProvider for MyTable {
     |      ^^^^^^^^^^^^^ not found in this scope
     |
help: consider importing one of these traits
     |
1285 + use datafusion::catalog::TableProvider;
     |
1285 + use datafusion_catalog::TableProvider;
...
```

########## docs/source/library-user-guide/custom-table-providers.md: ########## @@ -19,568 +19,938 @@ # Custom Table Provider -Like other areas of DataFusion, you extend DataFusion's functionality by implementing a trait. The [`TableProvider`] and associated traits allow you to implement a custom table provider, i.e. use DataFusion's other functionality with your custom data source. - -This section describes how to create a [`TableProvider`] and how to configure DataFusion to use it for reading. +One of DataFusion's greatest strengths is its extensibility. If your data lives +in a custom format, behind an API, or in a system that DataFusion does not +natively support, you can teach DataFusion to read it by implementing a +**custom table provider**.
This post walks through the three layers you need to +understand to design a table provider and where planning and execution work should happen. For details on how table constraints such as primary keys or unique constraints are handled, see [Table Constraint Enforcement](table-constraints.md). -## Table Provider and Scan - -The [`TableProvider::scan`] method reads data from the table and is likely the most important. It returns an [`ExecutionPlan`] that DataFusion will use to read the actual data during execution of the query. The [`TableProvider::insert_into`] method is used to `INSERT` data into the table. - -### Scan - -As mentioned, [`TableProvider::scan`] returns an execution plan, and in particular a `Result<Arc<dyn ExecutionPlan>>`. The core of this is returning something that can be dynamically dispatched to an `ExecutionPlan`. And as per the general DataFusion idea, we'll need to implement it. - -[`tableprovider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html -[`tableprovider::scan`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#tymethod.scan -[`tableprovider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#tymethod.insert_into -[`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html - -#### Execution Plan - -The `ExecutionPlan` trait at its core is a way to get a stream of batches. The aptly-named `execute` method returns a `Result<SendableRecordBatchStream>`, which should be a stream of `RecordBatch`es that can be sent across threads, and has a schema that matches the data to be contained in those batches. - -There are many different types of `SendableRecordBatchStream` implemented in DataFusion -- you can use a pre existing one, such as `MemoryStream` (if your `RecordBatch`es are all in memory) or implement your own custom logic, depending on your usecase. 
- -Looking at the full example below: - -```rust -use std::any::Any; -use std::sync::{Arc, Mutex}; -use std::collections::{BTreeMap, HashMap}; -use datafusion::common::Result; -use datafusion::common::tree_node::TreeNodeRecursion; -use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::physical_plan::expressions::PhysicalSortExpr; -use datafusion::physical_plan::{ - ExecutionPlan, SendableRecordBatchStream, DisplayAs, DisplayFormatType, - Statistics, PlanProperties, PhysicalExpr -}; -use datafusion::execution::context::TaskContext; -use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; -use datafusion::physical_plan::memory::MemoryStream; -use datafusion::arrow::record_batch::RecordBatch; - -/// A User, with an id and a bank account -#[derive(Clone, Debug)] -struct User { - id: u8, - bank_account: u64, -} - -/// A custom datasource, used to represent a datastore with a single index -#[derive(Clone, Debug)] -pub struct CustomDataSource { - inner: Arc<Mutex<CustomDataSourceInner>>, -} - -#[derive(Debug)] -struct CustomDataSourceInner { - data: HashMap<u8, User>, - bank_account_index: BTreeMap<u64, u8>, -} +The majority of this content was originally posted in the blog

Review Comment:

👍
