This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b9b30a  Add write_csv to DataFrame (#1922)
0b9b30a is described below

commit 0b9b30a3f03f65ba64392ca6a29d3ec48753bff8
Author: Matthew Turner <[email protected]>
AuthorDate: Sun Mar 6 06:15:25 2022 -0500

    Add write_csv to DataFrame (#1922)
    
    * Add write_csv to DataFrame
    
    * Cleanup
    
    * Update write_csv signature
---
 datafusion/src/dataframe.rs                     |   3 +
 datafusion/src/execution/context.rs             |  35 +-----
 datafusion/src/execution/dataframe_impl.rs      |   8 ++
 datafusion/src/physical_plan/file_format/csv.rs | 136 +++++++++++++++++++++++-
 datafusion/src/physical_plan/file_format/mod.rs |   1 +
 5 files changed, 148 insertions(+), 35 deletions(-)

diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index c8c5dcc..dfbbc61 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -405,4 +405,7 @@ pub trait DataFrame: Send + Sync {
     /// # }
     /// ```
     fn except(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn 
DataFrame>>;
+
+    /// Write a `DataFrame` to a CSV file.
+    async fn write_csv(&self, path: &str) -> Result<()>;
 }
diff --git a/datafusion/src/execution/context.rs 
b/datafusion/src/execution/context.rs
index 3017660..a554a7c 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -49,7 +49,7 @@ use std::{fs, path::PathBuf};
 use futures::{StreamExt, TryStreamExt};
 use tokio::task::{self, JoinHandle};
 
-use arrow::{csv, datatypes::SchemaRef};
+use arrow::datatypes::SchemaRef;
 
 use crate::catalog::{
     catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -80,6 +80,7 @@ use crate::physical_optimizer::repartition::Repartition;
 
 use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use crate::logical_plan::plan::Explain;
+use crate::physical_plan::file_format::plan_to_csv;
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udf::ScalarUDF;
 use crate::physical_plan::ExecutionPlan;
@@ -717,37 +718,7 @@ impl ExecutionContext {
         plan: Arc<dyn ExecutionPlan>,
         path: impl AsRef<str>,
     ) -> Result<()> {
-        let path = path.as_ref();
-        // create directory to contain the CSV files (one per partition)
-        let fs_path = Path::new(path);
-        let runtime = self.runtime_env();
-        match fs::create_dir(fs_path) {
-            Ok(()) => {
-                let mut tasks = vec![];
-                for i in 0..plan.output_partitioning().partition_count() {
-                    let plan = plan.clone();
-                    let filename = format!("part-{}.csv", i);
-                    let path = fs_path.join(&filename);
-                    let file = fs::File::create(path)?;
-                    let mut writer = csv::Writer::new(file);
-                    let stream = plan.execute(i, runtime.clone()).await?;
-                    let handle: JoinHandle<Result<()>> = task::spawn(async 
move {
-                        stream
-                            .map(|batch| writer.write(&batch?))
-                            .try_collect()
-                            .await
-                            .map_err(DataFusionError::from)
-                    });
-                    tasks.push(handle);
-                }
-                futures::future::join_all(tasks).await;
-                Ok(())
-            }
-            Err(e) => Err(DataFusionError::Execution(format!(
-                "Could not create directory {}: {:?}",
-                path, e
-            ))),
-        }
+        plan_to_csv(self, plan, path).await
     }
 
     /// Executes a query and writes the results to a partitioned Parquet file.
diff --git a/datafusion/src/execution/dataframe_impl.rs 
b/datafusion/src/execution/dataframe_impl.rs
index 0e3cc61..c00729b 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -39,6 +39,7 @@ use crate::{
 use crate::arrow::util::pretty;
 use crate::datasource::TableProvider;
 use crate::datasource::TableType;
+use crate::physical_plan::file_format::plan_to_csv;
 use crate::physical_plan::{
     execute_stream, execute_stream_partitioned, ExecutionPlan, 
SendableRecordBatchStream,
 };
@@ -313,6 +314,13 @@ impl DataFrame for DataFrameImpl {
             &LogicalPlanBuilder::except(left_plan, right_plan, true)?,
         )))
     }
+
+    async fn write_csv(&self, path: &str) -> Result<()> {
+        let plan = self.create_physical_plan().await?;
+        let state = self.ctx_state.lock().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        plan_to_csv(&ctx, plan, path).await
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/src/physical_plan/file_format/csv.rs 
b/datafusion/src/physical_plan/file_format/csv.rs
index 709705b..d9f4706 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -18,18 +18,22 @@
 //! Execution plan for reading CSV files
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::context::ExecutionContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, 
Statistics,
 };
 
+use crate::execution::runtime_env::RuntimeEnv;
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt};
 use std::any::Any;
+use std::fs;
+use std::path::Path;
 use std::sync::Arc;
-
-use crate::execution::runtime_env::RuntimeEnv;
-use async_trait::async_trait;
+use tokio::task::{self, JoinHandle};
 
 use super::file_stream::{BatchIter, FileStream};
 use super::FileScanConfig;
@@ -176,16 +180,59 @@ impl ExecutionPlan for CsvExec {
     }
 }
 
