This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d774481ae Implement Support for Copy To Logical and Physical plans 
(#7283)
7d774481ae is described below

commit 7d774481aedc027b7f68226b2c3a4fc0db959fc2
Author: Devin D'Angelo <[email protected]>
AuthorDate: Wed Aug 16 12:39:30 2023 -0400

    Implement Support for Copy To Logical and Physical plans (#7283)
    
    * rebase
    
    * maybe windows fix
    
    * rebase add explain copy tests
    
    * rebase and fix pipeline
---
 .gitignore                                         |   5 +-
 datafusion/core/src/dataframe.rs                   |  55 ++++++---
 datafusion/core/src/datasource/file_format/csv.rs  |  99 +++++++++++-----
 datafusion/core/src/datasource/file_format/json.rs |  83 +++++++++----
 .../core/src/datasource/file_format/parquet.rs     |  79 +++++++-----
 .../core/src/datasource/file_format/write.rs       |  17 ++-
 datafusion/core/src/datasource/listing/table.rs    |   2 +
 datafusion/core/src/datasource/listing/url.rs      |   1 -
 datafusion/core/src/datasource/memory.rs           |   8 +-
 .../core/src/datasource/physical_plan/csv.rs       |  25 +++-
 .../core/src/datasource/physical_plan/json.rs      |  25 +++-
 .../core/src/datasource/physical_plan/mod.rs       |   4 +
 .../core/src/datasource/physical_plan/parquet.rs   |  27 ++++-
 datafusion/core/src/datasource/provider.rs         |   6 +-
 datafusion/core/src/physical_plan/insert.rs        |  14 +--
 datafusion/core/src/physical_planner.rs            |  73 ++++++++++++
 datafusion/expr/src/logical_plan/builder.rs        |  18 +++
 datafusion/expr/src/logical_plan/dml.rs            |  60 +++++++++-
 datafusion/expr/src/logical_plan/mod.rs            |   2 +-
 datafusion/expr/src/logical_plan/plan.rs           |  27 +++++
 datafusion/expr/src/utils.rs                       |  14 +++
 .../optimizer/src/common_subexpr_eliminate.rs      |   1 +
 datafusion/proto/src/logical_plan/mod.rs           |   3 +
 datafusion/sql/src/statement.rs                    |  78 ++++++++++--
 datafusion/sql/tests/sql_integration.rs            |  24 ++++
 datafusion/sqllogictest/bin/sqllogictests.rs       |  17 +++
 datafusion/sqllogictest/test_files/copy.slt        | 132 +++++++++++++++++++--
 27 files changed, 756 insertions(+), 143 deletions(-)

diff --git a/.gitignore b/.gitignore
index 65d3c0f345..203455e4a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -103,4 +103,7 @@ datafusion/CHANGELOG.md.bak
 .githubchangeloggenerator.cache*
 
 # Generated tpch data
-datafusion/core/tests/sqllogictests/test_files/tpch/data/*
+datafusion/sqllogictests/test_files/tpch/data/*
+
+# Scratch temp dir for sqllogictests
+datafusion/sqllogictest/test_files/scratch*
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 5b1983f567..232bedfe0b 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -25,6 +25,7 @@ use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
 use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
+use datafusion_expr::dml::OutputFileFormat;
 use parquet::file::properties::WriterProperties;
 
 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -37,7 +38,6 @@ use crate::arrow::datatypes::Schema;
 use crate::arrow::datatypes::SchemaRef;
 use crate::arrow::record_batch::RecordBatch;
 use crate::arrow::util::pretty;
-use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, 
plan_to_parquet};
 use crate::datasource::{provider_as_source, MemTable, TableProvider};
 use crate::error::Result;
 use crate::execution::{
@@ -992,28 +992,55 @@ impl DataFrame {
     }
 
     /// Write a `DataFrame` to a CSV file.
-    pub async fn write_csv(self, path: &str) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_csv(task_ctx, plan, path).await
+    pub async fn write_csv(
+        self,
+        path: &str,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::CSV,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }
 
     /// Write a `DataFrame` to a Parquet file.
     pub async fn write_parquet(
         self,
         path: &str,
-        writer_properties: Option<WriterProperties>,
-    ) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_parquet(task_ctx, plan, path, writer_properties).await
+        _writer_properties: Option<WriterProperties>,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::PARQUET,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }
 
     /// Executes a query and writes the results to a partitioned JSON file.
-    pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_json(task_ctx, plan, path).await
+    pub async fn write_json(
+        self,
+        path: &str,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::JSON,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }
 
     /// Add an additional column to the DataFrame.
diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index c3ab50fd43..59c4fedeff 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -47,7 +47,7 @@ use crate::datasource::physical_plan::{
 };
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use rand::distributions::{Alphanumeric, DistString};
@@ -277,6 +277,7 @@ impl FileFormat for CsvFormat {
                 "Inserting compressed CSV is not implemented yet.".into(),
             ));
         }
+
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(
             conf,
@@ -285,7 +286,7 @@ impl FileFormat for CsvFormat {
             self.file_compression_type,
         ));
 
-        Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
     }
 }
 
@@ -505,12 +506,14 @@ impl DataSink for CsvSink {
         let object_store = context
             .runtime_env()
             .object_store(&self.config.object_store_url)?;
-
         // Construct serializer and writer for each file group
         let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
         let mut writers = vec![];
         match self.config.writer_mode {
             FileWriterMode::Append => {
+                if !self.config.per_thread_output {
+                    return 
Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented 
for CsvSink in Append mode".into()));
+                }
                 for file_group in &self.config.file_groups {
                     // In append mode, consider has_header flag only when file 
is empty (at the start).
                     // For other modes, use has_header flag as is.
@@ -542,38 +545,72 @@ impl DataSink for CsvSink {
             FileWriterMode::PutMultipart => {
                 // Currently assuming only 1 partition path (i.e. not 
hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                // Uniquely identify this batch of files with a random string, 
to prevent collisions overwriting files
-                let write_id = Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
-                for part_idx in 0..num_partitions {
-                    let header = self.has_header;
-                    let builder = 
WriterBuilder::new().with_delimiter(self.delimiter);
-                    let serializer = CsvSerializer::new()
-                        .with_builder(builder)
-                        .with_header(header);
-                    let file_path = base_path
-                        .prefix()
-                        .child(format!("/{}_{}.csv", write_id, part_idx));
-                    let object_meta = ObjectMeta {
-                        location: file_path,
-                        last_modified: chrono::offset::Utc::now(),
-                        size: 0,
-                        e_tag: None,
-                    };
-                    let writer = create_writer(
-                        self.config.writer_mode,
-                        self.file_compression_type,
-                        object_meta.into(),
-                        object_store.clone(),
-                    )
-                    .await?;
-
-                    serializers.push(Box::new(serializer));
-                    writers.push(writer);
+                match self.config.per_thread_output {
+                    true => {
+                        // Uniquely identify this batch of files with a random 
string, to prevent collisions overwriting files
+                        let write_id =
+                            Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
+                        for part_idx in 0..num_partitions {
+                            let header = self.has_header;
+                            let builder =
+                                
WriterBuilder::new().with_delimiter(self.delimiter);
+                            let serializer = CsvSerializer::new()
+                                .with_builder(builder)
+                                .with_header(header);
+                            serializers.push(Box::new(serializer));
+                            let file_path = base_path
+                                .prefix()
+                                .child(format!("{}_{}.csv", write_id, 
part_idx));
+                            let object_meta = ObjectMeta {
+                                location: file_path,
+                                last_modified: chrono::offset::Utc::now(),
+                                size: 0,
+                                e_tag: None,
+                            };
+                            let writer = create_writer(
+                                self.config.writer_mode,
+                                self.file_compression_type,
+                                object_meta.into(),
+                                object_store.clone(),
+                            )
+                            .await?;
+                            writers.push(writer);
+                        }
+                    }
+                    false => {
+                        let header = self.has_header;
+                        let builder = 
WriterBuilder::new().with_delimiter(self.delimiter);
+                        let serializer = CsvSerializer::new()
+                            .with_builder(builder)
+                            .with_header(header);
+                        serializers.push(Box::new(serializer));
+                        let file_path = base_path.prefix();
+                        let object_meta = ObjectMeta {
+                            location: file_path.clone(),
+                            last_modified: chrono::offset::Utc::now(),
+                            size: 0,
+                            e_tag: None,
+                        };
+                        let writer = create_writer(
+                            self.config.writer_mode,
+                            self.file_compression_type,
+                            object_meta.into(),
+                            object_store.clone(),
+                        )
+                        .await?;
+                        writers.push(writer);
+                    }
                 }
             }
         }
 
-        stateless_serialize_and_write_files(data, serializers, writers).await
+        stateless_serialize_and_write_files(
+            data,
+            serializers,
+            writers,
+            self.config.per_thread_output,
+        )
+        .await
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index 8472f4e5c1..6870fc1b41 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -43,7 +43,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use crate::datasource::physical_plan::FileGroupDisplay;
 use crate::physical_plan::insert::DataSink;
-use crate::physical_plan::insert::InsertExec;
+use crate::physical_plan::insert::FileSinkExec;
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 
@@ -187,7 +187,7 @@ impl FileFormat for JsonFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
 
-        Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
     }
 }
 
@@ -280,6 +280,9 @@ impl DataSink for JsonSink {
         let mut writers = vec![];
         match self.config.writer_mode {
             FileWriterMode::Append => {
+                if !self.config.per_thread_output {
+                    return 
Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented 
for JsonSink in Append mode".into()));
+                }
                 for file_group in &self.config.file_groups {
                     let serializer = JsonSerializer::new();
                     serializers.push(Box::new(serializer));
@@ -303,33 +306,63 @@ impl DataSink for JsonSink {
             FileWriterMode::PutMultipart => {
                 // Currently assuming only 1 partition path (i.e. not 
hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                // Uniquely identify this batch of files with a random string, 
to prevent collisions overwriting files
-                let write_id = Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
-                for part_idx in 0..num_partitions {
-                    let serializer = JsonSerializer::new();
-                    serializers.push(Box::new(serializer));
-                    let file_path = base_path
-                        .prefix()
-                        .child(format!("/{}_{}.json", write_id, part_idx));
-                    let object_meta = ObjectMeta {
-                        location: file_path,
-                        last_modified: chrono::offset::Utc::now(),
-                        size: 0,
-                        e_tag: None,
-                    };
-                    let writer = create_writer(
-                        self.config.writer_mode,
-                        self.file_compression_type,
-                        object_meta.into(),
-                        object_store.clone(),
-                    )
-                    .await?;
-                    writers.push(writer);
+                match self.config.per_thread_output {
+                    true => {
+                        // Uniquely identify this batch of files with a random 
string, to prevent collisions overwriting files
+                        let write_id =
+                            Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
+                        for part_idx in 0..num_partitions {
+                            let serializer = JsonSerializer::new();
+                            serializers.push(Box::new(serializer));
+                            let file_path = base_path
+                                .prefix()
+                                .child(format!("{}_{}.json", write_id, 
part_idx));
+                            let object_meta = ObjectMeta {
+                                location: file_path,
+                                last_modified: chrono::offset::Utc::now(),
+                                size: 0,
+                                e_tag: None,
+                            };
+                            let writer = create_writer(
+                                self.config.writer_mode,
+                                self.file_compression_type,
+                                object_meta.into(),
+                                object_store.clone(),
+                            )
+                            .await?;
+                            writers.push(writer);
+                        }
+                    }
+                    false => {
+                        let serializer = JsonSerializer::new();
+                        serializers.push(Box::new(serializer));
+                        let file_path = base_path.prefix();
+                        let object_meta = ObjectMeta {
+                            location: file_path.clone(),
+                            last_modified: chrono::offset::Utc::now(),
+                            size: 0,
+                            e_tag: None,
+                        };
+                        let writer = create_writer(
+                            self.config.writer_mode,
+                            self.file_compression_type,
+                            object_meta.into(),
+                            object_store.clone(),
+                        )
+                        .await?;
+                        writers.push(writer);
+                    }
                 }
             }
         }
 
-        stateless_serialize_and_write_files(data, serializers, writers).await
+        stateless_serialize_and_write_files(
+            data,
+            serializers,
+            writers,
+            self.config.per_thread_output,
+        )
+        .await
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index de3ec3ffb7..6688d3dd37 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -53,11 +53,12 @@ use crate::config::ConfigOptions;
 use crate::datasource::physical_plan::{
     FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter,
 };
+
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::{
     Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, 
SendableRecordBatchStream,
     Statistics,
@@ -238,7 +239,7 @@ impl FileFormat for ParquetFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(ParquetSink::new(conf));
 
-        Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
     }
 }
 
@@ -604,7 +605,6 @@ impl DisplayAs for ParquetSink {
 }
 
 /// Parses datafusion.execution.parquet.encoding String to a 
parquet::basic::Encoding
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
 fn parse_encoding_string(str_setting: &str) -> 
Result<parquet::basic::Encoding> {
     let str_setting_lower: &str = &str_setting.to_lowercase();
     match str_setting_lower {
@@ -668,7 +668,6 @@ fn require_level(codec: &str, level: Option<u32>) -> 
Result<u32> {
 }
 
 /// Parses datafusion.execution.parquet.compression String to a 
parquet::basic::Compression
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
 fn parse_compression_string(str_setting: &str) -> 
Result<parquet::basic::Compression> {
     let str_setting_lower: &str = &str_setting.to_lowercase();
     let (codec, level) = split_compression_string(str_setting_lower)?;
@@ -719,7 +718,6 @@ fn parse_compression_string(str_setting: &str) -> 
Result<parquet::basic::Compres
     }
 }
 
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
 fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
     let str_setting_lower: &str = &str_setting.to_lowercase();
     match str_setting_lower {
@@ -732,7 +730,6 @@ fn parse_version_string(str_setting: &str) -> 
Result<WriterVersion> {
     }
 }
 
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
 fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
     let str_setting_lower: &str = &str_setting.to_lowercase();
     match str_setting_lower {
@@ -848,26 +845,48 @@ impl DataSink for ParquetSink {
             FileWriterMode::PutMultipart => {
                 // Currently assuming only 1 partition path (i.e. not 
hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                // Uniquely identify this batch of files with a random string, 
to prevent collisions overwriting files
-                let write_id = Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
-                for part_idx in 0..num_partitions {
-                    let file_path = base_path
-                        .prefix()
-                        .child(format!("/{}_{}.parquet", write_id, part_idx));
-                    let object_meta = ObjectMeta {
-                        location: file_path,
-                        last_modified: chrono::offset::Utc::now(),
-                        size: 0,
-                        e_tag: None,
-                    };
-                    let writer = self
-                        .create_writer(
-                            object_meta.into(),
-                            object_store.clone(),
-                            parquet_props.clone(),
-                        )
-                        .await?;
-                    writers.push(writer);
+                match self.config.per_thread_output {
+                    true => {
+                        // Uniquely identify this batch of files with a random 
string, to prevent collisions overwriting files
+                        let write_id =
+                            Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
+                        for part_idx in 0..num_partitions {
+                            let file_path = base_path
+                                .prefix()
+                                .child(format!("{}_{}.parquet", write_id, 
part_idx));
+                            let object_meta = ObjectMeta {
+                                location: file_path,
+                                last_modified: chrono::offset::Utc::now(),
+                                size: 0,
+                                e_tag: None,
+                            };
+                            let writer = self
+                                .create_writer(
+                                    object_meta.into(),
+                                    object_store.clone(),
+                                    parquet_props.clone(),
+                                )
+                                .await?;
+                            writers.push(writer);
+                        }
+                    }
+                    false => {
+                        let file_path = base_path.prefix();
+                        let object_meta = ObjectMeta {
+                            location: file_path.clone(),
+                            last_modified: chrono::offset::Utc::now(),
+                            size: 0,
+                            e_tag: None,
+                        };
+                        let writer = self
+                            .create_writer(
+                                object_meta.into(),
+                                object_store.clone(),
+                                parquet_props.clone(),
+                            )
+                            .await?;
+                        writers.push(writer);
+                    }
                 }
             }
         }
@@ -875,8 +894,12 @@ impl DataSink for ParquetSink {
         let mut row_count = 0;
         // TODO parallelize serialization accross partitions and batches 
within partitions
         // see: https://github.com/apache/arrow-datafusion/issues/7079
-        for idx in 0..num_partitions {
-            while let Some(batch) = data[idx].next().await.transpose()? {
+        for (part_idx, data_stream) in 
data.iter_mut().enumerate().take(num_partitions) {
+            let idx = match self.config.per_thread_output {
+                true => part_idx,
+                false => 0,
+            };
+            while let Some(batch) = data_stream.next().await.transpose()? {
                 row_count += batch.num_rows();
                 // TODO cleanup all multipart writes when any encounters an 
error
                 writers[idx].write(&batch).await?;
diff --git a/datafusion/core/src/datasource/file_format/write.rs 
b/datafusion/core/src/datasource/file_format/write.rs
index c256c9689a..3c005894f6 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -328,16 +328,29 @@ pub(crate) async fn stateless_serialize_and_write_files(
     mut data: Vec<SendableRecordBatchStream>,
     mut serializers: Vec<Box<dyn BatchSerializer>>,
     mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
+    per_thread_output: bool,
 ) -> Result<u64> {
+    if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) {
+        return Err(DataFusionError::Internal(
+            "per_thread_output is false, but got more than 1 writer!".into(),
+        ));
+    }
     let num_partitions = data.len();
+    if per_thread_output && (num_partitions != writers.len()) {
+        return Err(DataFusionError::Internal("per_thread_output is true, but 
did not get 1 writer for each output partition!".into()));
+    }
     let mut row_count = 0;
     // Map errors to DatafusionError.
     let err_converter =
         |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
     // TODO parallelize serialization accross partitions and batches within 
partitions
     // see: https://github.com/apache/arrow-datafusion/issues/7079
-    for idx in 0..num_partitions {
-        while let Some(maybe_batch) = data[idx].next().await {
+    for (part_idx, data_stream) in 
data.iter_mut().enumerate().take(num_partitions) {
+        let idx = match per_thread_output {
+            true => part_idx,
+            false => 0,
+        };
+        while let Some(maybe_batch) = data_stream.next().await {
             // Write data to files in a round robin fashion:
             let serializer = &mut serializers[idx];
             let batch = check_for_errors(maybe_batch, &mut writers).await?;
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index d4e2c4aafe..5cc31c8397 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -883,6 +883,8 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             writer_mode,
+            // TODO: when listing table is known to be backed by a single 
file, this should be false
+            per_thread_output: true,
             overwrite,
         };
 
diff --git a/datafusion/core/src/datasource/listing/url.rs 
b/datafusion/core/src/datasource/listing/url.rs
index a4940f57f8..0c017a517c 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -107,7 +107,6 @@ impl ListingTableUrl {
         .map_err(|_| DataFusionError::Internal(format!("Can not open path: 
{s}")))?;
         // TODO: Currently we do not have an IO-related error variant that 
accepts ()
         //       or a string. Once we have such a variant, change the error 
type above.
-
         Ok(Self::new(url, glob))
     }
 
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory.rs
index 0441ac7058..54ebfd5cab 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -35,7 +35,7 @@ use crate::datasource::{TableProvider, TableType};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
-use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::memory::MemoryExec;
 use crate::physical_plan::{common, SendableRecordBatchStream};
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
@@ -219,7 +219,11 @@ impl TableProvider for MemTable {
             ));
         }
         let sink = Arc::new(MemSink::new(self.batches.clone()));
-        Ok(Arc::new(InsertExec::new(input, sink, self.schema.clone())))
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            self.schema.clone(),
+        )))
     }
 }
 
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 6bf5e36340..e3ac811736 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -660,7 +660,7 @@ mod tests {
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use rstest::*;
-    use std::fs::File;
+    use std::fs::{self, File};
     use std::io::Write;
     use tempfile::TempDir;
     use url::Url;
@@ -1191,11 +1191,32 @@ mod tests {
             Field::new("c2", DataType::UInt64, false),
         ]));
 
+        // get name of first part
+        let paths = fs::read_dir(&out_dir).unwrap();
+        let mut part_0_name: String = "".to_owned();
+        for path in paths {
+            let path = path.unwrap();
+            let name = path
+                .path()
+                .file_name()
+                .expect("Should be a file name")
+                .to_str()
+                .expect("Should be a str")
+                .to_owned();
+            if name.ends_with("_0.csv") {
+                part_0_name = name;
+                break;
+            }
+        }
+
+        if part_0_name.is_empty() {
+            panic!("Did not find part_0 in csv output files!")
+        }
         // register each partition as well as the top level dir
         let csv_read_option = CsvReadOptions::new().schema(&schema);
         ctx.register_csv(
             "part0",
-            &format!("{out_dir}/part-0.csv"),
+            &format!("{out_dir}/{part_0_name}"),
             csv_read_option.clone(),
         )
         .await?;
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs 
b/datafusion/core/src/datasource/physical_plan/json.rs
index b8ad2aa0a6..62fcc320eb 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -331,6 +331,7 @@ mod tests {
     use crate::test::partitioned_file_groups;
     use datafusion_common::cast::{as_int32_array, as_int64_array, 
as_string_array};
     use rstest::*;
+    use std::fs;
     use std::path::Path;
     use tempfile::TempDir;
     use url::Url;
@@ -699,11 +700,33 @@ mod tests {
         // create a new context and verify that the results were saved to a 
partitioned csv file
         let ctx = SessionContext::new();
 
+        // get name of first part
+        let paths = fs::read_dir(&out_dir).unwrap();
+        let mut part_0_name: String = "".to_owned();
+        for path in paths {
+            let name = path
+                .unwrap()
+                .path()
+                .file_name()
+                .expect("Should be a file name")
+                .to_str()
+                .expect("Should be a str")
+                .to_owned();
+            if name.ends_with("_0.json") {
+                part_0_name = name;
+                break;
+            }
+        }
+
+        if part_0_name.is_empty() {
+            panic!("Did not find part_0 in json output files!")
+        }
+
         // register each partition as well as the top level dir
         let json_read_option = NdJsonReadOptions::default();
         ctx.register_json(
             "part0",
-            &format!("{out_dir}/part-0.json"),
+            &format!("{out_dir}/{part_0_name}"),
             json_read_option.clone(),
         )
         .await?;
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index b0914b0816..06c16ad751 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -332,6 +332,10 @@ pub struct FileSinkConfig {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// A writer mode that determines how data is written to the file
     pub writer_mode: FileWriterMode,
+    /// If false, it is assumed there is a single table_path which is a file 
to which all data should be written
+    /// regardless of input partitioning. Otherwise, each table path is 
assumed to be a directory
+    /// to which each output partition is written to its own output file.
+    pub per_thread_output: bool,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 24243ec749..3ef1d13c26 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -768,7 +768,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::ObjectMeta;
-    use std::fs::File;
+    use std::fs::{self, File};
     use std::io::Write;
     use tempfile::TempDir;
     use url::Url;
@@ -1956,31 +1956,46 @@ mod tests {
         df.write_parquet(out_dir_url, None).await?;
         // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, 
None).await?;
 
-        // create a new context and verify that the results were saved to a 
partitioned csv file
+        // create a new context and verify that the results were saved to a 
partitioned parquet file
         let ctx = SessionContext::new();
 
+        // get write_id
+        let mut paths = fs::read_dir(&out_dir).unwrap();
+        let path = paths.next();
+        let name = path
+            .unwrap()?
+            .path()
+            .file_name()
+            .expect("Should be a file name")
+            .to_str()
+            .expect("Should be a str")
+            .to_owned();
+        println!("{name}");
+        let (parsed_id, _) = name.split_once('_').expect("File should contain 
_ !");
+        let write_id = parsed_id.to_owned();
+
         // register each partition as well as the top level dir
         ctx.register_parquet(
             "part0",
-            &format!("{out_dir}/part-0.parquet"),
+            &format!("{out_dir}/{write_id}_0.parquet"),
             ParquetReadOptions::default(),
         )
         .await?;
         ctx.register_parquet(
             "part1",
-            &format!("{out_dir}/part-1.parquet"),
+            &format!("{out_dir}/{write_id}_1.parquet"),
             ParquetReadOptions::default(),
         )
         .await?;
         ctx.register_parquet(
             "part2",
-            &format!("{out_dir}/part-2.parquet"),
+            &format!("{out_dir}/{write_id}_2.parquet"),
             ParquetReadOptions::default(),
         )
         .await?;
         ctx.register_parquet(
             "part3",
-            &format!("{out_dir}/part-3.parquet"),
+            &format!("{out_dir}/{write_id}_3.parquet"),
             ParquetReadOptions::default(),
         )
         .await?;
diff --git a/datafusion/core/src/datasource/provider.rs 
b/datafusion/core/src/datasource/provider.rs
index 6a81f89696..48965addc7 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -119,10 +119,10 @@ pub trait TableProvider: Sync + Send {
     ///
     /// # See Also
     ///
-    /// See [`InsertExec`] for the common pattern of inserting a
-    /// single stream of `RecordBatch`es.
+    /// See [`FileSinkExec`] for the common pattern of inserting a
+    /// streams of `RecordBatch`es as files to an ObjectStore.
     ///
-    /// [`InsertExec`]: crate::physical_plan::insert::InsertExec
+    /// [`FileSinkExec`]: crate::physical_plan::insert::FileSinkExec
     async fn insert_into(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/physical_plan/insert.rs 
b/datafusion/core/src/physical_plan/insert.rs
index a05cb5fb15..acbca834f2 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -64,7 +64,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync {
 /// Execution plan for writing record batches to a [`DataSink`]
 ///
 /// Returns a single row with the number of values written
-pub struct InsertExec {
+pub struct FileSinkExec {
     /// Input plan that produces the record batches to be written.
     input: Arc<dyn ExecutionPlan>,
     /// Sink to which to write
@@ -75,13 +75,13 @@ pub struct InsertExec {
     count_schema: SchemaRef,
 }
 
-impl fmt::Debug for InsertExec {
+impl fmt::Debug for FileSinkExec {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "InsertExec schema: {:?}", self.count_schema)
+        write!(f, "FileSinkExec schema: {:?}", self.count_schema)
     }
 }
 
-impl InsertExec {
+impl FileSinkExec {
     /// Create a plan to write to `sink`
     pub fn new(
         input: Arc<dyn ExecutionPlan>,
@@ -149,7 +149,7 @@ impl InsertExec {
     }
 }
 
-impl DisplayAs for InsertExec {
+impl DisplayAs for FileSinkExec {
     fn fmt_as(
         &self,
         t: DisplayFormatType,
@@ -164,7 +164,7 @@ impl DisplayAs for InsertExec {
     }
 }
 
-impl ExecutionPlan for InsertExec {
+impl ExecutionPlan for FileSinkExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
         self
@@ -233,7 +233,7 @@ impl ExecutionPlan for InsertExec {
     ) -> Result<SendableRecordBatchStream> {
         if partition != 0 {
             return Err(DataFusionError::Internal(
-                "InsertExec can only be called on partition 0!".into(),
+                "FileSinkExec can only be called on partition 0!".into(),
             ));
         }
         let data = self.execute_all_input_streams(context.clone())?;
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 6b868b9b24..f154c2a173 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,6 +17,15 @@
 
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
 
+use crate::datasource::file_format::arrow::ArrowFormat;
+use crate::datasource::file_format::avro::AvroFormat;
+use crate::datasource::file_format::csv::CsvFormat;
+use crate::datasource::file_format::json::JsonFormat;
+use crate::datasource::file_format::parquet::ParquetFormat;
+use crate::datasource::file_format::write::FileWriterMode;
+use crate::datasource::file_format::FileFormat;
+use crate::datasource::listing::ListingTableUrl;
+use crate::datasource::physical_plan::FileSinkConfig;
 use crate::datasource::source_as_provider;
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
@@ -29,6 +38,8 @@ use crate::logical_expr::{
     Repartition, Union, UserDefinedLogicalNode,
 };
 use datafusion_common::display::ToStringifiedPlan;
+use datafusion_expr::dml::{CopyTo, OutputFileFormat};
+use url::Url;
 
 use crate::logical_expr::{Limit, Values};
 use crate::physical_expr::create_physical_expr;
@@ -80,6 +91,7 @@ use itertools::{multiunzip, Itertools};
 use log::{debug, trace};
 use std::collections::HashMap;
 use std::fmt::Write;
+use std::fs;
 use std::sync::Arc;
 
 fn create_function_physical_name(
@@ -544,6 +556,67 @@ impl DefaultPhysicalPlanner {
                     let unaliased: Vec<Expr> = 
filters.into_iter().map(unalias).collect();
                     source.scan(session_state, projection.as_ref(), 
&unaliased, *fetch).await
                 }
+                LogicalPlan::Copy(CopyTo{
+                    input,
+                    output_url,
+                    file_format,
+                    per_thread_output,
+                    options: _,
+                }) => {
+                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
+
+                    // Get object store for specified output_url
+                    // if user did not pass in a url, we assume it is a local 
file path
+                    // this requires some special handling as copy can create 
non
+                    // existing file paths
+                    let is_valid_url = Url::parse(output_url).is_ok();
+
+                    // TODO: make this behavior configurable via options 
(should copy to create path/file as needed?)
+                    // TODO: add additional configurable options for if 
existing files should be overwritten or
+                    // appended to
+                    let parsed_url = match is_valid_url {
+                        true => ListingTableUrl::parse(output_url),
+                        false => {
+                            let path = std::path::PathBuf::from(output_url);
+                            if !path.exists(){
+                                if *per_thread_output{
+                                    fs::create_dir_all(path)?;
+                                } else{
+                                    fs::File::create(path)?;
+                                }
+                            }
+                            ListingTableUrl::parse(output_url)
+                        }
+                    }?;
+
+                    let object_store_url = parsed_url.object_store();
+
+                    let schema: Schema = (**input.schema()).clone().into();
+
+                    // Set file sink related options
+                    let config = FileSinkConfig {
+                        object_store_url,
+                        table_paths: vec![parsed_url],
+                        file_groups: vec![],
+                        output_schema: Arc::new(schema),
+                        table_partition_cols: vec![],
+                        writer_mode: FileWriterMode::PutMultipart,
+                        per_thread_output: *per_thread_output,
+                        overwrite: false,
+                    };
+
+                    // TODO: implement statement level overrides for each file 
type
+                    // E.g. CsvFormat::from_options(options)
+                    let sink_format: Arc<dyn FileFormat> = match file_format {
+                        OutputFileFormat::CSV => 
Arc::new(CsvFormat::default()),
+                        OutputFileFormat::PARQUET => 
Arc::new(ParquetFormat::default()),
+                        OutputFileFormat::JSON => 
Arc::new(JsonFormat::default()),
+                        OutputFileFormat::AVRO => Arc::new(AvroFormat {} ),
+                        OutputFileFormat::ARROW => Arc::new(ArrowFormat {}),
+                    };
+
+                    sink_format.create_writer_physical_plan(input_exec, 
session_state, config).await
+                }
                 LogicalPlan::Dml(DmlStatement {
                     table_name,
                     op: WriteOp::InsertInto,
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index f89be03f79..29d7571d36 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -17,6 +17,7 @@
 
 //! This module provides a builder for creating LogicalPlans
 
+use crate::dml::{CopyTo, OutputFileFormat};
 use crate::expr::Alias;
 use crate::expr_rewriter::{
     coerce_plan_expr_for_schema, normalize_col,
@@ -232,6 +233,23 @@ impl LogicalPlanBuilder {
         Self::scan_with_filters(table_name, table_source, projection, vec![])
     }
 
+    /// Create a [CopyTo] for copying the contents of this builder to the 
specified file(s)
+    pub fn copy_to(
+        input: LogicalPlan,
+        output_url: String,
+        file_format: OutputFileFormat,
+        per_thread_output: bool,
+        options: Vec<(String, String)>,
+    ) -> Result<Self> {
+        Ok(Self::from(LogicalPlan::Copy(CopyTo {
+            input: Arc::new(input),
+            output_url,
+            file_format,
+            per_thread_output,
+            options,
+        })))
+    }
+
     /// Create a [DmlStatement] for inserting the contents of this builder 
into the named table
     pub fn insert_into(
         input: LogicalPlan,
diff --git a/datafusion/expr/src/logical_plan/dml.rs 
b/datafusion/expr/src/logical_plan/dml.rs
index 07f34101eb..ecdea7dcc6 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -17,13 +17,71 @@
 
 use std::{
     fmt::{self, Display},
+    str::FromStr,
     sync::Arc,
 };
 
-use datafusion_common::{DFSchemaRef, OwnedTableReference};
+use datafusion_common::{DFSchemaRef, DataFusionError, OwnedTableReference};
 
 use crate::LogicalPlan;
 
+/// Operator that copies the contents of a database to file(s)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct CopyTo {
+    /// The relation that determines the tuples to write to the output file(s)
+    pub input: Arc<LogicalPlan>,
+    /// The location to write the file(s)
+    pub output_url: String,
+    /// The file format to output (explicitly defined or inferred from file 
extension)
+    pub file_format: OutputFileFormat,
+    /// If false, it is assumed output_url is a file to which all data should 
be written
+    /// regardless of input partitioning. Otherwise, output_url is assumed to 
be a directory
+    /// to which each output partition is written to its own output file
+    pub per_thread_output: bool,
+    /// Arbitrary options as tuples
+    pub options: Vec<(String, String)>,
+}
+
+/// The file formats that CopyTo can output
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub enum OutputFileFormat {
+    CSV,
+    JSON,
+    PARQUET,
+    AVRO,
+    ARROW,
+}
+
+impl FromStr for OutputFileFormat {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, DataFusionError> {
+        match s {
+            "csv" => Ok(OutputFileFormat::CSV),
+            "json" => Ok(OutputFileFormat::JSON),
+            "parquet" => Ok(OutputFileFormat::PARQUET),
+            "avro" => Ok(OutputFileFormat::AVRO),
+            "arrow" => Ok(OutputFileFormat::ARROW),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unknown or not supported file format {s}!"
+            ))),
+        }
+    }
+}
+
+impl Display for OutputFileFormat {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let out = match self {
+            OutputFileFormat::CSV => "csv",
+            OutputFileFormat::JSON => "json",
+            OutputFileFormat::PARQUET => "parquet",
+            OutputFileFormat::AVRO => "avro",
+            OutputFileFormat::ARROW => "arrow",
+        };
+        write!(f, "{}", out)
+    }
+}
+
 /// The operator that modifies the content of a database (adapted from
 /// substrait WriteRel)
 #[derive(Clone, PartialEq, Eq, Hash)]
diff --git a/datafusion/expr/src/logical_plan/mod.rs 
b/datafusion/expr/src/logical_plan/mod.rs
index 01862c3d54..8316417138 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -18,7 +18,7 @@
 pub mod builder;
 mod ddl;
 pub mod display;
-mod dml;
+pub mod dml;
 mod extension;
 mod plan;
 mod statement;
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 3557745ed3..1ee4fb810d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -31,6 +31,7 @@ use crate::{
     build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, 
TableSource,
 };
 
+use super::dml::CopyTo;
 use super::DdlStatement;
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -120,6 +121,8 @@ pub enum LogicalPlan {
     Dml(DmlStatement),
     /// CREATE / DROP TABLES / VIEWS / SCHEMAs
     Ddl(DdlStatement),
+    /// COPY TO
+    Copy(CopyTo),
     /// Describe the schema of table
     DescribeTable(DescribeTable),
     /// Unnest a column that contains a nested list type.
@@ -157,6 +160,7 @@ impl LogicalPlan {
                 dummy_schema
             }
             LogicalPlan::Dml(DmlStatement { table_schema, .. }) => 
table_schema,
+            LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
             LogicalPlan::Ddl(ddl) => ddl.schema(),
             LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
         }
@@ -203,6 +207,7 @@ impl LogicalPlan {
             | LogicalPlan::EmptyRelation(_)
             | LogicalPlan::Ddl(_)
             | LogicalPlan::Dml(_)
+            | LogicalPlan::Copy(_)
             | LogicalPlan::Values(_)
             | LogicalPlan::SubqueryAlias(_)
             | LogicalPlan::Union(_)
@@ -343,6 +348,7 @@ impl LogicalPlan {
             | LogicalPlan::Distinct(_)
             | LogicalPlan::Dml(_)
             | LogicalPlan::Ddl(_)
+            | LogicalPlan::Copy(_)
             | LogicalPlan::DescribeTable(_)
             | LogicalPlan::Prepare(_) => Ok(()),
         }
@@ -371,6 +377,7 @@ impl LogicalPlan {
             LogicalPlan::Explain(explain) => vec![&explain.plan],
             LogicalPlan::Analyze(analyze) => vec![&analyze.input],
             LogicalPlan::Dml(write) => vec![&write.input],
+            LogicalPlan::Copy(copy) => vec![&copy.input],
             LogicalPlan::Ddl(ddl) => ddl.inputs(),
             LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
             LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
@@ -477,6 +484,7 @@ impl LogicalPlan {
             | LogicalPlan::Analyze(_)
             | LogicalPlan::Extension(_)
             | LogicalPlan::Dml(_)
+            | LogicalPlan::Copy(_)
             | LogicalPlan::Ddl(_)
             | LogicalPlan::DescribeTable(_)
             | LogicalPlan::Unnest(_) => Ok(None),
@@ -640,6 +648,7 @@ impl LogicalPlan {
             | LogicalPlan::Explain(_)
             | LogicalPlan::Analyze(_)
             | LogicalPlan::Dml(_)
+            | LogicalPlan::Copy(_)
             | LogicalPlan::DescribeTable(_)
             | LogicalPlan::Prepare(_)
             | LogicalPlan::Statement(_)
@@ -1083,6 +1092,24 @@ impl LogicalPlan {
                     LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                         write!(f, "Dml: op=[{op}] table=[{table_name}]")
                     }
+                    LogicalPlan::Copy(CopyTo {
+                        input: _,
+                        output_url,
+                        file_format,
+                        per_thread_output,
+                        options,
+                    }) => {
+                        let mut op_str = String::new();
+                        op_str.push('(');
+                        for (key, val) in options {
+                            if !op_str.is_empty() {
+                                op_str.push(',');
+                            }
+                            op_str.push_str(&format!("{key} {val}"));
+                        }
+                        op_str.push(')');
+                        write!(f, "CopyTo: format={file_format} 
output_url={output_url} per_thread_output={per_thread_output} options: 
{op_str}")
+                    }
                     LogicalPlan::Ddl(ddl) => {
                         write!(f, "{}", ddl.display())
                     }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index bffcd0669c..90307f1491 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -17,6 +17,7 @@
 
 //! Expression utilities
 
+use crate::dml::CopyTo;
 use crate::expr::{Alias, Sort, WindowFunction};
 use crate::logical_plan::builder::build_join_schema;
 use crate::logical_plan::{
@@ -745,6 +746,19 @@ pub fn from_plan(
             op: op.clone(),
             input: Arc::new(inputs[0].clone()),
         })),
+        LogicalPlan::Copy(CopyTo {
+            input: _,
+            output_url,
+            file_format,
+            per_thread_output,
+            options,
+        }) => Ok(LogicalPlan::Copy(CopyTo {
+            input: Arc::new(inputs[0].clone()),
+            output_url: output_url.clone(),
+            file_format: file_format.clone(),
+            per_thread_output: *per_thread_output,
+            options: options.clone(),
+        })),
         LogicalPlan::Values(Values { schema, .. }) => 
Ok(LogicalPlan::Values(Values {
             schema: schema.clone(),
             values: expr
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 74c4b1d36f..08b28567fb 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -368,6 +368,7 @@ impl OptimizerRule for CommonSubexprEliminate {
             | LogicalPlan::Distinct(_)
             | LogicalPlan::Extension(_)
             | LogicalPlan::Dml(_)
+            | LogicalPlan::Copy(_)
             | LogicalPlan::Unnest(_)
             | LogicalPlan::Prepare(_) => {
                 // apply the optimization to all inputs of the plan
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index d00e5e2f59..511b288442 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1429,6 +1429,9 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::Dml(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for Dml",
             )),
+            LogicalPlan::Copy(_) => Err(proto_error(
+                "LogicalPlan serde is not yet implemented for Copy",
+            )),
             LogicalPlan::DescribeTable(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for DescribeTable",
             )),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 16036defda..0f5dbb9ec0 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::parser::{
-    CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt, 
LexOrdering,
-    Statement as DFStatement,
+    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, 
DescribeTableStmt,
+    LexOrdering, Statement as DFStatement,
 };
 use crate::planner::{
     object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
@@ -31,6 +31,7 @@ use datafusion_common::{
     DataFusionError, ExprSchema, OwnedTableReference, Result, SchemaReference,
     TableReference, ToDFSchema,
 };
+use datafusion_expr::dml::{CopyTo, OutputFileFormat};
 use datafusion_expr::expr::Placeholder;
 use 
datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
 use datafusion_expr::logical_plan::builder::project;
@@ -55,6 +56,8 @@ use sqlparser::parser::ParserError::ParserError;
 
 use datafusion_common::plan_err;
 use std::collections::{BTreeMap, HashMap, HashSet};
+use std::path::Path;
+use std::str::FromStr;
 use std::sync::Arc;
 
 fn ident_to_string(ident: &Ident) -> String {
@@ -377,7 +380,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 let _ = into; // optional keyword doesn't change behavior
                 self.insert_to_plan(table_name, columns, source, overwrite)
             }
-
             Statement::Update {
                 table,
                 assignments,
@@ -547,11 +549,71 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }))
     }
 
-    fn copy_to_plan(&self, _statement: CopyToStatement) -> Result<LogicalPlan> 
{
-        // TODO: implement as part of 
https://github.com/apache/arrow-datafusion/issues/5654
-        Err(DataFusionError::NotImplemented(
-            "`COPY .. TO ..` statement is not yet supported".to_string(),
-        ))
+    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
+        // determine if source is table or query and handle accordingly
+        let copy_source = statement.source;
+        let input = match copy_source {
+            CopyToSource::Relation(object_name) => {
+                let table_ref =
+                    self.object_name_to_table_reference(object_name.clone())?;
+                let table_source = 
self.schema_provider.get_table_provider(table_ref)?;
+                LogicalPlanBuilder::scan(
+                    object_name_to_string(&object_name),
+                    table_source,
+                    None,
+                )?
+                .build()?
+            }
+            CopyToSource::Query(query) => {
+                self.query_to_plan(query, &mut PlannerContext::new())?
+            }
+        };
+
+        // convert options to lowercase strings, check for explicitly set 
"format" option
+        let mut options = vec![];
+        let mut explicit_format = None;
+        // default behavior is to assume the user is specifying a single file 
to which
+        // we should output all data regardless of input partitioning.
+        let mut per_thread_output: bool = false;
+        for (key, value) in statement.options {
+            let (k, v) = (key.to_lowercase(), 
value.to_string().to_lowercase());
+            // check for options important to planning
+            if k == "format" {
+                explicit_format = Some(OutputFileFormat::from_str(&v)?);
+            }
+            if k == "per_thread_output" {
+                per_thread_output = match v.as_str(){
+                    "true" => true,
+                    "false" => false,
+                    _ => return Err(DataFusionError::Plan(
+                        format!("Copy to option 'per_thread_output' must be 
true or false, got {value}")
+                    ))
+                }
+            }
+            options.push((k, v));
+        }
+        let format = match explicit_format {
+            Some(file_format) => file_format,
+            None => {
+                // try to infer file format from file extension
+                let extension: &str = &Path::new(&statement.target)
+                    .extension()
+                    .ok_or(
+                        DataFusionError::Plan("Copy To format not explicitly 
set and unable to get file extension!".to_string()))?
+                    .to_str()
+                    .ok_or(DataFusionError::Plan("Copy to format not 
explicitly set and failed to parse file extension!".to_string()))?
+                    .to_lowercase();
+
+                OutputFileFormat::from_str(extension)?
+            }
+        };
+        Ok(LogicalPlan::Copy(CopyTo {
+            input: Arc::new(input),
+            output_url: statement.target,
+            file_format: format,
+            per_thread_output,
+            options,
+        }))
     }
 
     fn build_order_by(
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index accb6ec9ce..53ffccbdb4 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -325,6 +325,30 @@ fn plan_rollback_transaction_chained() {
     quick_test(sql, plan);
 }
 
+#[test]
+fn plan_copy_to() {
+    let sql = "COPY test_decimal to 'output.csv'";
+    let plan = r#"
+CopyTo: format=csv output_url=output.csv per_thread_output=false options: ()
+  TableScan: test_decimal
+    "#
+    .trim();
+    quick_test(sql, plan);
+}
+
+#[test]
+fn plan_copy_to_query() {
+    let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'";
+    let plan = r#"
+CopyTo: format=csv output_url=output.csv per_thread_output=false options: ()
+  Limit: skip=0, fetch=10
+    Projection: test_decimal.id, test_decimal.price
+      TableScan: test_decimal
+    "#
+    .trim();
+    quick_test(sql, plan);
+}
+
 #[test]
 fn plan_insert() {
     let sql =
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs 
b/datafusion/sqllogictest/bin/sqllogictests.rs
index f28fdbe23c..d097d97fb7 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::ffi::OsStr;
+use std::fs;
 use std::path::{Path, PathBuf};
 #[cfg(target_family = "windows")]
 use std::thread;
@@ -54,10 +55,26 @@ pub async fn main() -> Result<()> {
     run_tests().await
 }
 
+/// Sets up an empty directory at test_files/scratch
+/// creating it if needed and clearing any file contents if it exists
+/// This allows tests for inserting to external tables or copy to
+/// to persist data to disk and have consistent state when running
+/// a new test
+fn setup_scratch_dir() -> Result<()> {
+    let path = std::path::Path::new("test_files/scratch");
+    if path.exists() {
+        fs::remove_dir_all(path)?;
+    }
+    fs::create_dir(path)?;
+    Ok(())
+}
+
 async fn run_tests() -> Result<()> {
     // Enable logging (e.g. set RUST_LOG=debug to see debug logs)
     env_logger::init();
 
+    setup_scratch_dir()?;
+
     let options = Options::new();
 
     // Run all tests in parallel, reporting failures at the end
diff --git a/datafusion/sqllogictest/test_files/copy.slt 
b/datafusion/sqllogictest/test_files/copy.slt
index e7bde89d29..364459fa2d 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -16,29 +16,141 @@
 # under the License.
 
 # tests for copy command
-
 statement ok
 create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), 
(2, 'Bar');
 
-# Copy from table
-statement error DataFusion error: This feature is not implemented: `COPY \.\. 
TO \.\.` statement is not yet supported
-COPY source_table  to '/tmp/table.parquet';
+# Copy to directory as multiple files
+query IT
+COPY source_table TO 'test_files/scratch/table' (format parquet, 
per_thread_output true);
+----
+2
+
+#Explain copy queries not currently working
+query error DataFusion error: This feature is not implemented: Unsupported SQL 
statement: Some\("COPY source_table TO 'test_files/scratch/table'"\)
+EXPLAIN COPY source_table to 'test_files/scratch/table'
+
+query error DataFusion error: SQL error: ParserError\("Expected end of 
statement, found: source_table"\)
+EXPLAIN COPY source_table to 'test_files/scratch/table' (format parquet, 
per_thread_output true)
+
+# Copy more files to directory via query
+query IT
+COPY (select * from source_table UNION ALL select * from source_table) to 
'test_files/scratch/table' (format parquet, per_thread_output true);
+----
+4
+
+# validate multiple parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_parquet STORED AS PARQUET LOCATION 
'test_files/scratch/table/';
+
+query IT
+select * from validate_parquet;
+----
+1 Foo
+2 Bar
+1 Foo
+2 Bar
+1 Foo
+2 Bar
+
+# Copy from table to single file
+query IT
+COPY source_table to 'test_files/scratch/table.parquet';
+----
+2
+
+# validate single parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_parquet_single STORED AS PARQUET LOCATION 
'test_files/scratch/table.parquet';
+
+query IT
+select * from validate_parquet_single;
+----
+1 Foo
+2 Bar
+
+# copy from table to folder of csv files
+query IT
+COPY source_table  to 'test_files/scratch/table_csv' (format csv, 
per_thread_output true);
+----
+2
+
+# validate folder of csv files
+statement ok
+CREATE EXTERNAL TABLE validate_csv STORED AS csv WITH HEADER ROW LOCATION 
'test_files/scratch/table_csv';
+
+query IT
+select * from validate_csv;
+----
+1 Foo
+2 Bar
+
+# Copy from table to single csv
+query IT
+COPY source_table  to 'test_files/scratch/table.csv';
+----
+2
+
+# Validate single csv output
+statement ok
+CREATE EXTERNAL TABLE validate_single_csv STORED AS csv WITH HEADER ROW 
LOCATION 'test_files/scratch/table.csv';
+
+query IT
+select * from validate_single_csv;
+----
+1 Foo
+2 Bar
+
+# Copy from table to folder of json
+query IT
+COPY source_table to 'test_files/scratch/table_json' (format json, 
per_thread_output true);
+----
+2
+
+# Validate json output
+statement ok
+CREATE EXTERNAL TABLE validate_json STORED AS json LOCATION 
'test_files/scratch/table_json';
+
+query IT
+select * from validate_json;
+----
+1 Foo
+2 Bar
+
+# Copy from table to single json file
+query IT
+COPY source_table  to 'test_files/scratch/table.json';
+----
+2
+
+# Validate single JSON file`
+statement ok
+CREATE EXTERNAL TABLE validate_single_json STORED AS json LOCATION 
'test_files/scratch/table_json';
+
+query IT
+select * from validate_single_json;
+----
+1 Foo
+2 Bar
 
 # Copy from table with options
-statement error DataFusion error: This feature is not implemented: `COPY \.\. 
TO \.\.` statement is not yet supported
-COPY source_table  to '/tmp/table.parquet' (row_group_size 55);
+query IT
+COPY source_table  to 'test_files/scratch/table.json' (row_group_size 55);
+----
+2
 
 # Copy from table with options (and trailing comma)
-statement error DataFusion error: This feature is not implemented: `COPY \.\. 
TO \.\.` statement is not yet supported
-COPY source_table  to '/tmp/table.parquet' (row_group_size 55, 
row_group_limit_bytes 9,);
+query IT
+COPY source_table  to 'test_files/scratch/table.json' (row_group_size 55, 
row_group_limit_bytes 9,);
+----
+2
 
 
 # Error cases:
 
 # Incomplete statement
-statement error DataFusion error: SQL error: ParserError\("Expected \), found: 
EOF"\)
+query error DataFusion error: SQL error: ParserError\("Expected \), found: 
EOF"\)
 COPY (select col2, sum(col1) from source_table
 
 # Copy from table with non literal
-statement error DataFusion error: SQL error: ParserError\("Expected ',' or 
'\)' after option definition, found: \+"\)
+query error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' 
after option definition, found: \+"\)
 COPY source_table  to '/tmp/table.parquet' (row_group_size 55 + 102);

Reply via email to