This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 87eb126eba Support Write Options in DataFrame::write_* methods (#7435)
87eb126eba is described below

commit 87eb126ebaee0f832a61a73a0f601a1c278b16b1
Author: Devin D'Angelo <[email protected]>
AuthorDate: Wed Sep 6 09:16:50 2023 -0400

    Support Write Options in DataFrame::write_* methods (#7435)
    
    * merge main, squash
    
    * remove not existing import
    
    * add test
    
    * fix pathing for windows
    
    * test all supported compression codecs
    
    * update comment
---
 datafusion-examples/examples/dataframe-to-s3.rs    |  12 +-
 datafusion/common/src/file_options/csv_writer.rs   |  13 ++
 datafusion/common/src/file_options/json_writer.rs  |   6 +
 .../common/src/file_options/parquet_writer.rs      |   8 +-
 datafusion/core/src/dataframe.rs                   | 155 ++++++++++++++++++---
 .../core/src/datasource/physical_plan/csv.rs       |   6 +-
 .../core/src/datasource/physical_plan/json.rs      |   6 +-
 .../core/src/datasource/physical_plan/parquet.rs   |   6 +-
 datafusion/core/src/physical_planner.rs            |  17 ++-
 datafusion/expr/src/logical_plan/builder.rs        |   7 +-
 datafusion/expr/src/logical_plan/dml.rs            |  45 +++++-
 datafusion/expr/src/logical_plan/plan.rs           |  20 +--
 datafusion/expr/src/utils.rs                       |   4 +-
 datafusion/sql/src/statement.rs                    |   6 +-
 14 files changed, 261 insertions(+), 50 deletions(-)

diff --git a/datafusion-examples/examples/dataframe-to-s3.rs b/datafusion-examples/examples/dataframe-to-s3.rs
index 0d8d0a9fc1..883da7d0d1 100644
--- a/datafusion-examples/examples/dataframe-to-s3.rs
+++ b/datafusion-examples/examples/dataframe-to-s3.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
@@ -61,15 +62,20 @@ async fn main() -> Result<()> {
     let df = ctx.sql("SELECT * from test").await?;
 
     let out_path = format!("s3://{bucket_name}/test_write/");
-    df.clone().write_parquet(&out_path, None).await?;
+    df.clone()
+        .write_parquet(&out_path, DataFrameWriteOptions::new(), None)
+        .await?;
 
     //write as JSON to s3
     let json_out = format!("s3://{bucket_name}/json_out");
-    df.clone().write_json(&json_out).await?;
+    df.clone()
+        .write_json(&json_out, DataFrameWriteOptions::new())
+        .await?;
 
     //write as csv to s3
     let csv_out = format!("s3://{bucket_name}/csv_out");
-    df.write_csv(&csv_out).await?;
+    df.write_csv(&csv_out, DataFrameWriteOptions::new(), None)
+        .await?;
 
     let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
     let listing_options = ListingOptions::new(Arc::new(file_format))
diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs
index ebf177fdce..336180b256 100644
--- a/datafusion/common/src/file_options/csv_writer.rs
+++ b/datafusion/common/src/file_options/csv_writer.rs
@@ -46,6 +46,19 @@ pub struct CsvWriterOptions {
     // https://github.com/apache/arrow-rs/issues/4735
 }
 
+impl CsvWriterOptions {
+    pub fn new(
+        writer_options: WriterBuilder,
+        compression: CompressionTypeVariant,
+    ) -> Self {
+        Self {
+            writer_options,
+            compression,
+            has_header: true,
+        }
+    }
+}
+
 impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
     type Error = DataFusionError;
 
diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs
index b3ea76b651..7f988016c6 100644
--- a/datafusion/common/src/file_options/json_writer.rs
+++ b/datafusion/common/src/file_options/json_writer.rs
@@ -33,6 +33,12 @@ pub struct JsonWriterOptions {
     pub compression: CompressionTypeVariant,
 }
 
+impl JsonWriterOptions {
+    pub fn new(compression: CompressionTypeVariant) -> Self {
+        Self { compression }
+    }
+}
+
 impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions {
     type Error = DataFusionError;
 
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index ea3276b062..fed773f29e 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -36,6 +36,12 @@ pub struct ParquetWriterOptions {
     pub writer_options: WriterProperties,
 }
 
+impl ParquetWriterOptions {
+    pub fn new(writer_options: WriterProperties) -> Self {
+        Self { writer_options }
+    }
+}
+
 impl ParquetWriterOptions {
     pub fn writer_options(&self) -> &WriterProperties {
         &self.writer_options
@@ -44,7 +50,7 @@ impl ParquetWriterOptions {
 
 /// Constructs a default Parquet WriterPropertiesBuilder using
 /// Session level ConfigOptions to initialize settings
-fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
+pub fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
     let parquet_session_options = &options.execution.parquet;
     let mut builder = WriterProperties::builder()
         .set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index fbd78ec9d7..648b2340d4 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -22,10 +22,19 @@ use std::sync::Arc;
 
 use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
 use arrow::compute::{cast, concat};
+use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
-use datafusion_common::file_options::StatementOptions;
-use datafusion_common::{DataFusionError, FileType, SchemaError, UnnestOptions};
+use datafusion_common::file_options::csv_writer::CsvWriterOptions;
+use datafusion_common::file_options::json_writer::JsonWriterOptions;
+use datafusion_common::file_options::parquet_writer::{
+    default_builder, ParquetWriterOptions,
+};
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::{
+    DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
+};
+use datafusion_expr::dml::CopyOptions;
 use parquet::file::properties::WriterProperties;
 
 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -57,21 +66,42 @@ use crate::prelude::SessionContext;
 /// written out from a DataFrame
 pub struct DataFrameWriteOptions {
     /// Controls if existing data should be overwritten
-    overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
-                     // settings such as LOCATION and FILETYPE can be set here
-                     // e.g. add location: Option<Path>
+    overwrite: bool,
+    /// Controls if all partitions should be coalesced into a single output file
+    /// Generally will have slower performance when set to true.
+    single_file_output: bool,
+    /// Sets compression by DataFusion applied after file serialization.
+    /// Allows compression of CSV and JSON.
+    /// Not supported for parquet.
+    compression: CompressionTypeVariant,
 }
 
 impl DataFrameWriteOptions {
     /// Create a new DataFrameWriteOptions with default values
     pub fn new() -> Self {
-        DataFrameWriteOptions { overwrite: false }
+        DataFrameWriteOptions {
+            overwrite: false,
+            single_file_output: false,
+            compression: CompressionTypeVariant::UNCOMPRESSED,
+        }
     }
     /// Set the overwrite option to true or false
     pub fn with_overwrite(mut self, overwrite: bool) -> Self {
         self.overwrite = overwrite;
         self
     }
+
+    /// Set the single_file_output value to true or false
+    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
+        self.single_file_output = single_file_output;
+        self
+    }
+
+    /// Sets the compression type applied to the output file(s)
+    pub fn with_compression(mut self, compression: CompressionTypeVariant) -> Self {
+        self.compression = compression;
+        self
+    }
 }
 
 impl Default for DataFrameWriteOptions {
@@ -995,14 +1025,29 @@ impl DataFrame {
     pub async fn write_csv(
         self,
         path: &str,
+        options: DataFrameWriteOptions,
+        writer_properties: Option<WriterBuilder>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        if options.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented for DataFrame::write_csv.".to_owned(),
+            ));
+        }
+        let props = match writer_properties {
+            Some(props) => props,
+            None => WriterBuilder::new(),
+        };
+
+        let file_type_writer_options =
+            FileTypeWriterOptions::CSV(CsvWriterOptions::new(props, options.compression));
+        let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
+
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
             FileType::CSV,
-            false,
-            // TODO implement options
-            StatementOptions::new(vec![]),
+            options.single_file_output,
+            copy_options,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await
@@ -1012,15 +1057,31 @@ impl DataFrame {
     pub async fn write_parquet(
         self,
         path: &str,
-        _writer_properties: Option<WriterProperties>,
+        options: DataFrameWriteOptions,
+        writer_properties: Option<WriterProperties>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        if options.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented for DataFrame::write_parquet.".to_owned(),
+            ));
+        }
+        match options.compression{
+            CompressionTypeVariant::UNCOMPRESSED => (),
+            _ => return Err(DataFusionError::Configuration("DataFrame::write_parquet method does not support compression set via DataFrameWriteOptions. Set parquet compression via writer_properties instead.".to_owned()))
+        }
+        let props = match writer_properties {
+            Some(props) => props,
+            None => default_builder(self.session_state.config_options())?.build(),
+        };
+        let file_type_writer_options =
+            FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(props));
+        let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
             FileType::PARQUET,
-            false,
-            // TODO implement options
-            StatementOptions::new(vec![]),
+            options.single_file_output,
+            copy_options,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await
@@ -1030,14 +1091,22 @@ impl DataFrame {
     pub async fn write_json(
         self,
         path: &str,
+        options: DataFrameWriteOptions,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        if options.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented for DataFrame::write_json.".to_owned(),
+            ));
+        }
+        let file_type_writer_options =
+            FileTypeWriterOptions::JSON(JsonWriterOptions::new(options.compression));
+        let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
             FileType::JSON,
-            false,
-            // TODO implement options
-            StatementOptions::new(vec![]),
+            options.single_file_output,
+            copy_options,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await
@@ -1249,6 +1318,11 @@ mod tests {
         WindowFunction,
     };
     use datafusion_physical_expr::expressions::Column;
+    use object_store::local::LocalFileSystem;
+    use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+    use parquet::file::reader::FileReader;
+    use tempfile::TempDir;
+    use url::Url;
 
     use crate::execution::context::SessionConfig;
     use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
@@ -2292,4 +2366,53 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn write_parquet_with_compression() -> Result<()> {
+        let test_df = test_table().await?;
+
+        let output_path = "file://local/test.parquet";
+        let test_compressions = vec![
+            parquet::basic::Compression::SNAPPY,
+            parquet::basic::Compression::LZ4,
+            parquet::basic::Compression::LZ4_RAW,
+            parquet::basic::Compression::GZIP(GzipLevel::default()),
+            parquet::basic::Compression::BROTLI(BrotliLevel::default()),
+            parquet::basic::Compression::ZSTD(ZstdLevel::default()),
+        ];
+        for compression in test_compressions.into_iter() {
+            let df = test_df.clone();
+            let tmp_dir = TempDir::new()?;
+            let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+            let local_url = Url::parse("file://local").unwrap();
+            let ctx = &test_df.session_state;
+            ctx.runtime_env().register_object_store(&local_url, local);
+            df.write_parquet(
+                output_path,
+                DataFrameWriteOptions::new().with_single_file_output(true),
+                Some(
+                    WriterProperties::builder()
+                        .set_compression(compression)
+                        .build(),
+                ),
+            )
+            .await?;
+
+            // Check that file actually used the specified compression
+            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+
+            let reader =
+                parquet::file::serialized_reader::SerializedFileReader::new(file)
+                    .unwrap();
+
+            let parquet_metadata = reader.metadata();
+
+            let written_compression =
+                parquet_metadata.row_group(0).column(0).compression();
+
+            assert_eq!(written_compression, compression);
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 9f670431cb..e692810381 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -577,6 +577,7 @@ pub async fn plan_to_csv(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::dataframe::DataFrameWriteOptions;
     use crate::prelude::*;
     use crate::test::{partitioned_csv_config, partitioned_file_groups};
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
@@ -1071,7 +1072,7 @@ mod tests {
 
         let out_dir_url = "file://local/out";
         let e = df
-            .write_csv(out_dir_url)
+            .write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
             .await
             .expect_err("should fail because input file does not match inferred schema");
         assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}"));
@@ -1106,7 +1107,8 @@ mod tests {
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
         let out_dir_url = "file://local/out";
         let df = ctx.sql("SELECT c1, c2 FROM test").await?;
-        df.write_csv(out_dir_url).await?;
+        df.write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
+            .await?;
 
         // create a new context and verify that the results were saved to a partitioned csv file
         let ctx = SessionContext::new();
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 4ad268a976..ec5a4c1cef 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -321,6 +321,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
 
     use crate::assert_batches_eq;
+    use crate::dataframe::DataFrameWriteOptions;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
@@ -696,7 +697,8 @@ mod tests {
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
         let out_dir_url = "file://local/out";
         let df = ctx.sql("SELECT a, b FROM test").await?;
-        df.write_json(out_dir_url).await?;
+        df.write_json(out_dir_url, DataFrameWriteOptions::new())
+            .await?;
 
         // create a new context and verify that the results were saved to a partitioned csv file
         let ctx = SessionContext::new();
@@ -789,7 +791,7 @@ mod tests {
         let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
         let out_dir_url = "file://local/out";
         let e = df
-            .write_json(out_dir_url)
+            .write_json(out_dir_url, DataFrameWriteOptions::new())
             .await
             .expect_err("should fail because input file does not match inferred schema");
         assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}"));
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index d973f13fad..861a37a302 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -736,6 +736,7 @@ mod tests {
     // See also `parquet_exec` integration test
 
     use super::*;
+    use crate::dataframe::DataFrameWriteOptions;
     use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
@@ -932,7 +933,7 @@ mod tests {
         let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
         let out_dir_url = "file://local/out";
         let e = df
-            .write_parquet(out_dir_url, None)
+            .write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
             .await
             .expect_err("should fail because input file does not match inferred schema");
         assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}"));
@@ -1951,7 +1952,8 @@ mod tests {
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
         let out_dir_url = "file://local/out";
         let df = ctx.sql("SELECT c1, c2 FROM test").await?;
-        df.write_parquet(out_dir_url, None).await?;
+        df.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
+            .await?;
         // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;
 
         // create a new context and verify that the results were saved to a partitioned parquet file
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 25b11b1f97..3a59f40ede 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -40,7 +40,7 @@ use crate::logical_expr::{
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::file_options::FileTypeWriterOptions;
 use datafusion_common::FileType;
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyOptions, CopyTo};
 
 use crate::logical_expr::{Limit, Values};
 use crate::physical_expr::create_physical_expr;
@@ -557,7 +557,7 @@ impl DefaultPhysicalPlanner {
                     output_url,
                     file_format,
                     single_file_output,
-                    statement_options,
+                    copy_options,
                 }) => {
                     let input_exec = self.create_initial_plan(input, session_state).await?;
 
@@ -569,10 +569,15 @@ impl DefaultPhysicalPlanner {
 
                     let schema: Schema = (**input.schema()).clone().into();
 
-                    let file_type_writer_options = FileTypeWriterOptions::build(
-                        file_format,
-                        session_state.config_options(),
-                        statement_options)?;
+                    let file_type_writer_options = match copy_options{
+                        CopyOptions::SQLOptions(statement_options) => {
+                            FileTypeWriterOptions::build(
+                                file_format,
+                                session_state.config_options(),
+                                statement_options)?
+                        },
+                        CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
+                    };
 
                     // Set file sink related options
                     let config = FileSinkConfig {
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d3cc42afbe..eb7833504b 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -17,7 +17,7 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use crate::dml::CopyTo;
+use crate::dml::{CopyOptions, CopyTo};
 use crate::expr::Alias;
 use crate::expr_rewriter::{
     coerce_plan_expr_for_schema, normalize_col,
@@ -41,7 +41,6 @@ use crate::{
     Expr, ExprSchemable, TableSource,
 };
 use arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion_common::file_options::StatementOptions;
 use datafusion_common::plan_err;
 use datafusion_common::UnnestOptions;
 use datafusion_common::{
@@ -240,14 +239,14 @@ impl LogicalPlanBuilder {
         output_url: String,
         file_format: FileType,
         single_file_output: bool,
-        statement_options: StatementOptions,
+        copy_options: CopyOptions,
     ) -> Result<Self> {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url,
             file_format,
             single_file_output,
-            statement_options,
+            copy_options,
         })))
     }
 
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 501f2eeba1..4cd56b89ac 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -21,7 +21,8 @@ use std::{
 };
 
 use datafusion_common::{
-    file_options::StatementOptions, DFSchemaRef, FileType, OwnedTableReference,
+    file_options::StatementOptions, DFSchemaRef, FileType, FileTypeWriterOptions,
+    OwnedTableReference,
 };
 
 use crate::LogicalPlan;
@@ -40,7 +41,47 @@ pub struct CopyTo {
     /// to which each output partition is written to its own output file
     pub single_file_output: bool,
     /// Arbitrary options as tuples
-    pub statement_options: StatementOptions,
+    pub copy_options: CopyOptions,
+}
+
+/// When the logical plan is constructed from SQL, CopyOptions
+/// will contain arbitrary string tuples which must be parsed into
+/// FileTypeWriterOptions. When the logical plan is constructed directly
+/// from rust code (such as via the DataFrame API), FileTypeWriterOptions
+/// can be provided directly, avoiding the run time cost and fallibility of
+/// parsing string based options.
+#[derive(Clone)]
+pub enum CopyOptions {
+    /// Holds StatementOptions parsed from a SQL statement
+    SQLOptions(StatementOptions),
+    /// Holds FileTypeWriterOptions directly provided
+    WriterOptions(Box<FileTypeWriterOptions>),
+}
+
+impl PartialEq for CopyOptions {
+    fn eq(&self, other: &CopyOptions) -> bool {
+        match self {
+            Self::SQLOptions(statement1) => match other {
+                Self::SQLOptions(statement2) => statement1.eq(statement2),
+                Self::WriterOptions(_) => false,
+            },
+            Self::WriterOptions(_) => false,
+        }
+    }
+}
+
+impl Eq for CopyOptions {}
+
+impl std::hash::Hash for CopyOptions {
+    fn hash<H>(&self, hasher: &mut H)
+    where
+        H: std::hash::Hasher,
+    {
+        match self {
+            Self::SQLOptions(statement) => statement.hash(hasher),
+            Self::WriterOptions(_) => (),
+        }
+    }
 }
 
 /// The operator that modifies the content of a database (adapted from
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b17db245e6..083ee230c7 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -17,6 +17,7 @@
 
 //! Logical plan types
 
+use crate::dml::CopyOptions;
 use crate::expr::{Alias, Exists, InSubquery, Placeholder};
 use crate::expr_rewriter::create_col_from_scalar_expr;
 use crate::expr_vec_fmt;
@@ -1118,15 +1119,18 @@ impl LogicalPlan {
                         output_url,
                         file_format,
                         single_file_output,
-                        statement_options,
+                        copy_options,
                     }) => {
-                        let op_str = statement_options
-                            .clone()
-                            .into_inner()
-                            .iter()
-                            .map(|(k, v)| format!("{k} {v}"))
-                            .collect::<Vec<String>>()
-                            .join(", ");
+                        let op_str = match copy_options {
+                            CopyOptions::SQLOptions(statement) => statement
+                                .clone()
+                                .into_inner()
+                                .iter()
+                                .map(|(k, v)| format!("{k} {v}"))
+                                .collect::<Vec<String>>()
+                                .join(", "),
+                            CopyOptions::WriterOptions(_) => "".into(),
+                        };
 
                         write!(f, "CopyTo: format={file_format} output_url={output_url} single_file_output={single_file_output} options: ({op_str})")
                     }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index fd6a65b312..be48418cb8 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -756,13 +756,13 @@ pub fn from_plan(
             output_url,
             file_format,
             single_file_output,
-            statement_options,
+            copy_options,
         }) => Ok(LogicalPlan::Copy(CopyTo {
             input: Arc::new(inputs[0].clone()),
             output_url: output_url.clone(),
             file_format: file_format.clone(),
             single_file_output: *single_file_output,
-            statement_options: statement_options.clone(),
+            copy_options: copy_options.clone(),
         })),
         LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
             schema: schema.clone(),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 655442d7e3..0b0c391134 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -32,7 +32,7 @@ use datafusion_common::{
     DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference, Result,
     SchemaReference, TableReference, ToDFSchema,
 };
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyOptions, CopyTo};
 use datafusion_expr::expr::Placeholder;
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
 use datafusion_expr::logical_plan::builder::project;
@@ -617,12 +617,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // COPY defaults to outputting a single file if not otherwise specified
         let single_file_output = single_file_output.unwrap_or(true);
 
+        let copy_options = CopyOptions::SQLOptions(statement_options);
+
         Ok(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url: statement.target,
             file_format,
             single_file_output,
-            statement_options,
+            copy_options,
         }))
     }
 

Reply via email to