This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b276d47991 Add test for DataFrame::write_table (#8531)
b276d47991 is described below

commit b276d479918400105017db1f7f46dcb67b52206d
Author: Devin D'Angelo <[email protected]>
AuthorDate: Thu Dec 14 19:32:46 2023 -0500

    Add test for DataFrame::write_table (#8531)
    
    * add test for DataFrame::write_table
    
    * remove duplicate let df=...
    
    * remove println!
---
 .../src/datasource/physical_plan/parquet/mod.rs    | 95 +++++++++++++++++++++-
 1 file changed, 92 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 641b7bbb15..847ea65056 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -752,7 +752,7 @@ mod tests {
     use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
-    use crate::datasource::listing::{FileRange, PartitionedFile};
+    use crate::datasource::listing::{FileRange, ListingOptions, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::context::SessionState;
     use crate::physical_plan::displayable;
@@ -772,8 +772,8 @@ mod tests {
     };
     use arrow_array::Date64Array;
     use chrono::{TimeZone, Utc};
-    use datafusion_common::ScalarValue;
     use datafusion_common::{assert_contains, ToDFSchema};
+    use datafusion_common::{FileType, GetExt, ScalarValue};
     use datafusion_expr::{col, lit, when, Expr};
     use datafusion_physical_expr::create_physical_expr;
     use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1941,6 +1941,96 @@ mod tests {
         Ok(schema)
     }
 
+    #[tokio::test]
+    async fn write_table_results() -> Result<()> {
+        // create partitioned input file and context
+        let tmp_dir = TempDir::new()?;
+        // let mut ctx = create_ctx(&tmp_dir, 4).await?;
+        let ctx = SessionContext::new_with_config(
+            SessionConfig::new().with_target_partitions(8),
+        );
+        let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
+        // register csv file with the execution context
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?;
+
+        // register a local file system object store for /tmp directory
+        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+        let local_url = Url::parse("file://local").unwrap();
+        ctx.runtime_env().register_object_store(&local_url, local);
+
+        // Configure listing options
+        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
+        let listing_options = ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(FileType::PARQUET.get_ext());
+
+        // execute a simple query and write the results to parquet
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        std::fs::create_dir(&out_dir).unwrap();
+        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+        let schema: Schema = df.schema().into();
+        // Register a listing table - this will use all files in the directory as data sources
+        // for the query
+        ctx.register_listing_table(
+            "my_table",
+            &out_dir,
+            listing_options,
+            Some(Arc::new(schema)),
+            None,
+        )
+        .await
+        .unwrap();
+        df.write_table("my_table", DataFrameWriteOptions::new())
+            .await?;
+
+        // create a new context and verify that the results were saved to a partitioned parquet file
+        let ctx = SessionContext::new();
+
+        // get write_id
+        let mut paths = fs::read_dir(&out_dir).unwrap();
+        let path = paths.next();
+        let name = path
+            .unwrap()?
+            .path()
+            .file_name()
+            .expect("Should be a file name")
+            .to_str()
+            .expect("Should be a str")
+            .to_owned();
+        let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
+        let write_id = parsed_id.to_owned();
+
+        // register each partition as well as the top level dir
+        ctx.register_parquet(
+            "part0",
+            &format!("{out_dir}/{write_id}_0.parquet"),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+
+        ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
+            .await?;
+
+        let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
+        let allparts = ctx
+            .sql("SELECT c1, c2 FROM allparts")
+            .await?
+            .collect()
+            .await?;
+
+        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
+
+        assert_eq!(part0[0].schema(), allparts[0].schema());
+
+        assert_eq!(allparts_count, 40);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn write_parquet_results() -> Result<()> {
         // create partitioned input file and context
@@ -1985,7 +2075,6 @@ mod tests {
             .to_str()
             .expect("Should be a str")
             .to_owned();
-        println!("{name}");
         let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
         let write_id = parsed_id.to_owned();
 

Reply via email to