This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0ff5305db6 Implement logical plan serde for CopyTo (#8618)
0ff5305db6 is described below
commit 0ff5305db6b03128282d31afac69fa727e1fe7c4
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 22 04:14:45 2023 -0700
Implement logical plan serde for CopyTo (#8618)
* Implement logical plan serde for CopyTo
* add link to issue
* clippy
* remove debug logging
---
datafusion/proto/proto/datafusion.proto | 21 ++
datafusion/proto/src/generated/pbjson.rs | 395 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 43 ++-
datafusion/proto/src/logical_plan/mod.rs | 86 ++++-
.../proto/tests/cases/roundtrip_logical_plan.rs | 68 +++-
5 files changed, 603 insertions(+), 10 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index cc802ee957..05f0b64343 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -74,6 +74,7 @@ message LogicalPlanNode {
PrepareNode prepare = 26;
DropViewNode drop_view = 27;
DistinctOnNode distinct_on = 28;
+ CopyToNode copy_to = 29;
}
}
@@ -317,6 +318,26 @@ message DistinctOnNode {
LogicalPlanNode input = 4;
}
+message CopyToNode {
+ LogicalPlanNode input = 1;
+ string output_url = 2;
+ bool single_file_output = 3;
+ oneof CopyOptions {
+ SQLOptions sql_options = 4;
+ FileTypeWriterOptions writer_options = 5;
+ }
+ string file_type = 6;
+}
+
+message SQLOptions {
+ repeated SQLOption option = 1;
+}
+
+message SQLOption {
+ string key = 1;
+ string value = 2;
+}
+
message UnionNode {
repeated LogicalPlanNode inputs = 1;
}
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index fb3a3ad91d..0fdeab0a40 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3704,6 +3704,188 @@ impl<'de> serde::Deserialize<'de> for Constraints {
deserializer.deserialize_struct("datafusion.Constraints", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for CopyToNode {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.input.is_some() {
+ len += 1;
+ }
+ if !self.output_url.is_empty() {
+ len += 1;
+ }
+ if self.single_file_output {
+ len += 1;
+ }
+ if !self.file_type.is_empty() {
+ len += 1;
+ }
+ if self.copy_options.is_some() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.CopyToNode", len)?;
+ if let Some(v) = self.input.as_ref() {
+ struct_ser.serialize_field("input", v)?;
+ }
+ if !self.output_url.is_empty() {
+ struct_ser.serialize_field("outputUrl", &self.output_url)?;
+ }
+ if self.single_file_output {
+ struct_ser.serialize_field("singleFileOutput",
&self.single_file_output)?;
+ }
+ if !self.file_type.is_empty() {
+ struct_ser.serialize_field("fileType", &self.file_type)?;
+ }
+ if let Some(v) = self.copy_options.as_ref() {
+ match v {
+ copy_to_node::CopyOptions::SqlOptions(v) => {
+ struct_ser.serialize_field("sqlOptions", v)?;
+ }
+ copy_to_node::CopyOptions::WriterOptions(v) => {
+ struct_ser.serialize_field("writerOptions", v)?;
+ }
+ }
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for CopyToNode {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "input",
+ "output_url",
+ "outputUrl",
+ "single_file_output",
+ "singleFileOutput",
+ "file_type",
+ "fileType",
+ "sql_options",
+ "sqlOptions",
+ "writer_options",
+ "writerOptions",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Input,
+ OutputUrl,
+ SingleFileOutput,
+ FileType,
+ SqlOptions,
+ WriterOptions,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "input" => Ok(GeneratedField::Input),
+ "outputUrl" | "output_url" =>
Ok(GeneratedField::OutputUrl),
+ "singleFileOutput" | "single_file_output" =>
Ok(GeneratedField::SingleFileOutput),
+ "fileType" | "file_type" =>
Ok(GeneratedField::FileType),
+ "sqlOptions" | "sql_options" =>
Ok(GeneratedField::SqlOptions),
+ "writerOptions" | "writer_options" =>
Ok(GeneratedField::WriterOptions),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = CopyToNode;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.CopyToNode")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<CopyToNode, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut input__ = None;
+ let mut output_url__ = None;
+ let mut single_file_output__ = None;
+ let mut file_type__ = None;
+ let mut copy_options__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Input => {
+ if input__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("input"));
+ }
+ input__ = map_.next_value()?;
+ }
+ GeneratedField::OutputUrl => {
+ if output_url__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("outputUrl"));
+ }
+ output_url__ = Some(map_.next_value()?);
+ }
+ GeneratedField::SingleFileOutput => {
+ if single_file_output__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("singleFileOutput"));
+ }
+ single_file_output__ = Some(map_.next_value()?);
+ }
+ GeneratedField::FileType => {
+ if file_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("fileType"));
+ }
+ file_type__ = Some(map_.next_value()?);
+ }
+ GeneratedField::SqlOptions => {
+ if copy_options__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("sqlOptions"));
+ }
+ copy_options__ =
map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::CopyOptions::SqlOptions)
+;
+ }
+ GeneratedField::WriterOptions => {
+ if copy_options__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("writerOptions"));
+ }
+ copy_options__ =
map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::CopyOptions::WriterOptions)
+;
+ }
+ }
+ }
+ Ok(CopyToNode {
+ input: input__,
+ output_url: output_url__.unwrap_or_default(),
+ single_file_output:
single_file_output__.unwrap_or_default(),
+ file_type: file_type__.unwrap_or_default(),
+ copy_options: copy_options__,
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.CopyToNode", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for CreateCatalogNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
@@ -13336,6 +13518,9 @@ impl serde::Serialize for LogicalPlanNode {
logical_plan_node::LogicalPlanType::DistinctOn(v) => {
struct_ser.serialize_field("distinctOn", v)?;
}
+ logical_plan_node::LogicalPlanType::CopyTo(v) => {
+ struct_ser.serialize_field("copyTo", v)?;
+ }
}
}
struct_ser.end()
@@ -13387,6 +13572,8 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
"dropView",
"distinct_on",
"distinctOn",
+ "copy_to",
+ "copyTo",
];
#[allow(clippy::enum_variant_names)]
@@ -13418,6 +13605,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
Prepare,
DropView,
DistinctOn,
+ CopyTo,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -13466,6 +13654,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
"prepare" => Ok(GeneratedField::Prepare),
"dropView" | "drop_view" =>
Ok(GeneratedField::DropView),
"distinctOn" | "distinct_on" =>
Ok(GeneratedField::DistinctOn),
+ "copyTo" | "copy_to" => Ok(GeneratedField::CopyTo),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -13675,6 +13864,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
return
Err(serde::de::Error::duplicate_field("distinctOn"));
}
logical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::DistinctOn)
+;
+ }
+ GeneratedField::CopyTo => {
+ if logical_plan_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("copyTo"));
+ }
+ logical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::CopyTo)
;
}
}
@@ -20742,6 +20938,205 @@ impl<'de> serde::Deserialize<'de> for RollupNode {
deserializer.deserialize_struct("datafusion.RollupNode", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for SqlOption {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.key.is_empty() {
+ len += 1;
+ }
+ if !self.value.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.SQLOption", len)?;
+ if !self.key.is_empty() {
+ struct_ser.serialize_field("key", &self.key)?;
+ }
+ if !self.value.is_empty() {
+ struct_ser.serialize_field("value", &self.value)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for SqlOption {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "key",
+ "value",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Key,
+ Value,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "key" => Ok(GeneratedField::Key),
+ "value" => Ok(GeneratedField::Value),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = SqlOption;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.SQLOption")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<SqlOption, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut key__ = None;
+ let mut value__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Key => {
+ if key__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("key"));
+ }
+ key__ = Some(map_.next_value()?);
+ }
+ GeneratedField::Value => {
+ if value__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("value"));
+ }
+ value__ = Some(map_.next_value()?);
+ }
+ }
+ }
+ Ok(SqlOption {
+ key: key__.unwrap_or_default(),
+ value: value__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.SQLOption", FIELDS,
GeneratedVisitor)
+ }
+}
+impl serde::Serialize for SqlOptions {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.option.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.SQLOptions", len)?;
+ if !self.option.is_empty() {
+ struct_ser.serialize_field("option", &self.option)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for SqlOptions {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "option",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Option,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "option" => Ok(GeneratedField::Option),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = SqlOptions;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.SQLOptions")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<SqlOptions, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut option__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Option => {
+ if option__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("option"));
+ }
+ option__ = Some(map_.next_value()?);
+ }
+ }
+ }
+ Ok(SqlOptions {
+ option: option__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.SQLOptions", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for ScalarDictionaryValue {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 9030e90a24..e44355859d 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -38,7 +38,7 @@ pub struct DfSchema {
pub struct LogicalPlanNode {
#[prost(
oneof = "logical_plan_node::LogicalPlanType",
- tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27, 28"
+ tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29"
)]
pub logical_plan_type:
::core::option::Option<logical_plan_node::LogicalPlanType>,
}
@@ -101,6 +101,8 @@ pub mod logical_plan_node {
DropView(super::DropViewNode),
#[prost(message, tag = "28")]
DistinctOn(::prost::alloc::boxed::Box<super::DistinctOnNode>),
+ #[prost(message, tag = "29")]
+ CopyTo(::prost::alloc::boxed::Box<super::CopyToNode>),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -502,6 +504,45 @@ pub struct DistinctOnNode {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CopyToNode {
+ #[prost(message, optional, boxed, tag = "1")]
+ pub input:
::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
+ #[prost(string, tag = "2")]
+ pub output_url: ::prost::alloc::string::String,
+ #[prost(bool, tag = "3")]
+ pub single_file_output: bool,
+ #[prost(string, tag = "6")]
+ pub file_type: ::prost::alloc::string::String,
+ #[prost(oneof = "copy_to_node::CopyOptions", tags = "4, 5")]
+ pub copy_options: ::core::option::Option<copy_to_node::CopyOptions>,
+}
+/// Nested message and enum types in `CopyToNode`.
+pub mod copy_to_node {
+ #[allow(clippy::derive_partial_eq_without_eq)]
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum CopyOptions {
+ #[prost(message, tag = "4")]
+ SqlOptions(super::SqlOptions),
+ #[prost(message, tag = "5")]
+ WriterOptions(super::FileTypeWriterOptions),
+ }
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SqlOptions {
+ #[prost(message, repeated, tag = "1")]
+ pub option: ::prost::alloc::vec::Vec<SqlOption>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SqlOption {
+ #[prost(string, tag = "1")]
+ pub key: ::prost::alloc::string::String,
+ #[prost(string, tag = "2")]
+ pub value: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnionNode {
#[prost(message, repeated, tag = "1")]
pub inputs: ::prost::alloc::vec::Vec<LogicalPlanNode>,
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index 948228d87d..e03b3ffa7b 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
use crate::common::{byte_to_string, proto_error, str_to_byte};
use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
-use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection};
+use crate::protobuf::{
+ copy_to_node, CustomTableScanNode, LogicalExprNodeCollection, SqlOption,
+};
use crate::{
convert_required,
protobuf::{
@@ -44,12 +46,13 @@ use datafusion::{
datasource::{provider_as_source, source_as_provider},
prelude::SessionContext,
};
-use datafusion_common::plan_datafusion_err;
use datafusion_common::{
- context, internal_err, not_impl_err, parsers::CompressionTypeVariant,
- DataFusionError, OwnedTableReference, Result,
+ context, file_options::StatementOptions, internal_err, not_impl_err,
+ parsers::CompressionTypeVariant, plan_datafusion_err, DataFusionError,
FileType,
+ OwnedTableReference, Result,
};
use datafusion_expr::{
+ dml,
logical_plan::{
builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
CreateExternalTable, CreateView, CrossJoin, DdlStatement, Distinct,
@@ -59,6 +62,7 @@ use datafusion_expr::{
DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder,
};
+use datafusion_expr::dml::CopyOptions;
use prost::bytes::BufMut;
use prost::Message;
@@ -823,6 +827,36 @@ impl AsLogicalPlan for LogicalPlanNode {
schema: Arc::new(convert_required!(dropview.schema)?),
}),
)),
+ LogicalPlanType::CopyTo(copy) => {
+ let input: LogicalPlan =
+ into_logical_plan!(copy.input, ctx, extension_codec)?;
+
+ let copy_options = match &copy.copy_options {
+ Some(copy_to_node::CopyOptions::SqlOptions(opt)) => {
+ let options = opt.option.iter().map(|o|
(o.key.clone(), o.value.clone())).collect();
+ CopyOptions::SQLOptions(StatementOptions::from(
+ &options,
+ ))
+ }
+ Some(copy_to_node::CopyOptions::WriterOptions(_)) => {
+ return Err(proto_error(
+ "LogicalPlan serde is not yet implemented for
CopyTo with WriterOptions",
+ ))
+ }
+ other => return Err(proto_error(format!(
+ "LogicalPlan serde is not yet implemented for CopyTo
with CopyOptions {other:?}",
+ )))
+ };
+ Ok(datafusion_expr::LogicalPlan::Copy(
+ datafusion_expr::dml::CopyTo {
+ input: Arc::new(input),
+ output_url: copy.output_url.clone(),
+ file_format: FileType::from_str(&copy.file_type)?,
+ single_file_output: copy.single_file_output,
+ copy_options,
+ },
+ ))
+ }
}
}
@@ -1534,9 +1568,47 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::Dml(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Dml",
)),
- LogicalPlan::Copy(_) => Err(proto_error(
- "LogicalPlan serde is not yet implemented for Copy",
- )),
+ LogicalPlan::Copy(dml::CopyTo {
+ input,
+ output_url,
+ single_file_output,
+ file_format,
+ copy_options,
+ }) => {
+ let input = protobuf::LogicalPlanNode::try_from_logical_plan(
+ input,
+ extension_codec,
+ )?;
+
+ let copy_options_proto: Option<copy_to_node::CopyOptions> =
match copy_options {
+ CopyOptions::SQLOptions(opt) => {
+ let options: Vec<SqlOption> =
opt.clone().into_inner().iter().map(|(k, v)| SqlOption {
+ key: k.to_string(),
+ value: v.to_string(),
+ }).collect();
+
Some(copy_to_node::CopyOptions::SqlOptions(protobuf::SqlOptions {
+ option: options
+ }))
+ }
+ CopyOptions::WriterOptions(_) => {
+ return Err(proto_error(
+ "LogicalPlan serde is not yet implemented for
CopyTo with WriterOptions",
+ ))
+ }
+ };
+
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
+ protobuf::CopyToNode {
+ input: Some(Box::new(input)),
+ single_file_output: *single_file_output,
+ output_url: output_url.to_string(),
+ file_type: file_format.to_string(),
+ copy_options: copy_options_proto,
+ },
+ ))),
+ })
+ }
LogicalPlan::DescribeTable(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DescribeTable",
)),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8e15b5d0d4..9798b06f47 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -31,12 +31,16 @@ use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig,
SessionContext};
use datafusion::test_util::{TestTableFactory, TestTableProvider};
-use datafusion_common::Result;
-use datafusion_common::{internal_err, not_impl_err, plan_err};
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
+use datafusion_common::file_options::StatementOptions;
+use datafusion_common::{internal_err, not_impl_err, plan_err,
FileTypeWriterOptions};
use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError,
ScalarValue};
+use datafusion_common::{FileType, Result};
+use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like,
ScalarFunction,
Sort,
@@ -301,6 +305,66 @@ async fn roundtrip_logical_plan_aggregation() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let input = create_csv_scan(&ctx).await?;
+
+ let mut options = HashMap::new();
+ options.insert("foo".to_string(), "bar".to_string());
+
+ let plan = LogicalPlan::Copy(CopyTo {
+ input: Arc::new(input),
+ output_url: "test.csv".to_string(),
+ file_format: FileType::CSV,
+ single_file_output: true,
+ copy_options:
CopyOptions::SQLOptions(StatementOptions::from(&options)),
+ });
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+
+ Ok(())
+}
+
+#[tokio::test]
+#[ignore] // see https://github.com/apache/arrow-datafusion/issues/8619
+async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let input = create_csv_scan(&ctx).await?;
+
+ let writer_properties = WriterProperties::builder()
+ .set_bloom_filter_enabled(true)
+ .set_created_by("DataFusion Test".to_string())
+ .build();
+ let plan = LogicalPlan::Copy(CopyTo {
+ input: Arc::new(input),
+ output_url: "test.csv".to_string(),
+ file_format: FileType::CSV,
+ single_file_output: true,
+ copy_options: CopyOptions::WriterOptions(Box::new(
+
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
+ )),
+ });
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+
+ Ok(())
+}
+
+async fn create_csv_scan(ctx: &SessionContext) -> Result<LogicalPlan,
DataFusionError> {
+ ctx.register_csv("t1", "tests/testdata/test.csv",
CsvReadOptions::default())
+ .await?;
+
+ let input = ctx.table("t1").await?.into_optimized_plan()?;
+ Ok(input)
+}
+
#[tokio::test]
async fn roundtrip_logical_plan_distinct_on() -> Result<()> {
let ctx = SessionContext::new();