This is an automated email from the ASF dual-hosted git repository.

jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 98cb948545 Improve deserialize_to_struct example (#13958)
98cb948545 is described below

commit 98cb9485457409be83ab5e749e13aa4f32c0e8a0
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Jan 4 04:26:35 2025 -0500

    Improve deserialize_to_struct example (#13958)
    
    * Cleanup deserialize_to_struct example
    
    * prettier
    
    * Apply suggestions from code review
    
    Co-authored-by: Jonah Gao <[email protected]>
    
    ---------
    
    Co-authored-by: Jonah Gao <[email protected]>
---
 datafusion-examples/README.md                      |   2 +-
 .../examples/deserialize_to_struct.rs              | 158 +++++++++++++++------
 2 files changed, 117 insertions(+), 43 deletions(-)

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 23cf8830e3..a3f100cbcb 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -58,7 +58,7 @@ cargo run --example dataframe
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a 
custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run 
a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API 
against parquet files, csv files, and in-memory data, including multiple 
subqueries. Also demonstrates the various methods to write out a DataFrame to a 
table, parquet file, csv file, and json file.
-- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert 
query results into rust structs using serde
+- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert 
query results (Arrow ArrayRefs) into Rust structs
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze 
and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query 
on `FileStreamProvider` which implements `StreamProvider` for reading and 
writing to arbitrary stream sources / sinks.
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run 
DataFusion as a standalone process and execute SQL queries from JDBC clients
diff --git a/datafusion-examples/examples/deserialize_to_struct.rs 
b/datafusion-examples/examples/deserialize_to_struct.rs
index 985cab703a..5ac3ee6187 100644
--- a/datafusion-examples/examples/deserialize_to_struct.rs
+++ b/datafusion-examples/examples/deserialize_to_struct.rs
@@ -15,62 +15,136 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::AsArray;
+use arrow::array::{AsArray, PrimitiveArray};
 use arrow::datatypes::{Float64Type, Int32Type};
 use datafusion::error::Result;
 use datafusion::prelude::*;
+use datafusion_common::assert_batches_eq;
 use futures::StreamExt;
 
-/// This example shows that it is possible to convert query results into Rust 
structs .
+/// This example shows how to convert query results into Rust structs by using
+/// the Arrow APIs to convert the results into Rust native types.
+///
+/// This is a bit tricky initially as the results are returned as columns stored
+/// as [ArrayRef].
+///
+/// [ArrayRef]: arrow::array::ArrayRef
 #[tokio::main]
 async fn main() -> Result<()> {
-    let data_list = Data::new().await?;
-    println!("{data_list:#?}");
-    Ok(())
-}
+    // Run a query that returns two columns of data
+    let ctx = SessionContext::new();
+    let testdata = datafusion::test_util::parquet_test_data();
+    ctx.register_parquet(
+        "alltypes_plain",
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+    let df = ctx
+        .sql("SELECT int_col, double_col FROM alltypes_plain")
+        .await?;
 
-#[derive(Debug)]
-struct Data {
-    #[allow(dead_code)]
-    int_col: i32,
-    #[allow(dead_code)]
-    double_col: f64,
-}
+    // print out the results showing we have an int32 and a float64 column
+    let results = df.clone().collect().await?;
+    assert_batches_eq!(
+        [
+            "+---------+------------+",
+            "| int_col | double_col |",
+            "+---------+------------+",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "+---------+------------+",
+        ],
+        &results
+    );
 
-impl Data {
-    pub async fn new() -> Result<Vec<Self>> {
-        // this group is almost the same as the one you find it in 
parquet_sql.rs
-        let ctx = SessionContext::new();
+    // We will now convert the query results into a Rust struct
+    let mut stream = df.execute_stream().await?;
+    let mut list = vec![];
 
-        let testdata = datafusion::test_util::parquet_test_data();
+    // DataFusion produces data in chunks called `RecordBatch`es which are
+    // typically 8192 rows each. This loop processes each `RecordBatch` as it is
+    // produced by the query plan and adds it to the list
+    while let Some(b) = stream.next().await.transpose()? {
+        // Each `RecordBatch` has one or more columns. Each column is stored as
+        // an `ArrayRef`. To interact with data using Rust native types we 
need to
+        // convert these `ArrayRef`s into concrete array types using APIs from
+        // the arrow crate.
 
-        ctx.register_parquet(
-            "alltypes_plain",
-            &format!("{testdata}/alltypes_plain.parquet"),
-            ParquetReadOptions::default(),
-        )
-        .await?;
+        // In this case, we know that each batch has two columns of the Arrow
+        // types Int32 and Float64, so first we cast the two columns to the
+        // appropriate Arrow PrimitiveArray (this is a fast / zero-copy cast):
+        let int_col: &PrimitiveArray<Int32Type> = b.column(0).as_primitive();
+        let float_col: &PrimitiveArray<Float64Type> = 
b.column(1).as_primitive();
 
-        let df = ctx
-            .sql("SELECT int_col, double_col FROM alltypes_plain")
-            .await?;
+        // With PrimitiveArrays, we can access the values as native Rust
+        // types i32 and f64, and form the desired `Data` structs
+        for (i, f) in int_col.values().iter().zip(float_col.values()) {
+            list.push(Data {
+                int_col: *i,
+                double_col: *f,
+            })
+        }
+    }
 
-        df.clone().show().await?;
+    // Finally, we have the results in the list of Rust structs
+    let res = format!("{list:#?}");
+    assert_eq!(
+        res,
+        r#"[
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+]"#
+    );
 
-        let mut stream = df.execute_stream().await?;
-        let mut list = vec![];
-        while let Some(b) = stream.next().await.transpose()? {
-            let int_col = b.column(0).as_primitive::<Int32Type>();
-            let float_col = b.column(1).as_primitive::<Float64Type>();
+    // Use the fields in the struct to avoid clippy complaints
+    let int_sum = list.iter().fold(0, |acc, x| acc + x.int_col);
+    let double_sum = list.iter().fold(0.0, |acc, x| acc + x.double_col);
+    assert_eq!(int_sum, 4);
+    assert_eq!(double_sum, 40.4);
 
-            for (i, f) in int_col.values().iter().zip(float_col.values()) {
-                list.push(Data {
-                    int_col: *i,
-                    double_col: *f,
-                })
-            }
-        }
+    Ok(())
+}
 
-        Ok(list)
-    }
+/// This is the target struct into which the query results are converted.
+#[derive(Debug)]
+struct Data {
+    int_col: i32,
+    double_col: f64,
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to