This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cd0bbeee9 Consolidate example to_date.rs into dataframe.rs (#13939)
0cd0bbeee9 is described below

commit 0cd0bbeee9946cae8679e35b06ac87b9c153611d
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Dec 30 06:41:04 2024 -0500

    Consolidate example to_date.rs into dataframe.rs (#13939)
    
    * Consolidate example to_date.rs into dataframe.rs
    
    * Assert results using assert_batches_eq
    
    * clippy
---
 datafusion-examples/examples/dataframe.rs | 52 ++++++++++++++++++++++++---
 datafusion-examples/examples/sql_query.rs | 57 +++++++++++++++++++++--------
 datafusion-examples/examples/to_date.rs   | 60 -------------------------------
 3 files changed, 91 insertions(+), 78 deletions(-)

diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs
index 5d5414e3d8..90d7d778ea 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -28,16 +28,24 @@ use std::io::Write;
 use std::sync::Arc;
 use tempfile::tempdir;
 
-/// This example demonstrates using DataFusion's DataFrame API to
+/// This example demonstrates using DataFusion's DataFrame API
+///
+/// # Reading from different formats
 ///
 /// * [read_parquet]: execute queries against parquet files
 /// * [read_csv]: execute queries against csv files
 /// * [read_memory]: execute queries against in-memory arrow data
 ///
-/// This example demonstrates the various methods to write out a DataFrame to local storage.
-/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
-/// using a remote object store.
+/// # Writing out to local storage
+///
+/// The following examples demonstrate how to write a DataFrame to local
+/// storage. See `external_dependency/dataframe-to-s3.rs` for an example writing
+/// to a remote object store.
+///
+/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
+///
+/// # Querying data
+/// * [query_to_date]: convert string columns to dates using the to_date function
 #[tokio::main]
 async fn main() -> Result<()> {
     // The SessionContext is the main high level API for interacting with DataFusion
@@ -46,6 +54,7 @@ async fn main() -> Result<()> {
     read_csv(&ctx).await?;
     read_memory(&ctx).await?;
     write_out(&ctx).await?;
+    query_to_date().await?;
     Ok(())
 }
 
@@ -206,3 +215,38 @@ async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionEr
 
     Ok(())
 }
+
+/// This example demonstrates how to use the to_date function in the
+/// DataFrame API to convert string columns to dates.
+async fn query_to_date() -> Result<()> {
+    // define a schema.
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
+
+    // define data.
+    let batch = RecordBatch::try_new(
+        schema,
+        vec![Arc::new(StringArray::from(vec![
+            "2020-09-08T13:42:29Z",
+            "2020-09-08T13:42:29.190855-05:00",
+            "2020-08-09 12:13:29",
+            "2020-01-02",
+        ]))],
+    )?;
+
+    // declare a new context. In the Spark API, this corresponds to a new SparkSession
+    let ctx = SessionContext::new();
+
+    // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
+    ctx.register_batch("t", batch)?;
+    let df = ctx.table("t").await?;
+
+    // use the to_date function to convert column 'a' to a date using the default parsing
+    let df = df.with_column("a", to_date(vec![col("a")]))?;
+
+    let df = df.select_columns(&["a"])?;
+
+    // print the results
+    df.show().await?;
+
+    Ok(())
+}
diff --git a/datafusion-examples/examples/sql_query.rs b/datafusion-examples/examples/sql_query.rs
index f6d3936568..a6e7fe91dd 100644
--- a/datafusion-examples/examples/sql_query.rs
+++ b/datafusion-examples/examples/sql_query.rs
@@ -23,12 +23,10 @@ use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::MemTable;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::prelude::SessionContext;
-use datafusion_common::exec_datafusion_err;
+use datafusion_common::{assert_batches_eq, exec_datafusion_err};
 use object_store::local::LocalFileSystem;
 use std::path::Path;
 use std::sync::Arc;
-use std::time::Duration;
-use tokio::time::timeout;
 
 /// Examples of various ways to execute queries using SQL
 ///
@@ -52,17 +50,30 @@ pub async fn query_memtable() -> Result<()> {
     // Register the in-memory table containing the data
     ctx.register_table("users", Arc::new(mem_table))?;
 
+    // running a SQL query results in a "DataFrame", which can be used
+    // to execute the query and collect the results
     let dataframe = ctx.sql("SELECT * FROM users;").await?;
 
-    timeout(Duration::from_secs(10), async move {
-        let result = dataframe.collect().await.unwrap();
-        let record_batch = result.first().unwrap();
-
-        assert_eq!(1, record_batch.column(0).len());
-        dbg!(record_batch.columns());
-    })
-    .await
-    .unwrap();
+    // Calling 'show' on the dataframe will execute the query and
+    // print the results
+    dataframe.clone().show().await?;
+
+    // calling 'collect' on the dataframe will execute the query and
+    // buffer the results into a vector of RecordBatch. There are other
+    // APIs on DataFrame for incrementally generating results (e.g. streaming)
+    let result = dataframe.collect().await?;
+
+    // Use the assert_batches_eq macro to compare the results
+    assert_batches_eq!(
+        [
+            "+----+--------------+",
+            "| id | bank_account |",
+            "+----+--------------+",
+            "| 1  | 9000         |",
+            "+----+--------------+",
+        ],
+        &result
+    );
 
     Ok(())
 }
@@ -133,7 +144,16 @@ async fn query_parquet() -> Result<()> {
         .await?;
 
     // print the results
-    df.show().await?;
+    let results = df.collect().await?;
+    assert_batches_eq!(
+        [
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+            "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+            "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+        ],
+        &results);
 
     // Second example where we temporarily move into the test data's parent directory and
     // simulate a relative path; this requires registering an ObjectStore.
@@ -173,7 +193,16 @@ async fn query_parquet() -> Result<()> {
         .await?;
 
     // print the results
-    df.show().await?;
+    let results = df.collect().await?;
+    assert_batches_eq!(
+        [
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+            "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+            "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+        ],
+        &results);
 
     // Reset the current directory
     std::env::set_current_dir(cur_dir)?;
diff --git a/datafusion-examples/examples/to_date.rs b/datafusion-examples/examples/to_date.rs
deleted file mode 100644
index 99ee555ffc..0000000000
--- a/datafusion-examples/examples/to_date.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use datafusion::arrow::array::StringArray;
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
-use datafusion::prelude::*;
-
-/// This example demonstrates how to use the to_date series
-/// of functions in the DataFrame API as well as via sql.
-#[tokio::main]
-async fn main() -> Result<()> {
-    // define a schema.
-    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
-
-    // define data.
-    let batch = RecordBatch::try_new(
-        schema,
-        vec![Arc::new(StringArray::from(vec![
-            "2020-09-08T13:42:29Z",
-            "2020-09-08T13:42:29.190855-05:00",
-            "2020-08-09 12:13:29",
-            "2020-01-02",
-        ]))],
-    )?;
-
-    // declare a new context. In spark API, this corresponds to a new spark SQLsession
-    let ctx = SessionContext::new();
-
-    // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
-    ctx.register_batch("t", batch)?;
-    let df = ctx.table("t").await?;
-
-    // use to_date function to convert col 'a' to timestamp type using the default parsing
-    let df = df.with_column("a", to_date(vec![col("a")]))?;
-
-    let df = df.select_columns(&["a"])?;
-
-    // print the results
-    df.show().await?;
-
-    Ok(())
-}
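
For reference, the assertion pattern the updated examples adopt -- collecting
the query results and comparing them against the expected pretty-printed table
with assert_batches_eq! -- can be reproduced in a minimal standalone program.
The sketch below uses only APIs that already appear in the diff above; the
table name "t" and the single "id" column are made up for illustration.

    use std::sync::Arc;

    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::prelude::*;
    use datafusion_common::assert_batches_eq;

    #[tokio::main]
    async fn main() -> Result<()> {
        // register a tiny in-memory table (the name "t" and the "id" column
        // are illustrative only)
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))])?;
        let ctx = SessionContext::new();
        ctx.register_batch("t", batch)?;

        // collect the results into RecordBatches and compare them against the
        // expected pretty-printed output, as the updated examples now do
        let results = ctx.sql("SELECT id FROM t").await?.collect().await?;
        assert_batches_eq!(
            [
                "+----+",
                "| id |",
                "+----+",
                "| 1  |",
                "+----+",
            ],
            &results
        );
        Ok(())
    }

Compared to the previous timeout / dbg! based check, asserting on the
pretty-printed batches makes the examples self-verifying without pulling in
tokio::time.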


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
