alamb commented on code in PR #11290:
URL: https://github.com/apache/datafusion/pull/11290#discussion_r1667765044
##########
docs/source/library-user-guide/using-the-dataframe-api.md:
##########
@@ -19,129 +19,236 @@
# Using the DataFrame API
-## What is a DataFrame
+## What is a DataFrame?
-`DataFrame` in `DataFrame` is modeled after the Pandas DataFrame interface,
and is a thin wrapper over LogicalPlan that adds functionality for building and
executing those plans.
-
-```rust
-pub struct DataFrame {
- session_state: SessionState,
- plan: LogicalPlan,
-}
-```
-
-You can build up `DataFrame`s using its methods, similarly to building
`LogicalPlan`s using `LogicalPlanBuilder`:
-
-```rust
-let df = ctx.table("users").await?;
-
-// Create a new DataFrame sorted by `id`, `bank_account`
-let new_df = df.select(vec![col("id"), col("bank_account")])?
- .sort(vec![col("id")])?;
-
-// Build the same plan using the LogicalPlanBuilder
-let plan = LogicalPlanBuilder::from(&df.to_logical_plan())
- .project(vec![col("id"), col("bank_account")])?
- .sort(vec![col("id")])?
- .build()?;
-```
-
-You can use `collect` or `execute_stream` to execute the query.
+DataFusion [`DataFrame`]s are modeled after the [Pandas DataFrame] interface,
+and are implemented as a thin wrapper over a [`LogicalPlan`] that adds
+functionality for building and executing those plans.
## How to generate a DataFrame
-You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL
query.
-
-For example, to use `sql` to construct `DataFrame`:
+You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL
+query. For example, to use `sql` to construct a `DataFrame`:
```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(create_memtable()?))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
+use std::sync::Arc;
+use datafusion::prelude::*;
+use datafusion::arrow::array::{ArrayRef, Int32Array};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
+ // Register an in-memory table containing the following data
+ // id | bank_account
+ // ---|-------------
+ // 1 | 9000
+ // 2 | 8000
+ // 3 | 7000
+ let data = RecordBatch::try_from_iter(vec![
+ ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
+ ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
+ ])?;
+ ctx.register_batch("users", data)?;
+ // Create a DataFrame using SQL
+ let dataframe = ctx.sql("SELECT * FROM users;").await?;
+ Ok(())
+}
```
-To construct `DataFrame` using the API:
+You can also construct [`DataFrame`]s programmatically using the API:
```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(create_memtable()?))?;
-let dataframe = ctx
- .table("users")
- .filter(col("a").lt_eq(col("b")))?
- .sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
+use std::sync::Arc;
+use datafusion::prelude::*;
+use datafusion::arrow::array::{ArrayRef, Int32Array};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
+ // Register the same in-memory table as the previous example
+ let data = RecordBatch::try_from_iter(vec![
+ ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
+ ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
+ ])?;
+ ctx.register_batch("users", data)?;
+ // Create a DataFrame that scans the `users` table, finds
+ // all users with a bank account of at least 8000,
+ // and sorts the results by bank account in descending order
+ let dataframe = ctx
+ .table("users")
+ .await?
+ .filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000
+ .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC
+
+ Ok(())
+}
```
## Collect / Streaming Exec
-DataFusion `DataFrame`s are "lazy", meaning they do not do any processing
until they are executed, which allows for additional optimizations.
+DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until
+they are executed, which allows for additional optimizations.
When you have a `DataFrame`, you can run it in one of three ways:
-1. `collect` which executes the query and buffers all the output into a
`Vec<RecordBatch>`
-2. `streaming_exec`, which begins executions and returns a
`SendableRecordBatchStream` which incrementally computes output on each call to
`next()`
-3. `cache` which executes the query and buffers the output into a new in
memory DataFrame.
+1. `collect`: executes the query and buffers all the output into a
`Vec<RecordBatch>`
+2. `execute_stream`: begins execution and returns a
`SendableRecordBatchStream` which incrementally computes output on each call to
`next()`
+3. `cache`: executes the query and buffers the output into a new in-memory
`DataFrame`.
-You can just collect all outputs once like:
+To collect all outputs:
```rust
-let ctx = SessionContext::new();
-let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-let batches = df.collect().await?;
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
+ // read the contents of a CSV file into a DataFrame
+ let df = ctx.read_csv("tests/data/example.csv",
CsvReadOptions::new()).await?;
+ // execute the query and collect the results as a Vec<RecordBatch>
+ let batches = df.collect().await?;
+ Ok(())
+}
```
-You can also use stream output to incrementally generate output one
`RecordBatch` at a time
+You can also use `execute_stream` to incrementally generate output one
`RecordBatch` at a time:
```rust
-let ctx = SessionContext::new();
-let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-let mut stream = df.execute_stream().await?;
-while let Some(rb) = stream.next().await {
- println!("{rb:?}");
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use futures::stream::StreamExt;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
+ let df = ctx.read_csv("tests/data/example.csv",
CsvReadOptions::new()).await?;
+ // begin execution (returns quickly, does not compute results)
+ let mut stream = df.execute_stream().await?;
+ // results are returned incrementally as they are computed
+ while let Some(record_batch) = stream.next().await {
+ println!("{record_batch:?}");
+ }
+ Ok(())
}
```
# Write DataFrame to Files
-You can also serialize `DataFrame` to a file. For now, `Datafusion` supports
write `DataFrame` to `csv`, `json` and `parquet`.
+You can also write the contents of a `DataFrame` to a file. When writing a
file, DataFusion
+executes the `DataFrame` and streams the results. DataFusion comes with
support for writing
+`csv`, `json`, `arrow`, `avro`, and `parquet` files, and supports writing custom
+file formats via API (see [`custom_file_format.rs`] for an example).
-When writing a file, DataFusion will execute the DataFrame and stream the
results to a file.
-
-For example, to write a csv_file
+For example, to read a CSV file and write it to a parquet file, use the
+[`DataFrame::write_parquet`] method:
```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(mem_table))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
-
-dataframe
- .write_csv("user_dataframe.csv", DataFrameWriteOptions::default(), None)
- .await;
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use datafusion::dataframe::DataFrameWriteOptions;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]