+pub async fn plan_to_csv(
+    context: &ExecutionContext,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+) -> Result<()> {
+    let path = path.as_ref();
+    // create directory to contain the CSV files (one per partition)
+    let fs_path = Path::new(path);
+    let runtime = context.runtime_env();
+    match fs::create_dir(fs_path) {
+        Ok(()) => {
+            let mut tasks = vec![];
+            for i in 0..plan.output_partitioning().partition_count() {
+                let plan = plan.clone();
+                let filename = format!("part-{}.csv", i);
+                let path = fs_path.join(&filename);
+                let file = fs::File::create(path)?;
+                let mut writer = csv::Writer::new(file);
+                let stream = plan.execute(i, runtime.clone()).await?;
+                let handle: JoinHandle<Result<()>> = task::spawn(async move {
+                    stream
+                        .map(|batch| writer.write(&batch?))
+                        .try_collect()
+                        .await
+                        .map_err(DataFusionError::from)
+                });
+                tasks.push(handle);
+            }
+            futures::future::join_all(tasks).await;
+            Ok(())
+        }
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Could not create directory {}: {:?}",
+            path, e
+        ))),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::prelude::*;
     use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{
         datasource::object_store::local::{local_unpartitioned_file, 
LocalFileSystem},
         scalar::ScalarValue,
         test_util::aggr_test_schema,
     };
+    use arrow::datatypes::*;
     use futures::StreamExt;
+    use std::fs::File;
+    use std::io::Write;
+    use tempfile::TempDir;
 
     #[tokio::test]
     async fn csv_exec_with_projection() -> Result<()> {
@@ -376,4 +423,87 @@ mod tests {
         crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
         Ok(())
     }
+
+    /// Generate CSV partitions within the supplied directory
+    fn populate_csv_partitions(
+        tmp_dir: &TempDir,
+        partition_count: usize,
+        file_extension: &str,
+    ) -> Result<SchemaRef> {
+        // define schema for data source (csv file)
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::UInt32, false),
+            Field::new("c2", DataType::UInt64, false),
+            Field::new("c3", DataType::Boolean, false),
+        ]));
+
+        // generate a partitioned file
+        for partition in 0..partition_count {
+            let filename = format!("partition-{}.{}", partition, 
file_extension);
+            let file_path = tmp_dir.path().join(&filename);
+            let mut file = File::create(file_path)?;
+
+            // generate some data
+            for i in 0..=10 {
+                let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+                file.write_all(data.as_bytes())?;
+            }
+        }
+
+        Ok(schema)
+    }
+
+    #[tokio::test]
+    async fn write_csv_results() -> Result<()> {
+        // create partitioned input file and context
+        let tmp_dir = TempDir::new()?;
+        let mut ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_target_partitions(8),
+        );
+
+        let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?;
+
+        // register csv file with the execution context
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?;
+
+        // execute a simple query and write the results to CSV
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+        df.write_csv(&out_dir).await?;
+
+        // create a new context and verify that the results were saved to a 
partitioned csv file
+        let mut ctx = ExecutionContext::new();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::UInt32, false),
+            Field::new("c2", DataType::UInt64, false),
+        ]));
+
+        // register each partition as well as the top level dir
+        let csv_read_option = CsvReadOptions::new().schema(&schema);
+        ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), 
csv_read_option)
+            .await?;
+        ctx.register_csv("allparts", &out_dir, csv_read_option)
+            .await?;
+
+        let part0 = ctx.sql("SELECT c1, c2 FROM 
part0").await?.collect().await?;
+        let allparts = ctx
+            .sql("SELECT c1, c2 FROM allparts")
+            .await?
+            .collect()
+            .await?;
+
+        let allparts_count: usize = allparts.iter().map(|batch| 
batch.num_rows()).sum();
+
+        assert_eq!(part0[0].schema(), allparts[0].schema());
+
+        assert_eq!(allparts_count, 80);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/src/physical_plan/file_format/mod.rs 
b/datafusion/src/physical_plan/file_format/mod.rs
index d15c44f..f38cf41 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -32,6 +32,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 pub use avro::AvroExec;
+pub(crate) use csv::plan_to_csv;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
 

Reply via email to