This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 15bc6bd5c8 feat: make DefaultLogicalExtensionCodec support 
serialisation of buil… (#20638)
15bc6bd5c8 is described below

commit 15bc6bd5c8bdce8511eecdc237eacb50f5392703
Author: Acfboy <[email protected]>
AuthorDate: Mon Mar 9 17:11:11 2026 +0800

    feat: make DefaultLogicalExtensionCodec support serialisation of buil… 
(#20638)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #16944.
    
    ## Rationale for this change
    
    Currently, the `LogicalExtensionCodec` implementation for
    `DefaultLogicalExtensionCodec` leaves `try_decode_file_format` /
    `try_encode_file_format` unimplemented (returning "not implemented"
    errors). However, the actual serialization logic for built-in file
    formats — arrow, parquet, csv, and json — already exists in their
    respective codec implementations. All we need to do is tag which format
    is being used, and delegate to the corresponding format-specific codec
    to handle the data.
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
    Added a `FileFormatKind` enum and a `FileFormatProto` message to
    `datafusion.proto` to identify the file format type during transmission.
    Implemented `try_decode_file_format` and `try_encode_file_format` for
    `DefaultLogicalExtensionCodec`, which dispatch
    serialization/deserialization to the corresponding format-specific codec
    based on the format kind. Note that Avro is not covered because the
    upstream repository has not yet implemented the corresponding Avro
    codec, so Avro support is not functional at this time.
    
    ## Are these changes tested?
    
    Yes. Roundtrip tests are included for csv, json, parquet, and arrow.
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 datafusion/proto/proto/datafusion.proto            |  19 ++
 datafusion/proto/src/generated/pbjson.rs           | 198 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  50 ++++++
 datafusion/proto/src/logical_plan/mod.rs           | 103 ++++++++++-
 .../proto/tests/cases/roundtrip_logical_plan.rs    | 186 +++++++++++++++++++
 5 files changed, 552 insertions(+), 4 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index f91b4d1379..8de3224c91 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -270,6 +270,25 @@ message CopyToNode {
   repeated string partition_by = 7;
 }
 
+// Identifies a built-in file format supported by DataFusion.
+// Used by DefaultLogicalExtensionCodec to serialize/deserialize
+// FileFormatFactory instances (e.g. in CopyTo plans).
+enum FileFormatKind {
+  FILE_FORMAT_KIND_UNSPECIFIED = 0;
+  FILE_FORMAT_KIND_CSV = 1;
+  FILE_FORMAT_KIND_JSON = 2;
+  FILE_FORMAT_KIND_PARQUET = 3;
+  FILE_FORMAT_KIND_ARROW = 4;
+  FILE_FORMAT_KIND_AVRO = 5;
+}
+
+// Wraps a serialized FileFormatFactory with its format kind tag,
+// so the decoder can dispatch to the correct format-specific codec.
+message FileFormatProto {
+  FileFormatKind kind = 1;
+  bytes encoded_file_format = 2;
+}
+
 message DmlNode{
    enum Type {
     UPDATE = 0;
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 2b3be747e1..2b8089f563 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6112,6 +6112,204 @@ impl<'de> serde::Deserialize<'de> for ExplainNode {
         deserializer.deserialize_struct("datafusion.ExplainNode", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for FileFormatKind {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let variant = match self {
+            Self::Unspecified => "FILE_FORMAT_KIND_UNSPECIFIED",
+            Self::Csv => "FILE_FORMAT_KIND_CSV",
+            Self::Json => "FILE_FORMAT_KIND_JSON",
+            Self::Parquet => "FILE_FORMAT_KIND_PARQUET",
+            Self::Arrow => "FILE_FORMAT_KIND_ARROW",
+            Self::Avro => "FILE_FORMAT_KIND_AVRO",
+        };
+        serializer.serialize_str(variant)
+    }
+}
+impl<'de> serde::Deserialize<'de> for FileFormatKind {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "FILE_FORMAT_KIND_UNSPECIFIED",
+            "FILE_FORMAT_KIND_CSV",
+            "FILE_FORMAT_KIND_JSON",
+            "FILE_FORMAT_KIND_PARQUET",
+            "FILE_FORMAT_KIND_ARROW",
+            "FILE_FORMAT_KIND_AVRO",
+        ];
+
+        struct GeneratedVisitor;
+
+        impl serde::de::Visitor<'_> for GeneratedVisitor {
+            type Value = FileFormatKind;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                write!(formatter, "expected one of: {:?}", &FIELDS)
+            }
+
+            fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, 
E>
+            where
+                E: serde::de::Error,
+            {
+                i32::try_from(v)
+                    .ok()
+                    .and_then(|x| x.try_into().ok())
+                    .ok_or_else(|| {
+                        
serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
+                    })
+            }
+
+            fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, 
E>
+            where
+                E: serde::de::Error,
+            {
+                i32::try_from(v)
+                    .ok()
+                    .and_then(|x| x.try_into().ok())
+                    .ok_or_else(|| {
+                        
serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
+                    })
+            }
+
+            fn visit_str<E>(self, value: &str) -> 
std::result::Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                match value {
+                    "FILE_FORMAT_KIND_UNSPECIFIED" => 
Ok(FileFormatKind::Unspecified),
+                    "FILE_FORMAT_KIND_CSV" => Ok(FileFormatKind::Csv),
+                    "FILE_FORMAT_KIND_JSON" => Ok(FileFormatKind::Json),
+                    "FILE_FORMAT_KIND_PARQUET" => Ok(FileFormatKind::Parquet),
+                    "FILE_FORMAT_KIND_ARROW" => Ok(FileFormatKind::Arrow),
+                    "FILE_FORMAT_KIND_AVRO" => Ok(FileFormatKind::Avro),
+                    _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
+                }
+            }
+        }
+        deserializer.deserialize_any(GeneratedVisitor)
+    }
+}
+impl serde::Serialize for FileFormatProto {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.kind != 0 {
+            len += 1;
+        }
+        if !self.encoded_file_format.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.FileFormatProto", len)?;
+        if self.kind != 0 {
+            let v = FileFormatKind::try_from(self.kind)
+                .map_err(|_| serde::ser::Error::custom(format!("Invalid 
variant {}", self.kind)))?;
+            struct_ser.serialize_field("kind", &v)?;
+        }
+        if !self.encoded_file_format.is_empty() {
+            #[allow(clippy::needless_borrow)]
+            #[allow(clippy::needless_borrows_for_generic_args)]
+            struct_ser.serialize_field("encodedFileFormat", 
pbjson::private::base64::encode(&self.encoded_file_format).as_str())?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for FileFormatProto {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "kind",
+            "encoded_file_format",
+            "encodedFileFormat",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Kind,
+            EncodedFileFormat,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl serde::de::Visitor<'_> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "kind" => Ok(GeneratedField::Kind),
+                            "encodedFileFormat" | "encoded_file_format" => 
Ok(GeneratedField::EncodedFileFormat),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = FileFormatProto;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.FileFormatProto")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<FileFormatProto, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut kind__ = None;
+                let mut encoded_file_format__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Kind => {
+                            if kind__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("kind"));
+                            }
+                            kind__ = Some(map_.next_value::<FileFormatKind>()? 
as i32);
+                        }
+                        GeneratedField::EncodedFileFormat => {
+                            if encoded_file_format__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("encodedFileFormat"));
+                            }
+                            encoded_file_format__ = 
+                                
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
+                            ;
+                        }
+                    }
+                }
+                Ok(FileFormatProto {
+                    kind: kind__.unwrap_or_default(),
+                    encoded_file_format: 
encoded_file_format__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.FileFormatProto", FIELDS, 
GeneratedVisitor)
+    }
+}
 impl serde::Serialize for FileGroup {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 5a33d89027..6daf81bf1a 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -412,6 +412,15 @@ pub struct CopyToNode {
     #[prost(string, repeated, tag = "7")]
     pub partition_by: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
+/// Wraps a serialized FileFormatFactory with its format kind tag,
+/// so the decoder can dispatch to the correct format-specific codec.
+#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
+pub struct FileFormatProto {
+    #[prost(enumeration = "FileFormatKind", tag = "1")]
+    pub kind: i32,
+    #[prost(bytes = "vec", tag = "2")]
+    pub encoded_file_format: ::prost::alloc::vec::Vec<u8>,
+}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct DmlNode {
     #[prost(enumeration = "dml_node::Type", tag = "1")]
@@ -2187,6 +2196,47 @@ pub struct BufferExecNode {
     #[prost(uint64, tag = "2")]
     pub capacity: u64,
 }
+/// Identifies a built-in file format supported by DataFusion.
+/// Used by DefaultLogicalExtensionCodec to serialize/deserialize
+/// FileFormatFactory instances (e.g. in CopyTo plans).
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, 
::prost::Enumeration)]
+#[repr(i32)]
+pub enum FileFormatKind {
+    Unspecified = 0,
+    Csv = 1,
+    Json = 2,
+    Parquet = 3,
+    Arrow = 4,
+    Avro = 5,
+}
+impl FileFormatKind {
+    /// String value of the enum field names used in the ProtoBuf definition.
+    ///
+    /// The values are not transformed in any way and thus are considered 
stable
+    /// (if the ProtoBuf definition does not change) and safe for programmatic 
use.
+    pub fn as_str_name(&self) -> &'static str {
+        match self {
+            Self::Unspecified => "FILE_FORMAT_KIND_UNSPECIFIED",
+            Self::Csv => "FILE_FORMAT_KIND_CSV",
+            Self::Json => "FILE_FORMAT_KIND_JSON",
+            Self::Parquet => "FILE_FORMAT_KIND_PARQUET",
+            Self::Arrow => "FILE_FORMAT_KIND_ARROW",
+            Self::Avro => "FILE_FORMAT_KIND_AVRO",
+        }
+    }
+    /// Creates an enum from field names used in the ProtoBuf definition.
+    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
+        match value {
+            "FILE_FORMAT_KIND_UNSPECIFIED" => Some(Self::Unspecified),
+            "FILE_FORMAT_KIND_CSV" => Some(Self::Csv),
+            "FILE_FORMAT_KIND_JSON" => Some(Self::Json),
+            "FILE_FORMAT_KIND_PARQUET" => Some(Self::Parquet),
+            "FILE_FORMAT_KIND_ARROW" => Some(Self::Arrow),
+            "FILE_FORMAT_KIND_AVRO" => Some(Self::Avro),
+            _ => None,
+        }
+    }
+}
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, 
::prost::Enumeration)]
 #[repr(i32)]
 pub enum WindowFrameUnits {
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index 218c2e4e47..a5d74d7f49 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -44,13 +44,15 @@ use datafusion_datasource::file_format::FileFormat;
 use datafusion_datasource::file_format::{
     FileFormatFactory, file_type_to_format, format_as_file_type,
 };
-use datafusion_datasource_arrow::file_format::ArrowFormat;
+use datafusion_datasource_arrow::file_format::{ArrowFormat, 
ArrowFormatFactory};
 #[cfg(feature = "avro")]
 use datafusion_datasource_avro::file_format::AvroFormat;
-use datafusion_datasource_csv::file_format::CsvFormat;
-use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat;
+use datafusion_datasource_csv::file_format::{CsvFormat, CsvFormatFactory};
+use datafusion_datasource_json::file_format::{
+    JsonFormat as OtherNdJsonFormat, JsonFormatFactory,
+};
 #[cfg(feature = "parquet")]
-use datafusion_datasource_parquet::file_format::ParquetFormat;
+use datafusion_datasource_parquet::file_format::{ParquetFormat, 
ParquetFormatFactory};
 use datafusion_expr::{
     AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, 
TableSource, Unnest,
 };
@@ -207,6 +209,99 @@ impl LogicalExtensionCodec for 
DefaultLogicalExtensionCodec {
     ) -> Result<()> {
         not_impl_err!("LogicalExtensionCodec is not provided")
     }
+
+    fn try_decode_file_format(
+        &self,
+        buf: &[u8],
+        ctx: &TaskContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+        })?;
+
+        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+        })?;
+
+        match kind {
+            protobuf::FileFormatKind::Csv => 
file_formats::CsvLogicalExtensionCodec
+                .try_decode_file_format(&proto.encoded_file_format, ctx),
+            protobuf::FileFormatKind::Json => 
file_formats::JsonLogicalExtensionCodec
+                .try_decode_file_format(&proto.encoded_file_format, ctx),
+            #[cfg(feature = "parquet")]
+            protobuf::FileFormatKind::Parquet => {
+                file_formats::ParquetLogicalExtensionCodec
+                    .try_decode_file_format(&proto.encoded_file_format, ctx)
+            }
+            protobuf::FileFormatKind::Arrow => 
file_formats::ArrowLogicalExtensionCodec
+                .try_decode_file_format(&proto.encoded_file_format, ctx),
+            protobuf::FileFormatKind::Avro => 
file_formats::AvroLogicalExtensionCodec
+                .try_decode_file_format(&proto.encoded_file_format, ctx),
+            #[cfg(not(feature = "parquet"))]
+            protobuf::FileFormatKind::Parquet => {
+                not_impl_err!("Parquet support requires the 'parquet' feature")
+            }
+            protobuf::FileFormatKind::Unspecified => {
+                not_impl_err!("Unspecified file format kind")
+            }
+        }
+    }
+
+    fn try_encode_file_format(
+        &self,
+        buf: &mut Vec<u8>,
+        node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        let mut encoded_file_format = Vec::new();
+
+        let kind = if 
node.as_any().downcast_ref::<CsvFormatFactory>().is_some() {
+            file_formats::CsvLogicalExtensionCodec
+                .try_encode_file_format(&mut encoded_file_format, 
Arc::clone(&node))?;
+            protobuf::FileFormatKind::Csv
+        } else if node.as_any().downcast_ref::<JsonFormatFactory>().is_some() {
+            file_formats::JsonLogicalExtensionCodec
+                .try_encode_file_format(&mut encoded_file_format, 
Arc::clone(&node))?;
+            protobuf::FileFormatKind::Json
+        } else if node.as_any().downcast_ref::<ArrowFormatFactory>().is_some() 
{
+            file_formats::ArrowLogicalExtensionCodec
+                .try_encode_file_format(&mut encoded_file_format, 
Arc::clone(&node))?;
+            protobuf::FileFormatKind::Arrow
+        } else {
+            #[cfg(feature = "parquet")]
+            {
+                if node
+                    .as_any()
+                    .downcast_ref::<ParquetFormatFactory>()
+                    .is_some()
+                {
+                    
file_formats::ParquetLogicalExtensionCodec.try_encode_file_format(
+                        &mut encoded_file_format,
+                        Arc::clone(&node),
+                    )?;
+                    protobuf::FileFormatKind::Parquet
+                } else {
+                    return not_impl_err!(
+                        "Unsupported FileFormatFactory type for 
DefaultLogicalExtensionCodec"
+                    );
+                }
+            }
+            #[cfg(not(feature = "parquet"))]
+            {
+                return not_impl_err!(
+                    "Unsupported FileFormatFactory type for 
DefaultLogicalExtensionCodec"
+                );
+            }
+        };
+
+        let proto = protobuf::FileFormatProto {
+            kind: kind as i32,
+            encoded_file_format,
+        };
+        proto.encode(buf).map_err(|e| {
+            internal_datafusion_err!("Failed to encode FileFormatProto: {e}")
+        })?;
+        Ok(())
+    }
 }
 
 #[macro_export]
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9407cbf9a0..63ad00c92e 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -743,6 +743,192 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> 
Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn roundtrip_default_codec_csv() -> Result<()> {
+    let ctx = SessionContext::new();
+    let input = create_csv_scan(&ctx).await?;
+
+    let table_options =
+        
TableOptions::default_from_session_config(ctx.state().config_options());
+    let mut csv_format = table_options.csv;
+    csv_format.delimiter = b'|';
+    csv_format.has_header = Some(true);
+    csv_format.compression = CompressionTypeVariant::GZIP;
+
+    let file_type = 
format_as_file_type(Arc::new(CsvFormatFactory::new_with_options(
+        csv_format.clone(),
+    )));
+
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.csv".to_string(),
+        vec![],
+        file_type,
+        Default::default(),
+    ));
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+    match roundtrip {
+        LogicalPlan::Copy(copy_to) => {
+            assert_eq!("test.csv", copy_to.output_url);
+            assert_eq!("csv", copy_to.file_type.get_ext());
+            let dt = copy_to
+                .file_type
+                .as_ref()
+                .as_any()
+                .downcast_ref::<DefaultFileType>()
+                .unwrap();
+            let csv = dt
+                .as_format_factory()
+                .as_ref()
+                .as_any()
+                .downcast_ref::<CsvFormatFactory>()
+                .unwrap();
+            let decoded = csv.options.as_ref().unwrap();
+            assert_eq!(csv_format.delimiter, decoded.delimiter);
+            assert_eq!(csv_format.has_header, decoded.has_header);
+            assert_eq!(csv_format.compression, decoded.compression);
+        }
+        _ => panic!("Expected CopyTo plan"),
+    }
+    Ok(())
+}
+
+#[tokio::test]
+async fn roundtrip_default_codec_json() -> Result<()> {
+    let ctx = SessionContext::new();
+    let input = create_json_scan(&ctx).await?;
+
+    let table_options =
+        
TableOptions::default_from_session_config(ctx.state().config_options());
+    let mut json_format = table_options.json;
+    json_format.compression = CompressionTypeVariant::GZIP;
+    json_format.schema_infer_max_rec = Some(500);
+
+    let file_type = 
format_as_file_type(Arc::new(JsonFormatFactory::new_with_options(
+        json_format.clone(),
+    )));
+
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.json".to_string(),
+        vec![],
+        file_type,
+        Default::default(),
+    ));
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+    match roundtrip {
+        LogicalPlan::Copy(copy_to) => {
+            assert_eq!("test.json", copy_to.output_url);
+            assert_eq!("json", copy_to.file_type.get_ext());
+            let dt = copy_to
+                .file_type
+                .as_ref()
+                .as_any()
+                .downcast_ref::<DefaultFileType>()
+                .unwrap();
+            let json = dt
+                .as_format_factory()
+                .as_ref()
+                .as_any()
+                .downcast_ref::<JsonFormatFactory>()
+                .unwrap();
+            let decoded = json.options.as_ref().unwrap();
+            assert_eq!(json_format.compression, decoded.compression);
+            assert_eq!(
+                json_format.schema_infer_max_rec,
+                decoded.schema_infer_max_rec
+            );
+        }
+        _ => panic!("Expected CopyTo plan"),
+    }
+    Ok(())
+}
+
+#[tokio::test]
+async fn roundtrip_default_codec_parquet() -> Result<()> {
+    let ctx = SessionContext::new();
+    let input = create_parquet_scan(&ctx).await?;
+
+    let table_options =
+        
TableOptions::default_from_session_config(ctx.state().config_options());
+    let mut parquet_format = table_options.parquet;
+    parquet_format.global.bloom_filter_on_read = true;
+    parquet_format.global.created_by = "DefaultCodecTest".to_string();
+
+    let file_type = format_as_file_type(Arc::new(
+        ParquetFormatFactory::new_with_options(parquet_format.clone()),
+    ));
+
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.parquet".to_string(),
+        vec![],
+        file_type,
+        Default::default(),
+    ));
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+    match roundtrip {
+        LogicalPlan::Copy(copy_to) => {
+            assert_eq!("test.parquet", copy_to.output_url);
+            assert_eq!("parquet", copy_to.file_type.get_ext());
+            let dt = copy_to
+                .file_type
+                .as_ref()
+                .as_any()
+                .downcast_ref::<DefaultFileType>()
+                .unwrap();
+            let pq = dt
+                .as_format_factory()
+                .as_ref()
+                .as_any()
+                .downcast_ref::<ParquetFormatFactory>()
+                .unwrap();
+            let decoded = pq.options.as_ref().unwrap();
+            assert!(decoded.global.bloom_filter_on_read);
+            assert_eq!("DefaultCodecTest", decoded.global.created_by);
+        }
+        _ => panic!("Expected CopyTo plan"),
+    }
+    Ok(())
+}
+
+#[tokio::test]
+async fn roundtrip_default_codec_arrow() -> Result<()> {
+    let ctx = SessionContext::new();
+    let input = create_csv_scan(&ctx).await?;
+
+    let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));
+
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.arrow".to_string(),
+        vec![],
+        file_type,
+        Default::default(),
+    ));
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+    match roundtrip {
+        LogicalPlan::Copy(copy_to) => {
+            assert_eq!("test.arrow", copy_to.output_url);
+            assert_eq!("arrow", copy_to.file_type.get_ext());
+        }
+        _ => panic!("Expected CopyTo plan"),
+    }
+    Ok(())
+}
+
 async fn create_csv_scan(ctx: &SessionContext) -> Result<LogicalPlan, 
DataFusionError> {
     ctx.register_csv("t1", "tests/testdata/test.csv", 
CsvReadOptions::default())
         .await?;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to