This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0b9b30a Add write_csv to DataFrame (#1922)
0b9b30a is described below
commit 0b9b30a3f03f65ba64392ca6a29d3ec48753bff8
Author: Matthew Turner <[email protected]>
AuthorDate: Sun Mar 6 06:15:25 2022 -0500
Add write_csv to DataFrame (#1922)
* Add write_csv to DataFrame
* Cleanup
* Update write_csv signature
---
datafusion/src/dataframe.rs | 3 +
datafusion/src/execution/context.rs | 35 +-----
datafusion/src/execution/dataframe_impl.rs | 8 ++
datafusion/src/physical_plan/file_format/csv.rs | 136 +++++++++++++++++++++++-
datafusion/src/physical_plan/file_format/mod.rs | 1 +
5 files changed, 148 insertions(+), 35 deletions(-)
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index c8c5dcc..dfbbc61 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -405,4 +405,7 @@ pub trait DataFrame: Send + Sync {
/// # }
/// ```
fn except(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>>;
+
+ /// Write a `DataFrame` to a CSV file.
+ async fn write_csv(&self, path: &str) -> Result<()>;
}
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 3017660..a554a7c 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -49,7 +49,7 @@ use std::{fs, path::PathBuf};
use futures::{StreamExt, TryStreamExt};
use tokio::task::{self, JoinHandle};
-use arrow::{csv, datatypes::SchemaRef};
+use arrow::datatypes::SchemaRef;
use crate::catalog::{
catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -80,6 +80,7 @@ use crate::physical_optimizer::repartition::Repartition;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
+use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -717,37 +718,7 @@ impl ExecutionContext {
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
- let path = path.as_ref();
- // create directory to contain the CSV files (one per partition)
- let fs_path = Path::new(path);
- let runtime = self.runtime_env();
- match fs::create_dir(fs_path) {
- Ok(()) => {
- let mut tasks = vec![];
- for i in 0..plan.output_partitioning().partition_count() {
- let plan = plan.clone();
- let filename = format!("part-{}.csv", i);
- let path = fs_path.join(&filename);
- let file = fs::File::create(path)?;
- let mut writer = csv::Writer::new(file);
- let stream = plan.execute(i, runtime.clone()).await?;
- let handle: JoinHandle<Result<()>> = task::spawn(async move {
- stream
- .map(|batch| writer.write(&batch?))
- .try_collect()
- .await
- .map_err(DataFusionError::from)
- });
- tasks.push(handle);
- }
- futures::future::join_all(tasks).await;
- Ok(())
- }
- Err(e) => Err(DataFusionError::Execution(format!(
- "Could not create directory {}: {:?}",
- path, e
- ))),
- }
+ plan_to_csv(self, plan, path).await
}
/// Executes a query and writes the results to a partitioned Parquet file.
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 0e3cc61..c00729b 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -39,6 +39,7 @@ use crate::{
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
+use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan,
SendableRecordBatchStream,
};
@@ -313,6 +314,13 @@ impl DataFrame for DataFrameImpl {
&LogicalPlanBuilder::except(left_plan, right_plan, true)?,
)))
}
+
+ async fn write_csv(&self, path: &str) -> Result<()> {
+ let plan = self.create_physical_plan().await?;
+ let state = self.ctx_state.lock().clone();
+ let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+ plan_to_csv(&ctx, plan, path).await
+ }
}
#[cfg(test)]
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index 709705b..d9f4706 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -18,18 +18,22 @@
//! Execution plan for reading CSV files
use crate::error::{DataFusionError, Result};
+use crate::execution::context::ExecutionContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
};
+use crate::execution::runtime_env::RuntimeEnv;
use arrow::csv;
use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt};
use std::any::Any;
+use std::fs;
+use std::path::Path;
use std::sync::Arc;
-
-use crate::execution::runtime_env::RuntimeEnv;
-use async_trait::async_trait;
+use tokio::task::{self, JoinHandle};
use super::file_stream::{BatchIter, FileStream};
use super::FileScanConfig;
@@ -176,16 +180,59 @@ impl ExecutionPlan for CsvExec {
}
}
+pub async fn plan_to_csv(
+ context: &ExecutionContext,
+ plan: Arc<dyn ExecutionPlan>,
+ path: impl AsRef<str>,
+) -> Result<()> {
+ let path = path.as_ref();
+ // create directory to contain the CSV files (one per partition)
+ let fs_path = Path::new(path);
+ let runtime = context.runtime_env();
+ match fs::create_dir(fs_path) {
+ Ok(()) => {
+ let mut tasks = vec![];
+ for i in 0..plan.output_partitioning().partition_count() {
+ let plan = plan.clone();
+ let filename = format!("part-{}.csv", i);
+ let path = fs_path.join(&filename);
+ let file = fs::File::create(path)?;
+ let mut writer = csv::Writer::new(file);
+ let stream = plan.execute(i, runtime.clone()).await?;
+ let handle: JoinHandle<Result<()>> = task::spawn(async move {
+ stream
+ .map(|batch| writer.write(&batch?))
+ .try_collect()
+ .await
+ .map_err(DataFusionError::from)
+ });
+ tasks.push(handle);
+ }
+ futures::future::join_all(tasks).await;
+ Ok(())
+ }
+ Err(e) => Err(DataFusionError::Execution(format!(
+ "Could not create directory {}: {:?}",
+ path, e
+ ))),
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
+ use crate::prelude::*;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
test_util::aggr_test_schema,
};
+ use arrow::datatypes::*;
use futures::StreamExt;
+ use std::fs::File;
+ use std::io::Write;
+ use tempfile::TempDir;
#[tokio::test]
async fn csv_exec_with_projection() -> Result<()> {
@@ -376,4 +423,87 @@ mod tests {
crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
Ok(())
}
+
+ /// Generate CSV partitions within the supplied directory
+ fn populate_csv_partitions(
+ tmp_dir: &TempDir,
+ partition_count: usize,
+ file_extension: &str,
+ ) -> Result<SchemaRef> {
+ // define schema for data source (csv file)
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::UInt32, false),
+ Field::new("c2", DataType::UInt64, false),
+ Field::new("c3", DataType::Boolean, false),
+ ]));
+
+ // generate a partitioned file
+ for partition in 0..partition_count {
+ let filename = format!("partition-{}.{}", partition, file_extension);
+ let file_path = tmp_dir.path().join(&filename);
+ let mut file = File::create(file_path)?;
+
+ // generate some data
+ for i in 0..=10 {
+ let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+ file.write_all(data.as_bytes())?;
+ }
+ }
+
+ Ok(schema)
+ }
+
+ #[tokio::test]
+ async fn write_csv_results() -> Result<()> {
+ // create partitioned input file and context
+ let tmp_dir = TempDir::new()?;
+ let mut ctx = ExecutionContext::with_config(
+ ExecutionConfig::new().with_target_partitions(8),
+ );
+
+ let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?;
+
+ // register csv file with the execution context
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema),
+ )
+ .await?;
+
+ // execute a simple query and write the results to CSV
+ let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+ let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+ df.write_csv(&out_dir).await?;
+
+ // create a new context and verify that the results were saved to a partitioned csv file
+ let mut ctx = ExecutionContext::new();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::UInt32, false),
+ Field::new("c2", DataType::UInt64, false),
+ ]));
+
+ // register each partition as well as the top level dir
+ let csv_read_option = CsvReadOptions::new().schema(&schema);
+ ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
+ .await?;
+ ctx.register_csv("allparts", &out_dir, csv_read_option)
+ .await?;
+
+ let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
+ let allparts = ctx
+ .sql("SELECT c1, c2 FROM allparts")
+ .await?
+ .collect()
+ .await?;
+
+ let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
+
+ assert_eq!(part0[0].schema(), allparts[0].schema());
+
+ assert_eq!(allparts_count, 80);
+
+ Ok(())
+ }
}
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index d15c44f..f38cf41 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -32,6 +32,7 @@ use arrow::{
record_batch::RecordBatch,
};
pub use avro::AvroExec;
+pub(crate) use csv::plan_to_csv;
pub use csv::CsvExec;
pub use json::NdJsonExec;