This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b276d47991 Add test for DataFrame::write_table (#8531)
b276d47991 is described below
commit b276d479918400105017db1f7f46dcb67b52206d
Author: Devin D'Angelo <[email protected]>
AuthorDate: Thu Dec 14 19:32:46 2023 -0500
Add test for DataFrame::write_table (#8531)
* add test for DataFrame::write_table
* remove duplicate let df=...
* remove println!
---
.../src/datasource/physical_plan/parquet/mod.rs | 95 +++++++++++++++++++++-
1 file changed, 92 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 641b7bbb15..847ea65056 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -752,7 +752,7 @@ mod tests {
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
- use crate::datasource::listing::{FileRange, PartitionedFile};
+ use crate::datasource::listing::{FileRange, ListingOptions, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
use crate::physical_plan::displayable;
@@ -772,8 +772,8 @@ mod tests {
};
use arrow_array::Date64Array;
use chrono::{TimeZone, Utc};
- use datafusion_common::ScalarValue;
use datafusion_common::{assert_contains, ToDFSchema};
+ use datafusion_common::{FileType, GetExt, ScalarValue};
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1941,6 +1941,96 @@ mod tests {
Ok(schema)
}
+ #[tokio::test]
+ async fn write_table_results() -> Result<()> {
+ // Exercises DataFrame::write_table against a registered parquet listing table; first create partitioned input file and context
+ let tmp_dir = TempDir::new()?;
+ // build the context directly so the test runs with 8 target partitions
+ let ctx = SessionContext::new_with_config(
+ SessionConfig::new().with_target_partitions(8),
+ );
+ let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
+ // register the csv input (the temp directory path) with the execution context as table `test`
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema),
+ )
+ .await?;
+
+ // register a local file system object store rooted at tmp_dir under the file://local URL
+ let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+ let local_url = Url::parse("file://local").unwrap();
+ ctx.runtime_env().register_object_store(&local_url, local);
+
+ // Configure listing options: parquet format with pruning enabled, restricted to the parquet file extension
+ let file_format =
ParquetFormat::default().with_enable_pruning(Some(true));
+ let listing_options = ListingOptions::new(Arc::new(file_format))
+ .with_file_extension(FileType::PARQUET.get_ext());
+
+ // create the `out` directory under tmp_dir and plan a simple query whose results are written to parquet below
+ let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+ std::fs::create_dir(&out_dir).unwrap();
+ let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+ let schema: Schema = df.schema().into();
+ // Register a listing table - this will use all files in the directory
as data sources
+ // for the query
+ ctx.register_listing_table(
+ "my_table",
+ &out_dir,
+ listing_options,
+ Some(Arc::new(schema)),
+ None,
+ )
+ .await
+ .unwrap();
+ df.write_table("my_table", DataFrameWriteOptions::new())
+ .await?;
+
+ // create a new context and verify that the results were saved to a
partitioned parquet file
+ let ctx = SessionContext::new();
+
+ // get write_id: the part before the first '_' in the name of the first entry read_dir returns for out_dir
+ let mut paths = fs::read_dir(&out_dir).unwrap();
+ let path = paths.next();
+ let name = path
+ .unwrap()?
+ .path()
+ .file_name()
+ .expect("Should be a file name")
+ .to_str()
+ .expect("Should be a str")
+ .to_owned();
+ let (parsed_id, _) = name.split_once('_').expect("File should contain
_ !");
+ let write_id = parsed_id.to_owned();
+
+ // register the `{write_id}_0.parquet` file as `part0` and the top level dir as `allparts`
+ ctx.register_parquet(
+ "part0",
+ &format!("{out_dir}/{write_id}_0.parquet"),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+
+ ctx.register_parquet("allparts", &out_dir,
ParquetReadOptions::default())
+ .await?;
+
+ let part0 = ctx.sql("SELECT c1, c2 FROM
part0").await?.collect().await?;
+ let allparts = ctx
+ .sql("SELECT c1, c2 FROM allparts")
+ .await?
+ .collect()
+ .await?;
+
+ let allparts_count: usize = allparts.iter().map(|batch|
batch.num_rows()).sum();
+
+ assert_eq!(part0[0].schema(), allparts[0].schema());
+
+ assert_eq!(allparts_count, 40);
+
+ Ok(())
+ }
+
#[tokio::test]
async fn write_parquet_results() -> Result<()> {
// create partitioned input file and context
@@ -1985,7 +2075,6 @@ mod tests {
.to_str()
.expect("Should be a str")
.to_owned();
- println!("{name}");
let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
let write_id = parsed_id.to_owned();