This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ff5305db6 Implement logical plan serde for CopyTo (#8618)
0ff5305db6 is described below

commit 0ff5305db6b03128282d31afac69fa727e1fe7c4
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 22 04:14:45 2023 -0700

    Implement logical plan serde for CopyTo (#8618)
    
    * Implement logical plan serde for CopyTo
    
    * add link to issue
    
    * clippy
    
    * remove debug logging
---
 datafusion/proto/proto/datafusion.proto            |  21 ++
 datafusion/proto/src/generated/pbjson.rs           | 395 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  43 ++-
 datafusion/proto/src/logical_plan/mod.rs           |  86 ++++-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  68 +++-
 5 files changed, 603 insertions(+), 10 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index cc802ee957..05f0b64343 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -74,6 +74,7 @@ message LogicalPlanNode {
     PrepareNode prepare = 26;
     DropViewNode drop_view = 27;
     DistinctOnNode distinct_on = 28;
+    CopyToNode copy_to = 29;
   }
 }
 
@@ -317,6 +318,26 @@ message DistinctOnNode {
   LogicalPlanNode input = 4;
 }
 
+message CopyToNode {
+    LogicalPlanNode input = 1;
+    string output_url = 2;
+    bool single_file_output = 3;
+    oneof CopyOptions {
+        SQLOptions sql_options = 4;
+        FileTypeWriterOptions writer_options = 5;
+    }
+    string file_type = 6;
+}
+
+message SQLOptions {
+  repeated SQLOption option = 1;
+}
+
+message SQLOption {
+    string key = 1;
+    string value = 2;
+}
+
 message UnionNode {
   repeated LogicalPlanNode inputs = 1;
 }
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index fb3a3ad91d..0fdeab0a40 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3704,6 +3704,188 @@ impl<'de> serde::Deserialize<'de> for Constraints {
         deserializer.deserialize_struct("datafusion.Constraints", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for CopyToNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.input.is_some() {
+            len += 1;
+        }
+        if !self.output_url.is_empty() {
+            len += 1;
+        }
+        if self.single_file_output {
+            len += 1;
+        }
+        if !self.file_type.is_empty() {
+            len += 1;
+        }
+        if self.copy_options.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.CopyToNode", len)?;
+        if let Some(v) = self.input.as_ref() {
+            struct_ser.serialize_field("input", v)?;
+        }
+        if !self.output_url.is_empty() {
+            struct_ser.serialize_field("outputUrl", &self.output_url)?;
+        }
+        if self.single_file_output {
+            struct_ser.serialize_field("singleFileOutput", 
&self.single_file_output)?;
+        }
+        if !self.file_type.is_empty() {
+            struct_ser.serialize_field("fileType", &self.file_type)?;
+        }
+        if let Some(v) = self.copy_options.as_ref() {
+            match v {
+                copy_to_node::CopyOptions::SqlOptions(v) => {
+                    struct_ser.serialize_field("sqlOptions", v)?;
+                }
+                copy_to_node::CopyOptions::WriterOptions(v) => {
+                    struct_ser.serialize_field("writerOptions", v)?;
+                }
+            }
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for CopyToNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "input",
+            "output_url",
+            "outputUrl",
+            "single_file_output",
+            "singleFileOutput",
+            "file_type",
+            "fileType",
+            "sql_options",
+            "sqlOptions",
+            "writer_options",
+            "writerOptions",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Input,
+            OutputUrl,
+            SingleFileOutput,
+            FileType,
+            SqlOptions,
+            WriterOptions,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "input" => Ok(GeneratedField::Input),
+                            "outputUrl" | "output_url" => 
Ok(GeneratedField::OutputUrl),
+                            "singleFileOutput" | "single_file_output" => 
Ok(GeneratedField::SingleFileOutput),
+                            "fileType" | "file_type" => 
Ok(GeneratedField::FileType),
+                            "sqlOptions" | "sql_options" => 
Ok(GeneratedField::SqlOptions),
+                            "writerOptions" | "writer_options" => 
Ok(GeneratedField::WriterOptions),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = CopyToNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.CopyToNode")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<CopyToNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut input__ = None;
+                let mut output_url__ = None;
+                let mut single_file_output__ = None;
+                let mut file_type__ = None;
+                let mut copy_options__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Input => {
+                            if input__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("input"));
+                            }
+                            input__ = map_.next_value()?;
+                        }
+                        GeneratedField::OutputUrl => {
+                            if output_url__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("outputUrl"));
+                            }
+                            output_url__ = Some(map_.next_value()?);
+                        }
+                        GeneratedField::SingleFileOutput => {
+                            if single_file_output__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("singleFileOutput"));
+                            }
+                            single_file_output__ = Some(map_.next_value()?);
+                        }
+                        GeneratedField::FileType => {
+                            if file_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("fileType"));
+                            }
+                            file_type__ = Some(map_.next_value()?);
+                        }
+                        GeneratedField::SqlOptions => {
+                            if copy_options__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("sqlOptions"));
+                            }
+                            copy_options__ = 
map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::CopyOptions::SqlOptions)
+;
+                        }
+                        GeneratedField::WriterOptions => {
+                            if copy_options__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("writerOptions"));
+                            }
+                            copy_options__ = 
map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::CopyOptions::WriterOptions)
+;
+                        }
+                    }
+                }
+                Ok(CopyToNode {
+                    input: input__,
+                    output_url: output_url__.unwrap_or_default(),
+                    single_file_output: 
single_file_output__.unwrap_or_default(),
+                    file_type: file_type__.unwrap_or_default(),
+                    copy_options: copy_options__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.CopyToNode", FIELDS, 
GeneratedVisitor)
+    }
+}
 impl serde::Serialize for CreateCatalogNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
