This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0cd0bbeee9 Consolidate example to_date.rs into dataframe.rs (#13939)
0cd0bbeee9 is described below
commit 0cd0bbeee9946cae8679e35b06ac87b9c153611d
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Dec 30 06:41:04 2024 -0500
Consolidate example to_date.rs into dataframe.rs (#13939)
* Consolidate example to_date.rs into dataframe.rs
* Assert results using assert_batches_eq (see the sketch after this list)
* clippy
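For reference, a minimal, self-contained sketch of the assert_batches_eq pattern this commit adopts; the query and expected table here are illustrative, not taken from the commit:

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::assert_batches_eq;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS id").await?;

    // `collect` executes the query and buffers all results into a Vec<RecordBatch>
    let results = df.collect().await?;

    // assert_batches_eq! pretty-prints the batches and compares them,
    // line by line, against the expected table
    assert_batches_eq!(
        [
            "+----+",
            "| id |",
            "+----+",
            "| 1  |",
            "+----+",
        ],
        &results
    );
    Ok(())
}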
---
datafusion-examples/examples/dataframe.rs | 52 ++++++++++++++++++++++++---
datafusion-examples/examples/sql_query.rs | 57 +++++++++++++++++++++--------
datafusion-examples/examples/to_date.rs | 60 -------------------------------
3 files changed, 91 insertions(+), 78 deletions(-)
diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs
index 5d5414e3d8..90d7d778ea 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -28,16 +28,24 @@ use std::io::Write;
use std::sync::Arc;
use tempfile::tempdir;
-/// This example demonstrates using DataFusion's DataFrame API to
+/// This example demonstrates using DataFusion's DataFrame API
+///
+/// # Reading from different formats
///
/// * [read_parquet]: execute queries against parquet files
/// * [read_csv]: execute queries against csv files
/// * [read_memory]: execute queries against in-memory arrow data
///
-/// This example demonstrates the various methods to write out a DataFrame to local storage.
-/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
-/// using a remote object store.
+/// # Writing out to local storage
+///
+/// The following examples demonstrate how to write a DataFrame to local
+/// storage. See `external_dependency/dataframe-to-s3.rs` for an example writing
+/// to a remote object store.
+///
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
+///
+/// # Querying data
+/// * [query_to_date]: convert string columns to dates with the to_date function
#[tokio::main]
async fn main() -> Result<()> {
// The SessionContext is the main high level API for interacting with DataFusion
@@ -46,6 +54,7 @@ async fn main() -> Result<()> {
read_csv(&ctx).await?;
read_memory(&ctx).await?;
write_out(&ctx).await?;
+ query_to_date().await?;
Ok(())
}
@@ -206,3 +215,38 @@ async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionEr
Ok(())
}
+
+/// This example demonstrates how to use the to_date series
+/// of functions in the DataFrame API as well as via sql.
+async fn query_to_date() -> Result<()> {
+ // define a schema.
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
+
+ // define data.
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![Arc::new(StringArray::from(vec![
+ "2020-09-08T13:42:29Z",
+ "2020-09-08T13:42:29.190855-05:00",
+ "2020-08-09 12:13:29",
+ "2020-01-02",
+ ]))],
+ )?;
+
+ // declare a new context. In the Spark API, this corresponds to a new Spark SQL session
+ let ctx = SessionContext::new();
+
+ // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
+ ctx.register_batch("t", batch)?;
+ let df = ctx.table("t").await?;
+
+ // use the to_date function to convert col 'a' to a date (Date32) type using the default parsing
+ let df = df.with_column("a", to_date(vec![col("a")]))?;
+
+ let df = df.select_columns(&["a"])?;
+
+ // print the results
+ df.show().await?;
+
+ Ok(())
+}
diff --git a/datafusion-examples/examples/sql_query.rs b/datafusion-examples/examples/sql_query.rs
index f6d3936568..a6e7fe91dd 100644
--- a/datafusion-examples/examples/sql_query.rs
+++ b/datafusion-examples/examples/sql_query.rs
@@ -23,12 +23,10 @@ use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::MemTable;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
-use datafusion_common::exec_datafusion_err;
+use datafusion_common::{assert_batches_eq, exec_datafusion_err};
use object_store::local::LocalFileSystem;
use std::path::Path;
use std::sync::Arc;
-use std::time::Duration;
-use tokio::time::timeout;
/// Examples of various ways to execute queries using SQL
///
@@ -52,17 +50,30 @@ pub async fn query_memtable() -> Result<()> {
// Register the in-memory table containing the data
ctx.register_table("users", Arc::new(mem_table))?;
+ // running a SQL query results in a "DataFrame", which can be used
+ // to execute the query and collect the results
let dataframe = ctx.sql("SELECT * FROM users;").await?;
- timeout(Duration::from_secs(10), async move {
- let result = dataframe.collect().await.unwrap();
- let record_batch = result.first().unwrap();
-
- assert_eq!(1, record_batch.column(0).len());
- dbg!(record_batch.columns());
- })
- .await
- .unwrap();
+ // Calling 'show' on the dataframe will execute the query and
+ // print the results
+ dataframe.clone().show().await?;
+
+ // calling 'collect' on the dataframe will execute the query and
+ // buffer the results into a vector of RecordBatch. There are other
+ // APIs on DataFrame for incrementally generating results (e.g. streaming)
+ let result = dataframe.collect().await?;
+
+ // Use the assert_batches_eq macro to compare the results
+ assert_batches_eq!(
+ [
+ "+----+--------------+",
+ "| id | bank_account |",
+ "+----+--------------+",
+ "| 1 | 9000 |",
+ "+----+--------------+",
+ ],
+ &result
+ );
Ok(())
}
@@ -133,7 +144,16 @@ async fn query_parquet() -> Result<()> {
.await?;
// print the results
- df.show().await?;
+ let results = df.collect().await?;
+ assert_batches_eq!(
+ [
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ ],
+ &results);
// Second example where we temporarily move into the test data's parent directory and
// simulate a relative path; this requires registering an ObjectStore.
@@ -173,7 +193,16 @@ async fn query_parquet() -> Result<()> {
.await?;
// print the results
- df.show().await?;
+ let results = df.collect().await?;
+ assert_batches_eq!(
+ [
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ ],
+ &results);
// Reset the current directory
std::env::set_current_dir(cur_dir)?;
diff --git a/datafusion-examples/examples/to_date.rs b/datafusion-examples/examples/to_date.rs
deleted file mode 100644
index 99ee555ffc..0000000000
--- a/datafusion-examples/examples/to_date.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use datafusion::arrow::array::StringArray;
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
-use datafusion::prelude::*;
-
-/// This example demonstrates how to use the to_date series
-/// of functions in the DataFrame API as well as via sql.
-#[tokio::main]
-async fn main() -> Result<()> {
- // define a schema.
- let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
-
- // define data.
- let batch = RecordBatch::try_new(
- schema,
- vec![Arc::new(StringArray::from(vec![
- "2020-09-08T13:42:29Z",
- "2020-09-08T13:42:29.190855-05:00",
- "2020-08-09 12:13:29",
- "2020-01-02",
- ]))],
- )?;
-
- // declare a new context. In spark API, this corresponds to a new spark SQLsession
- let ctx = SessionContext::new();
-
- // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
- ctx.register_batch("t", batch)?;
- let df = ctx.table("t").await?;
-
- // use to_date function to convert col 'a' to timestamp type using the default parsing
- let df = df.with_column("a", to_date(vec![col("a")]))?;
-
- let df = df.select_columns(&["a"])?;
-
- // print the results
- df.show().await?;
-
- Ok(())
-}
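The new comment in sql_query.rs notes that, besides collect, DataFrame has APIs for incrementally generating results. A minimal sketch (not part of this commit) of the streaming path via DataFrame::execute_stream, assuming the futures crate is available for StreamExt:

use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("VALUES (1), (2), (3)").await?;

    // execute_stream returns a SendableRecordBatchStream that yields
    // RecordBatches one at a time, instead of buffering them all in
    // memory the way collect does
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        println!("got a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}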
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]