This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b15ad16f3 consolidate dataframe_subquery.rs into dataframe.rs (#13950)
2b15ad16f3 is described below

commit 2b15ad16f3516f29d6540c974170591a6c085478
Author: zjregee <[email protected]>
AuthorDate: Tue Dec 31 22:22:10 2024 +0800

    consolidate dataframe_subquery.rs into dataframe.rs (#13950)
---
 datafusion-examples/README.md                      |   2 +-
 datafusion-examples/examples/dataframe.rs          |  91 ++++++++++++++++
 datafusion-examples/examples/dataframe_subquery.rs | 118 ---------------------
 3 files changed, 92 insertions(+), 119 deletions(-)

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 3ec008a602..23cf8830e3 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -57,7 +57,7 @@ cargo run --example dataframe
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against 
a custom datasource (TableProvider)
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a 
custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run 
a query using a DataFrame against a parquet file from s3 and writing back to s3
-- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API 
against parquet files, csv files, and in-memory data. Also demonstrates the 
various methods to write out a DataFrame to a table, parquet file, csv file, 
and json file.
+- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API 
against parquet files, csv files, and in-memory data, including multiple 
subqueries. Also demonstrates the various methods to write out a DataFrame to a 
table, parquet file, csv file, and json file.
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert 
query results into rust structs using serde
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze 
and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query 
on `FileStreamProvider` which implements `StreamProvider` for reading and 
writing to arbitrary stream sources / sinks.
diff --git a/datafusion-examples/examples/dataframe.rs 
b/datafusion-examples/examples/dataframe.rs
index 90d7d778ea..91d62135b9 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -19,10 +19,13 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, 
StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::error::Result;
+use datafusion::functions_aggregate::average::avg;
+use datafusion::functions_aggregate::min_max::max;
 use datafusion::prelude::*;
 use datafusion_common::config::CsvOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::DataFusionError;
+use datafusion_common::ScalarValue;
 use std::fs::File;
 use std::io::Write;
 use std::sync::Arc;
@@ -44,7 +47,14 @@ use tempfile::tempdir;
 ///
 /// * [write_out]: write out a DataFrame to a table, parquet file, csv file, 
or json file
 ///
+/// # Executing subqueries
+///
+/// * [where_scalar_subquery]: execute a scalar subquery
+/// * [where_in_subquery]: execute a subquery with an IN clause
+/// * [where_exist_subquery]: execute a subquery with an EXISTS clause
+///
 /// # Querying data
+///
 /// * [query_to_date]: execute queries against parquet files
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -55,6 +65,11 @@ async fn main() -> Result<()> {
     read_memory(&ctx).await?;
     write_out(&ctx).await?;
     query_to_date().await?;
+    register_aggregate_test_data("t1", &ctx).await?;
+    register_aggregate_test_data("t2", &ctx).await?;
+    where_scalar_subquery(&ctx).await?;
+    where_in_subquery(&ctx).await?;
+    where_exist_subquery(&ctx).await?;
     Ok(())
 }
 
@@ -250,3 +265,79 @@ async fn query_to_date() -> Result<()> {
 
     Ok(())
 }
+
+/// Use the DataFrame API to execute the following subquery:
+/// select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = 
t2.c1)>0 limit 3;
+async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
+    ctx.table("t1")
+        .await?
+        .filter(
+            scalar_subquery(Arc::new(
+                ctx.table("t2")
+                    .await?
+                    .filter(out_ref_col(DataType::Utf8, 
"t1.c1").eq(col("t2.c1")))?
+                    .aggregate(vec![], vec![avg(col("t2.c2"))])?
+                    .select(vec![avg(col("t2.c2"))])?
+                    .into_unoptimized_plan(),
+            ))
+            .gt(lit(0u8)),
+        )?
+        .select(vec![col("t1.c1"), col("t1.c2")])?
+        .limit(0, Some(3))?
+        .show()
+        .await?;
+    Ok(())
+}
+
+/// Use the DataFrame API to execute the following subquery:
+/// select t1.c1, t1.c2 from t1 where t1.c2 in (select max(t2.c2) from t2 
where t2.c1 > 0 ) limit 3;
+async fn where_in_subquery(ctx: &SessionContext) -> Result<()> {
+    ctx.table("t1")
+        .await?
+        .filter(in_subquery(
+            col("t1.c2"),
+            Arc::new(
+                ctx.table("t2")
+                    .await?
+                    .filter(col("t2.c1").gt(lit(ScalarValue::UInt8(Some(0)))))?
+                    .aggregate(vec![], vec![max(col("t2.c2"))])?
+                    .select(vec![max(col("t2.c2"))])?
+                    .into_unoptimized_plan(),
+            ),
+        ))?
+        .select(vec![col("t1.c1"), col("t1.c2")])?
+        .limit(0, Some(3))?
+        .show()
+        .await?;
+    Ok(())
+}
+
+/// Use the DataFrame API to execute the following subquery:
+/// select t1.c1, t1.c2 from t1 where exists (select t2.c2 from t2 where t1.c1 
= t2.c1) limit 3;
+async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
+    ctx.table("t1")
+        .await?
+        .filter(exists(Arc::new(
+            ctx.table("t2")
+                .await?
+                .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
+                .select(vec![col("t2.c2")])?
+                .into_unoptimized_plan(),
+        )))?
+        .select(vec![col("t1.c1"), col("t1.c2")])?
+        .limit(0, Some(3))?
+        .show()
+        .await?;
+    Ok(())
+}
+
+async fn register_aggregate_test_data(name: &str, ctx: &SessionContext) -> 
Result<()> {
+    let testdata = datafusion::test_util::arrow_test_data();
+    ctx.register_csv(
+        name,
+        &format!("{testdata}/csv/aggregate_test_100.csv"),
+        CsvReadOptions::default(),
+    )
+    .await?;
+    Ok(())
+}
diff --git a/datafusion-examples/examples/dataframe_subquery.rs 
b/datafusion-examples/examples/dataframe_subquery.rs
deleted file mode 100644
index 3e3d0c1b5a..0000000000
--- a/datafusion-examples/examples/dataframe_subquery.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_schema::DataType;
-use std::sync::Arc;
-
-use datafusion::error::Result;
-use datafusion::functions_aggregate::average::avg;
-use datafusion::functions_aggregate::min_max::max;
-use datafusion::prelude::*;
-use datafusion::test_util::arrow_test_data;
-use datafusion_common::ScalarValue;
-
-/// This example demonstrates how to use the DataFrame API to create a 
subquery.
-#[tokio::main]
-async fn main() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_aggregate_test_data("t1", &ctx).await?;
-    register_aggregate_test_data("t2", &ctx).await?;
-
-    where_scalar_subquery(&ctx).await?;
-
-    where_in_subquery(&ctx).await?;
-
-    where_exist_subquery(&ctx).await?;
-
-    Ok(())
-}
-
-//select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1)>0 
limit 3;
-async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
-    ctx.table("t1")
-        .await?
-        .filter(
-            scalar_subquery(Arc::new(
-                ctx.table("t2")
-                    .await?
-                    .filter(out_ref_col(DataType::Utf8, 
"t1.c1").eq(col("t2.c1")))?
-                    .aggregate(vec![], vec![avg(col("t2.c2"))])?
-                    .select(vec![avg(col("t2.c2"))])?
-                    .into_unoptimized_plan(),
-            ))
-            .gt(lit(0u8)),
-        )?
-        .select(vec![col("t1.c1"), col("t1.c2")])?
-        .limit(0, Some(3))?
-        .show()
-        .await?;
-    Ok(())
-}
-
-//SELECT t1.c1, t1.c2 FROM t1 WHERE t1.c2 in (select max(t2.c2) from t2 where 
t2.c1 > 0 ) limit 3;
-async fn where_in_subquery(ctx: &SessionContext) -> Result<()> {
-    ctx.table("t1")
-        .await?
-        .filter(in_subquery(
-            col("t1.c2"),
-            Arc::new(
-                ctx.table("t2")
-                    .await?
-                    .filter(col("t2.c1").gt(lit(ScalarValue::UInt8(Some(0)))))?
-                    .aggregate(vec![], vec![max(col("t2.c2"))])?
-                    .select(vec![max(col("t2.c2"))])?
-                    .into_unoptimized_plan(),
-            ),
-        ))?
-        .select(vec![col("t1.c1"), col("t1.c2")])?
-        .limit(0, Some(3))?
-        .show()
-        .await?;
-    Ok(())
-}
-
-//SELECT t1.c1, t1.c2 FROM t1 WHERE EXISTS (select t2.c2 from t2 where t1.c1 = 
t2.c1) limit 3;
-async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
-    ctx.table("t1")
-        .await?
-        .filter(exists(Arc::new(
-            ctx.table("t2")
-                .await?
-                .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
-                .select(vec![col("t2.c2")])?
-                .into_unoptimized_plan(),
-        )))?
-        .select(vec![col("t1.c1"), col("t1.c2")])?
-        .limit(0, Some(3))?
-        .show()
-        .await?;
-    Ok(())
-}
-
-pub async fn register_aggregate_test_data(
-    name: &str,
-    ctx: &SessionContext,
-) -> Result<()> {
-    let testdata = arrow_test_data();
-    ctx.register_csv(
-        name,
-        &format!("{testdata}/csv/aggregate_test_100.csv"),
-        CsvReadOptions::default(),
-    )
-    .await?;
-    Ok(())
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to