This is an automated email from the ASF dual-hosted git repository.
jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 98cb948545 Improve deserialize_to_struct example (#13958)
98cb948545 is described below
commit 98cb9485457409be83ab5e749e13aa4f32c0e8a0
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Jan 4 04:26:35 2025 -0500
Improve deserialize_to_struct example (#13958)
* Cleanup deserialize_to_struct example
* prettier
* Apply suggestions from code review
Co-authored-by: Jonah Gao <[email protected]>
---------
Co-authored-by: Jonah Gao <[email protected]>
---
datafusion-examples/README.md | 2 +-
.../examples/deserialize_to_struct.rs | 158 +++++++++++++++------
2 files changed, 117 insertions(+), 43 deletions(-)
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 23cf8830e3..a3f100cbcb 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -58,7 +58,7 @@ cargo run --example dataframe
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
-- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
+- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
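Following the `cargo run --example dataframe` convention shown in the hunk header above, the updated example should be runnable with:

    cargo run --example deserialize_to_struct

(the invocation is inferred from the README's pattern for the other examples, not stated in this commit).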
diff --git a/datafusion-examples/examples/deserialize_to_struct.rs b/datafusion-examples/examples/deserialize_to_struct.rs
index 985cab703a..5ac3ee6187 100644
--- a/datafusion-examples/examples/deserialize_to_struct.rs
+++ b/datafusion-examples/examples/deserialize_to_struct.rs
@@ -15,62 +15,136 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::array::AsArray;
+use arrow::array::{AsArray, PrimitiveArray};
use arrow::datatypes::{Float64Type, Int32Type};
use datafusion::error::Result;
use datafusion::prelude::*;
+use datafusion_common::assert_batches_eq;
use futures::StreamExt;
-/// This example shows that it is possible to convert query results into Rust structs.
+/// This example shows how to convert query results into Rust structs by using
+/// the Arrow APIs to access the results as Rust native types.
+///
+/// This is a bit tricky initially as the results are returned as columns stored
+/// as [ArrayRef]
+///
+/// [ArrayRef]: arrow::array::ArrayRef
#[tokio::main]
async fn main() -> Result<()> {
- let data_list = Data::new().await?;
- println!("{data_list:#?}");
- Ok(())
-}
+ // Run a query that returns two columns of data
+ let ctx = SessionContext::new();
+ let testdata = datafusion::test_util::parquet_test_data();
+ ctx.register_parquet(
+ "alltypes_plain",
+ &format!("{testdata}/alltypes_plain.parquet"),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+ let df = ctx
+ .sql("SELECT int_col, double_col FROM alltypes_plain")
+ .await?;
-#[derive(Debug)]
-struct Data {
- #[allow(dead_code)]
- int_col: i32,
- #[allow(dead_code)]
- double_col: f64,
-}
+ // print out the results showing we have an int32 and a float64 column
+ let results = df.clone().collect().await?;
+ assert_batches_eq!(
+ [
+ "+---------+------------+",
+ "| int_col | double_col |",
+ "+---------+------------+",
+ "| 0 | 0.0 |",
+ "| 1 | 10.1 |",
+ "| 0 | 0.0 |",
+ "| 1 | 10.1 |",
+ "| 0 | 0.0 |",
+ "| 1 | 10.1 |",
+ "| 0 | 0.0 |",
+ "| 1 | 10.1 |",
+ "+---------+------------+",
+ ],
+ &results
+ );
-impl Data {
- pub async fn new() -> Result<Vec<Self>> {
- // this group is almost the same as the one you find it in parquet_sql.rs
- let ctx = SessionContext::new();
+ // We will now convert the query results into a Rust struct
+ let mut stream = df.execute_stream().await?;
+ let mut list = vec![];
- let testdata = datafusion::test_util::parquet_test_data();
+ // DataFusion produces data in chunks called `RecordBatch`es which are
+ // typically 8000 rows each. This loop processes each `RecordBatch` as it is
+ // produced by the query plan and adds it to the list
+ while let Some(b) = stream.next().await.transpose()? {
+ // Each `RecordBatch` has one or more columns. Each column is stored as
+ // an `ArrayRef`. To interact with data using Rust native types we need to
+ // convert these `ArrayRef`s into concrete array types using APIs from
+ // the arrow crate.
- ctx.register_parquet(
- "alltypes_plain",
- &format!("{testdata}/alltypes_plain.parquet"),
- ParquetReadOptions::default(),
- )
- .await?;
+ // In this case, we know that each batch has two columns of the Arrow
+ // types Int32 and Float64, so first we cast the two columns to the
+ // appropriate Arrow PrimitiveArray (this is a fast / zero-copy cast):
+ let int_col: &PrimitiveArray<Int32Type> = b.column(0).as_primitive();
+ let float_col: &PrimitiveArray<Float64Type> = b.column(1).as_primitive();
- let df = ctx
- .sql("SELECT int_col, double_col FROM alltypes_plain")
- .await?;
+ // With PrimitiveArrays, we can access the values as native Rust
+ // types i32 and f64, and form the desired `Data` structs
+ for (i, f) in int_col.values().iter().zip(float_col.values()) {
+ list.push(Data {
+ int_col: *i,
+ double_col: *f,
+ })
+ }
+ }
- df.clone().show().await?;
+ // Finally, we have the results in the list of Rust structs
+ let res = format!("{list:#?}");
+ assert_eq!(
+ res,
+ r#"[
+ Data {
+ int_col: 0,
+ double_col: 0.0,
+ },
+ Data {
+ int_col: 1,
+ double_col: 10.1,
+ },
+ Data {
+ int_col: 0,
+ double_col: 0.0,
+ },
+ Data {
+ int_col: 1,
+ double_col: 10.1,
+ },
+ Data {
+ int_col: 0,
+ double_col: 0.0,
+ },
+ Data {
+ int_col: 1,
+ double_col: 10.1,
+ },
+ Data {
+ int_col: 0,
+ double_col: 0.0,
+ },
+ Data {
+ int_col: 1,
+ double_col: 10.1,
+ },
+]"#
+ );
- let mut stream = df.execute_stream().await?;
- let mut list = vec![];
- while let Some(b) = stream.next().await.transpose()? {
- let int_col = b.column(0).as_primitive::<Int32Type>();
- let float_col = b.column(1).as_primitive::<Float64Type>();
+ // Use the fields in the struct to avoid clippy complaints
+ let int_sum = list.iter().fold(0, |acc, x| acc + x.int_col);
+ let double_sum = list.iter().fold(0.0, |acc, x| acc + x.double_col);
+ assert_eq!(int_sum, 4);
+ assert_eq!(double_sum, 40.4);
- for (i, f) in int_col.values().iter().zip(float_col.values()) {
- list.push(Data {
- int_col: *i,
- double_col: *f,
- })
- }
- }
+ Ok(())
+}
- Ok(list)
- }
+/// This is the target struct where we want the query results.
+#[derive(Debug)]
+struct Data {
+ int_col: i32,
+ double_col: f64,
}
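A note on nulls: the conversion loop above reads `int_col.values()`, which is the raw values buffer and does not consult the null bitmap, so the example relies on both columns being non-nullable. For columns that may contain NULLs, a null-aware variant of the same conversion (a sketch only, not part of this commit; it assumes the `Data` struct defined in the diff) can use `iter()`, which yields `Option` values:

    use arrow::array::{Float64Array, Int32Array};

    /// Sketch: convert two possibly-nullable columns into `Data` structs,
    /// skipping any row where either value is NULL.
    fn to_structs(int_col: &Int32Array, float_col: &Float64Array) -> Vec<Data> {
        int_col
            .iter() // yields Option<i32>, honoring the null bitmap
            .zip(float_col.iter()) // yields Option<f64>
            .filter_map(|(i, f)| Some(Data { int_col: i?, double_col: f? }))
            .collect()
    }

`Int32Array` and `Float64Array` are the standard aliases for the `PrimitiveArray<Int32Type>` and `PrimitiveArray<Float64Type>` types used in the diff, so the same `as_primitive()` downcasts apply before calling a helper like this.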
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]