@@ -13336,6 +13518,9 @@ impl serde::Serialize for LogicalPlanNode {
                 logical_plan_node::LogicalPlanType::DistinctOn(v) => {
                     struct_ser.serialize_field("distinctOn", v)?;
                 }
+                logical_plan_node::LogicalPlanType::CopyTo(v) => {
+                    struct_ser.serialize_field("copyTo", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -13387,6 +13572,8 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
             "dropView",
             "distinct_on",
             "distinctOn",
+            "copy_to",
+            "copyTo",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -13418,6 +13605,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
             Prepare,
             DropView,
             DistinctOn,
+            CopyTo,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -13466,6 +13654,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
                             "prepare" => Ok(GeneratedField::Prepare),
                             "dropView" | "drop_view" => 
Ok(GeneratedField::DropView),
                             "distinctOn" | "distinct_on" => 
Ok(GeneratedField::DistinctOn),
+                            "copyTo" | "copy_to" => Ok(GeneratedField::CopyTo),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -13675,6 +13864,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
                                 return 
Err(serde::de::Error::duplicate_field("distinctOn"));
                             }
                             logical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::DistinctOn)
+;
+                        }
+                        GeneratedField::CopyTo => {
+                            if logical_plan_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("copyTo"));
+                            }
+                            logical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::CopyTo)
 ;
                         }
                     }
@@ -20742,6 +20938,205 @@ impl<'de> serde::Deserialize<'de> for RollupNode {
         deserializer.deserialize_struct("datafusion.RollupNode", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for SqlOption {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.key.is_empty() {
+            len += 1;
+        }
+        if !self.value.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.SQLOption", len)?;
+        if !self.key.is_empty() {
+            struct_ser.serialize_field("key", &self.key)?;
+        }
+        if !self.value.is_empty() {
+            struct_ser.serialize_field("value", &self.value)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for SqlOption {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "key",
+            "value",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Key,
+            Value,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "key" => Ok(GeneratedField::Key),
+                            "value" => Ok(GeneratedField::Value),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = SqlOption;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.SQLOption")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<SqlOption, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut key__ = None;
+                let mut value__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Key => {
+                            if key__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("key"));
+                            }
+                            key__ = Some(map_.next_value()?);
+                        }
+                        GeneratedField::Value => {
+                            if value__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("value"));
+                            }
+                            value__ = Some(map_.next_value()?);
+                        }
+                    }
+                }
+                Ok(SqlOption {
+                    key: key__.unwrap_or_default(),
+                    value: value__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.SQLOption", FIELDS, 
GeneratedVisitor)
+    }
+}
+impl serde::Serialize for SqlOptions {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.option.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.SQLOptions", len)?;
+        if !self.option.is_empty() {
+            struct_ser.serialize_field("option", &self.option)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for SqlOptions {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "option",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Option,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "option" => Ok(GeneratedField::Option),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = SqlOptions;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.SQLOptions")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<SqlOptions, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut option__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Option => {
+                            if option__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("option"));
+                            }
+                            option__ = Some(map_.next_value()?);
+                        }
+                    }
+                }
+                Ok(SqlOptions {
+                    option: option__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.SQLOptions", FIELDS, 
GeneratedVisitor)
+    }
+}
 impl serde::Serialize for ScalarDictionaryValue {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 9030e90a24..e44355859d 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -38,7 +38,7 @@ pub struct DfSchema {
 pub struct LogicalPlanNode {
     #[prost(
         oneof = "logical_plan_node::LogicalPlanType",
-        tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28"
+        tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29"
     )]
     pub logical_plan_type: 
::core::option::Option<logical_plan_node::LogicalPlanType>,
 }
