tshauck commented on code in PR #7287:
URL: https://github.com/apache/arrow-datafusion/pull/7287#discussion_r1295374996

##########
docs/source/library-user-guide/custom-table-providers.md:
##########
@@ -0,0 +1,159 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Custom Table Provider
+
+Like other areas of DataFusion, you extend DataFusion's functionality by implementing a trait. The `TableProvider` and associated traits have methods that allow you to implement a custom table provider, i.e. use DataFusion's other functionality with your custom data source.
+
+This section will also touch on how to have DataFusion use the new `TableProvider` implementation.
+
+## Table Provider and Scan
+
+The `scan` method on the `TableProvider` is arguably its most important. It returns an execution plan that DataFusion will use as part of the query plan to execute the query.
+
+### Scan
+
+As mentioned, `scan` returns an execution plan, and in particular a `Result<Arc<dyn ExecutionPlan>>`. The core of this is returning something that can be dynamically dispatched to an `ExecutionPlan`. As with other DataFusion extension points, we'll need to implement that trait ourselves.
+
+#### Execution Plan
+
+The `ExecutionPlan` trait at its core is a way to get a stream of batches.
The aptly-named `execute` method returns a `Result<SendableRecordBatchStream>`, which should be a stream of `RecordBatch`es that can be sent across threads, and has a schema that matches the data to be contained in those batches.
+
+Looking at the [example in this repo][ex], the `execute` method looks like this:
+
+```rust
+struct CustomExec {
+    db: CustomDataSource,
+    projected_schema: SchemaRef,
+}
+
+impl ExecutionPlan for CustomExec {
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let users: Vec<User> = {
+            let db = self.db.inner.lock().unwrap();
+            db.data.values().cloned().collect()
+        };
+
+        let mut id_array = UInt8Builder::with_capacity(users.len());
+        let mut account_array = UInt64Builder::with_capacity(users.len());
+
+        for user in users {
+            id_array.append_value(user.id);
+            account_array.append_value(user.bank_account);
+        }
+
+        Ok(Box::pin(MemoryStream::try_new(
+            vec![RecordBatch::try_new(
+                self.projected_schema.clone(),
+                vec![
+                    Arc::new(id_array.finish()),
+                    Arc::new(account_array.finish()),
+                ],
+            )?],
+            self.schema(),
+            None,
+        )?))
+    }
+}
+```
+
+This:
+
+1. Gets the users from the database
+2. Constructs the individual arrays
+3. Returns a `MemoryStream` of a single `RecordBatch` with the arrays
+
+In other words, it returns the "physical" data. For more complex implementations, refer to [`CsvExec`][csv] and [`ParquetExec`][parquet].
+
+With the `ExecutionPlan` implemented, we can now return to the `scan` method of the `TableProvider`.
+
+#### Scan Revisited
+
+The `scan` method of the `TableProvider` returns a `Result<Arc<dyn ExecutionPlan>>`. We can use the `Arc` to return a reference-counted pointer to the `ExecutionPlan` we implemented.
In the example, this is done by:
+
+```rust
+impl CustomDataSource {
+    pub(crate) async fn create_physical_plan(
+        &self,
+        projections: Option<&Vec<usize>>,
+        schema: SchemaRef,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
+    }
+}
+
+#[async_trait]
+impl TableProvider for CustomDataSource {
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        // filters and limit can be used here to inject some push-down operations if needed
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(projection, self.schema()).await
+    }
+}
+```
+
+With this, and the implementation of the omitted methods, we can now use the `CustomDataSource` as a `TableProvider` in DataFusion.
+
+## Using the Custom Table Provider
+
+In order to use the custom table provider, we need to register it with DataFusion. This is done by creating a `TableProvider` and registering it with the `SessionContext`.
+
+```rust
+let ctx = SessionContext::new();
+
+let custom_table_provider = CustomDataSource::new();
+ctx.register_table("custom_table", Arc::new(custom_table_provider))?;
+```
+
+This will allow you to use the custom table provider in DataFusion. For example, you could use it in a SQL query to get a `DataFrame`.
+
+```rust
+let df = ctx.sql("SELECT id, bank_account FROM custom_table").await?;
+```
+
+## Recap
+
+To recap, in order to implement a custom table provider, you need to:
+
+1. Implement the `TableProvider` trait
+2. Implement the `ExecutionPlan` trait
+3. Register the `TableProvider` with the `SessionContext`
+
+## Next Steps
+
+As mentioned, the [csv] and [parquet] implementations are good examples of how to implement a `TableProvider`. The [example in this repo][ex] is a good example of how to implement a `TableProvider` that uses a custom data source.
+
+More abstractly, see the following traits for more information on how to implement a custom `TableProvider` for a file format:
+
+- `FileOpener` - a trait for opening a file and inferring the schema
+- `FileFormat` - a trait for reading a file format
+- `ListingTableProvider` - a useful trait for implementing a `TableProvider` that lists files in a directory
+
+[ex]: https://github.com/apache/arrow-datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion-examples/examples/custom_datasource.rs#L214C1-L276

Review Comment:
Apologies, not sure how to do this. I tried https://docs.rs/crate/datafusion/latest/source/, but the examples aren't there from what I can tell.