@@ -101,6 +101,8 @@ pub mod logical_plan_node {
         DropView(super::DropViewNode),
         #[prost(message, tag = "28")]
         DistinctOn(::prost::alloc::boxed::Box<super::DistinctOnNode>),
+        #[prost(message, tag = "29")]
+        CopyTo(::prost::alloc::boxed::Box<super::CopyToNode>),
     }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -502,6 +504,45 @@ pub struct DistinctOnNode {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CopyToNode {
+    #[prost(message, optional, boxed, tag = "1")]
+    pub input: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
+    #[prost(string, tag = "2")]
+    pub output_url: ::prost::alloc::string::String,
+    #[prost(bool, tag = "3")]
+    pub single_file_output: bool,
+    #[prost(string, tag = "6")]
+    pub file_type: ::prost::alloc::string::String,
+    #[prost(oneof = "copy_to_node::CopyOptions", tags = "4, 5")]
+    pub copy_options: ::core::option::Option<copy_to_node::CopyOptions>,
+}
+/// Nested message and enum types in `CopyToNode`.
+pub mod copy_to_node {
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum CopyOptions {
+        #[prost(message, tag = "4")]
+        SqlOptions(super::SqlOptions),
+        #[prost(message, tag = "5")]
+        WriterOptions(super::FileTypeWriterOptions),
+    }
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SqlOptions {
+    #[prost(message, repeated, tag = "1")]
+    pub option: ::prost::alloc::vec::Vec<SqlOption>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SqlOption {
+    #[prost(string, tag = "1")]
+    pub key: ::prost::alloc::string::String,
+    #[prost(string, tag = "2")]
+    pub value: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnionNode {
     #[prost(message, repeated, tag = "1")]
     pub inputs: ::prost::alloc::vec::Vec<LogicalPlanNode>,
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index 948228d87d..e03b3ffa7b 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
 
 use crate::common::{byte_to_string, proto_error, str_to_byte};
 use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
-use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection};
+use crate::protobuf::{
+    copy_to_node, CustomTableScanNode, LogicalExprNodeCollection, SqlOption,
+};
 use crate::{
     convert_required,
     protobuf::{
@@ -44,12 +46,13 @@ use datafusion::{
     datasource::{provider_as_source, source_as_provider},
     prelude::SessionContext,
 };
-use datafusion_common::plan_datafusion_err;
 use datafusion_common::{
-    context, internal_err, not_impl_err, parsers::CompressionTypeVariant,
-    DataFusionError, OwnedTableReference, Result,
+    context, file_options::StatementOptions, internal_err, not_impl_err,
+    parsers::CompressionTypeVariant, plan_datafusion_err, DataFusionError, 
FileType,
+    OwnedTableReference, Result,
 };
 use datafusion_expr::{
+    dml,
     logical_plan::{
         builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
         CreateExternalTable, CreateView, CrossJoin, DdlStatement, Distinct,
@@ -59,6 +62,7 @@ use datafusion_expr::{
     DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder,
 };
 
+use datafusion_expr::dml::CopyOptions;
 use prost::bytes::BufMut;
 use prost::Message;
 
@@ -823,6 +827,36 @@ impl AsLogicalPlan for LogicalPlanNode {
                     schema: Arc::new(convert_required!(dropview.schema)?),
                 }),
             )),
+            LogicalPlanType::CopyTo(copy) => {
+                let input: LogicalPlan =
+                    into_logical_plan!(copy.input, ctx, extension_codec)?;
+
+                let copy_options = match &copy.copy_options {
+                    Some(copy_to_node::CopyOptions::SqlOptions(opt)) => {
+                        let options = opt.option.iter().map(|o| 
(o.key.clone(), o.value.clone())).collect();
+                        CopyOptions::SQLOptions(StatementOptions::from(
+                            &options,
+                        ))
+                    }
+                    Some(copy_to_node::CopyOptions::WriterOptions(_)) => {
+                        return Err(proto_error(
+                            "LogicalPlan serde is not yet implemented for 
CopyTo with WriterOptions",
+                        ))
+                    }
+                    other => return Err(proto_error(format!(
+                        "LogicalPlan serde is not yet implemented for CopyTo 
with CopyOptions {other:?}",
+                    )))
+                };
+                Ok(datafusion_expr::LogicalPlan::Copy(
+                    datafusion_expr::dml::CopyTo {
+                        input: Arc::new(input),
+                        output_url: copy.output_url.clone(),
+                        file_format: FileType::from_str(&copy.file_type)?,
+                        single_file_output: copy.single_file_output,
+                        copy_options,
+                    },
+                ))
+            }
         }
     }
 
@@ -1534,9 +1568,47 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::Dml(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for Dml",
             )),
-            LogicalPlan::Copy(_) => Err(proto_error(
-                "LogicalPlan serde is not yet implemented for Copy",
-            )),
+            LogicalPlan::Copy(dml::CopyTo {
+                input,
+                output_url,
+                single_file_output,
+                file_format,
+                copy_options,
+            }) => {
+                let input = protobuf::LogicalPlanNode::try_from_logical_plan(
+                    input,
+                    extension_codec,
+                )?;
+
+                let copy_options_proto: Option<copy_to_node::CopyOptions> = 
match copy_options {
+                    CopyOptions::SQLOptions(opt) => {
+                        let options: Vec<SqlOption> = 
opt.clone().into_inner().iter().map(|(k, v)| SqlOption {
+                            key: k.to_string(),
+                            value: v.to_string(),
+                        }).collect();
+                        
Some(copy_to_node::CopyOptions::SqlOptions(protobuf::SqlOptions {
+                            option: options
+                        }))
+                    }
+                    CopyOptions::WriterOptions(_) => {
+                        return Err(proto_error(
+                            "LogicalPlan serde is not yet implemented for 
CopyTo with WriterOptions",
+                        ))
+                    }
+                };
+
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
+                        protobuf::CopyToNode {
+                            input: Some(Box::new(input)),
+                            single_file_output: *single_file_output,
+                            output_url: output_url.to_string(),
+                            file_type: file_format.to_string(),
+                            copy_options: copy_options_proto,
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::DescribeTable(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for DescribeTable",
             )),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8e15b5d0d4..9798b06f47 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -31,12 +31,16 @@ use datafusion::datasource::provider::TableProviderFactory;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_plan::functions::make_scalar_function;
 use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig, 
SessionContext};
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
-use datafusion_common::Result;
-use datafusion_common::{internal_err, not_impl_err, plan_err};
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
+use datafusion_common::file_options::StatementOptions;
+use datafusion_common::{internal_err, not_impl_err, plan_err, 
FileTypeWriterOptions};
 use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, 
ScalarValue};
+use datafusion_common::{FileType, Result};
+use datafusion_expr::dml::{CopyOptions, CopyTo};
 use datafusion_expr::expr::{
     self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like, 
ScalarFunction,
     Sort,
@@ -301,6 +305,66 @@ async fn roundtrip_logical_plan_aggregation() -> 
Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let input = create_csv_scan(&ctx).await?;
+
+    let mut options = HashMap::new();
+    options.insert("foo".to_string(), "bar".to_string());
+
+    let plan = LogicalPlan::Copy(CopyTo {
+        input: Arc::new(input),
+        output_url: "test.csv".to_string(),
+        file_format: FileType::CSV,
+        single_file_output: true,
+        copy_options: 
CopyOptions::SQLOptions(StatementOptions::from(&options)),
+    });
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+
+    Ok(())
+}
+
+#[tokio::test]
+#[ignore] // see https://github.com/apache/arrow-datafusion/issues/8619
+async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let input = create_csv_scan(&ctx).await?;
+
+    let writer_properties = WriterProperties::builder()
+        .set_bloom_filter_enabled(true)
+        .set_created_by("DataFusion Test".to_string())
+        .build();
+    let plan = LogicalPlan::Copy(CopyTo {
+        input: Arc::new(input),
+        output_url: "test.csv".to_string(),
+        file_format: FileType::CSV,
+        single_file_output: true,
+        copy_options: CopyOptions::WriterOptions(Box::new(
+            
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
+        )),
+    });
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+
+    Ok(())
+}
+
+async fn create_csv_scan(ctx: &SessionContext) -> Result<LogicalPlan, 
DataFusionError> {
+    ctx.register_csv("t1", "tests/testdata/test.csv", 
CsvReadOptions::default())
+        .await?;
+
+    let input = ctx.table("t1").await?.into_optimized_plan()?;
+    Ok(input)
+}
+
 #[tokio::test]
 async fn roundtrip_logical_plan_distinct_on() -> Result<()> {
     let ctx = SessionContext::new();

Reply via email to