This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 15bc6bd5c8 feat: make DefaultLogicalExtensionCodec support
serialisation of buil… (#20638)
15bc6bd5c8 is described below
commit 15bc6bd5c8bdce8511eecdc237eacb50f5392703
Author: Acfboy <[email protected]>
AuthorDate: Mon Mar 9 17:11:11 2026 +0800
feat: make DefaultLogicalExtensionCodec support serialisation of buil…
(#20638)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #16944.
## Rationale for this change
Currently, the `LogicalExtensionCodec` implementation for
`DefaultLogicalExtensionCodec` leaves `try_decode_file_format` /
`try_encode_file_format` unimplemented (returning "not implemented"
errors). However, the actual serialization logic for built-in file
formats — arrow, parquet, csv, and json — already exists in their
respective codec implementations. All we need to do is tag which format
is being used, and delegate to the corresponding format-specific codec
to handle the data.
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
Added a `FileFormatKind` enum and a `FileFormatProto` message to
`datafusion.proto` to identify the file format type during transmission.
Implemented `try_decode_file_format` and `try_encode_file_format` for
`DefaultLogicalExtensionCodec`, which dispatch
serialization/deserialization to the corresponding format-specific codec
based on the format kind. Note that Avro is not covered because the
upstream repository has not yet implemented the corresponding Avro
codec, so Avro support is not functional at this time.
## Are these changes tested?
Yes. Roundtrip tests are included for csv, json, parquet, and arrow.
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
datafusion/proto/proto/datafusion.proto | 19 ++
datafusion/proto/src/generated/pbjson.rs | 198 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 50 ++++++
datafusion/proto/src/logical_plan/mod.rs | 103 ++++++++++-
.../proto/tests/cases/roundtrip_logical_plan.rs | 186 +++++++++++++++++++
5 files changed, 552 insertions(+), 4 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index f91b4d1379..8de3224c91 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -270,6 +270,25 @@ message CopyToNode {
repeated string partition_by = 7;
}
+// Identifies a built-in file format supported by DataFusion.
+// Used by DefaultLogicalExtensionCodec to serialize/deserialize
+// FileFormatFactory instances (e.g. in CopyTo plans).
+enum FileFormatKind {
+ FILE_FORMAT_KIND_UNSPECIFIED = 0;
+ FILE_FORMAT_KIND_CSV = 1;
+ FILE_FORMAT_KIND_JSON = 2;
+ FILE_FORMAT_KIND_PARQUET = 3;
+ FILE_FORMAT_KIND_ARROW = 4;
+ FILE_FORMAT_KIND_AVRO = 5;
+}
+
+// Wraps a serialized FileFormatFactory with its format kind tag,
+// so the decoder can dispatch to the correct format-specific codec.
+message FileFormatProto {
+ FileFormatKind kind = 1;
+ bytes encoded_file_format = 2;
+}
+
message DmlNode{
enum Type {
UPDATE = 0;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 2b3be747e1..2b8089f563 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6112,6 +6112,204 @@ impl<'de> serde::Deserialize<'de> for ExplainNode {
deserializer.deserialize_struct("datafusion.ExplainNode", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for FileFormatKind {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ let variant = match self {
+ Self::Unspecified => "FILE_FORMAT_KIND_UNSPECIFIED",
+ Self::Csv => "FILE_FORMAT_KIND_CSV",
+ Self::Json => "FILE_FORMAT_KIND_JSON",
+ Self::Parquet => "FILE_FORMAT_KIND_PARQUET",
+ Self::Arrow => "FILE_FORMAT_KIND_ARROW",
+ Self::Avro => "FILE_FORMAT_KIND_AVRO",
+ };
+ serializer.serialize_str(variant)
+ }
+}
+impl<'de> serde::Deserialize<'de> for FileFormatKind {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "FILE_FORMAT_KIND_UNSPECIFIED",
+ "FILE_FORMAT_KIND_CSV",
+ "FILE_FORMAT_KIND_JSON",
+ "FILE_FORMAT_KIND_PARQUET",
+ "FILE_FORMAT_KIND_ARROW",
+ "FILE_FORMAT_KIND_AVRO",
+ ];
+
+ struct GeneratedVisitor;
+
+ impl serde::de::Visitor<'_> for GeneratedVisitor {
+ type Value = FileFormatKind;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value,
E>
+ where
+ E: serde::de::Error,
+ {
+ i32::try_from(v)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_else(|| {
+
serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
+ })
+ }
+
+ fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value,
E>
+ where
+ E: serde::de::Error,
+ {
+ i32::try_from(v)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_else(|| {
+
serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
+ })
+ }
+
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "FILE_FORMAT_KIND_UNSPECIFIED" =>
Ok(FileFormatKind::Unspecified),
+ "FILE_FORMAT_KIND_CSV" => Ok(FileFormatKind::Csv),
+ "FILE_FORMAT_KIND_JSON" => Ok(FileFormatKind::Json),
+ "FILE_FORMAT_KIND_PARQUET" => Ok(FileFormatKind::Parquet),
+ "FILE_FORMAT_KIND_ARROW" => Ok(FileFormatKind::Arrow),
+ "FILE_FORMAT_KIND_AVRO" => Ok(FileFormatKind::Avro),
+ _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_any(GeneratedVisitor)
+ }
+}
+impl serde::Serialize for FileFormatProto {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.kind != 0 {
+ len += 1;
+ }
+ if !self.encoded_file_format.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.FileFormatProto", len)?;
+ if self.kind != 0 {
+ let v = FileFormatKind::try_from(self.kind)
+ .map_err(|_| serde::ser::Error::custom(format!("Invalid
variant {}", self.kind)))?;
+ struct_ser.serialize_field("kind", &v)?;
+ }
+ if !self.encoded_file_format.is_empty() {
+ #[allow(clippy::needless_borrow)]
+ #[allow(clippy::needless_borrows_for_generic_args)]
+ struct_ser.serialize_field("encodedFileFormat",
pbjson::private::base64::encode(&self.encoded_file_format).as_str())?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for FileFormatProto {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "kind",
+ "encoded_file_format",
+ "encodedFileFormat",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Kind,
+ EncodedFileFormat,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl serde::de::Visitor<'_> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "kind" => Ok(GeneratedField::Kind),
+ "encodedFileFormat" | "encoded_file_format" =>
Ok(GeneratedField::EncodedFileFormat),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = FileFormatProto;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.FileFormatProto")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<FileFormatProto, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut kind__ = None;
+ let mut encoded_file_format__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Kind => {
+ if kind__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("kind"));
+ }
+ kind__ = Some(map_.next_value::<FileFormatKind>()?
as i32);
+ }
+ GeneratedField::EncodedFileFormat => {
+ if encoded_file_format__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("encodedFileFormat"));
+ }
+ encoded_file_format__ =
+
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
+ ;
+ }
+ }
+ }
+ Ok(FileFormatProto {
+ kind: kind__.unwrap_or_default(),
+ encoded_file_format:
encoded_file_format__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.FileFormatProto", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for FileGroup {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 5a33d89027..6daf81bf1a 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -412,6 +412,15 @@ pub struct CopyToNode {
#[prost(string, repeated, tag = "7")]
pub partition_by: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
+/// Wraps a serialized FileFormatFactory with its format kind tag,
+/// so the decoder can dispatch to the correct format-specific codec.
+#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
+pub struct FileFormatProto {
+ #[prost(enumeration = "FileFormatKind", tag = "1")]
+ pub kind: i32,
+ #[prost(bytes = "vec", tag = "2")]
+ pub encoded_file_format: ::prost::alloc::vec::Vec<u8>,
+}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DmlNode {
#[prost(enumeration = "dml_node::Type", tag = "1")]
@@ -2187,6 +2196,47 @@ pub struct BufferExecNode {
#[prost(uint64, tag = "2")]
pub capacity: u64,
}
+/// Identifies a built-in file format supported by DataFusion.
+/// Used by DefaultLogicalExtensionCodec to serialize/deserialize
+/// FileFormatFactory instances (e.g. in CopyTo plans).
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord,
::prost::Enumeration)]
+#[repr(i32)]
+pub enum FileFormatKind {
+ Unspecified = 0,
+ Csv = 1,
+ Json = 2,
+ Parquet = 3,
+ Arrow = 4,
+ Avro = 5,
+}
+impl FileFormatKind {
+ /// String value of the enum field names used in the ProtoBuf definition.
+ ///
+ /// The values are not transformed in any way and thus are considered
stable
+ /// (if the ProtoBuf definition does not change) and safe for programmatic
use.
+ pub fn as_str_name(&self) -> &'static str {
+ match self {
+ Self::Unspecified => "FILE_FORMAT_KIND_UNSPECIFIED",
+ Self::Csv => "FILE_FORMAT_KIND_CSV",
+ Self::Json => "FILE_FORMAT_KIND_JSON",
+ Self::Parquet => "FILE_FORMAT_KIND_PARQUET",
+ Self::Arrow => "FILE_FORMAT_KIND_ARROW",
+ Self::Avro => "FILE_FORMAT_KIND_AVRO",
+ }
+ }
+ /// Creates an enum from field names used in the ProtoBuf definition.
+ pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
+ match value {
+ "FILE_FORMAT_KIND_UNSPECIFIED" => Some(Self::Unspecified),
+ "FILE_FORMAT_KIND_CSV" => Some(Self::Csv),
+ "FILE_FORMAT_KIND_JSON" => Some(Self::Json),
+ "FILE_FORMAT_KIND_PARQUET" => Some(Self::Parquet),
+ "FILE_FORMAT_KIND_ARROW" => Some(Self::Arrow),
+ "FILE_FORMAT_KIND_AVRO" => Some(Self::Avro),
+ _ => None,
+ }
+ }
+}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord,
::prost::Enumeration)]
#[repr(i32)]
pub enum WindowFrameUnits {
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index 218c2e4e47..a5d74d7f49 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -44,13 +44,15 @@ use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_format::{
FileFormatFactory, file_type_to_format, format_as_file_type,
};
-use datafusion_datasource_arrow::file_format::ArrowFormat;
+use datafusion_datasource_arrow::file_format::{ArrowFormat,
ArrowFormatFactory};
#[cfg(feature = "avro")]
use datafusion_datasource_avro::file_format::AvroFormat;
-use datafusion_datasource_csv::file_format::CsvFormat;
-use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat;
+use datafusion_datasource_csv::file_format::{CsvFormat, CsvFormatFactory};
+use datafusion_datasource_json::file_format::{
+ JsonFormat as OtherNdJsonFormat, JsonFormatFactory,
+};
#[cfg(feature = "parquet")]
-use datafusion_datasource_parquet::file_format::ParquetFormat;
+use datafusion_datasource_parquet::file_format::{ParquetFormat,
ParquetFormatFactory};
use datafusion_expr::{
AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType,
TableSource, Unnest,
};
@@ -207,6 +209,99 @@ impl LogicalExtensionCodec for
DefaultLogicalExtensionCodec {
) -> Result<()> {
not_impl_err!("LogicalExtensionCodec is not provided")
}
+
+ fn try_decode_file_format(
+ &self,
+ buf: &[u8],
+ ctx: &TaskContext,
+ ) -> Result<Arc<dyn FileFormatFactory>> {
+ let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
+ internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
+ })?;
+
+ let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
+ internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
+ })?;
+
+ match kind {
+ protobuf::FileFormatKind::Csv =>
file_formats::CsvLogicalExtensionCodec
+ .try_decode_file_format(&proto.encoded_file_format, ctx),
+ protobuf::FileFormatKind::Json =>
file_formats::JsonLogicalExtensionCodec
+ .try_decode_file_format(&proto.encoded_file_format, ctx),
+ #[cfg(feature = "parquet")]
+ protobuf::FileFormatKind::Parquet => {
+ file_formats::ParquetLogicalExtensionCodec
+ .try_decode_file_format(&proto.encoded_file_format, ctx)
+ }
+ protobuf::FileFormatKind::Arrow =>
file_formats::ArrowLogicalExtensionCodec
+ .try_decode_file_format(&proto.encoded_file_format, ctx),
+ protobuf::FileFormatKind::Avro =>
file_formats::AvroLogicalExtensionCodec
+ .try_decode_file_format(&proto.encoded_file_format, ctx),
+ #[cfg(not(feature = "parquet"))]
+ protobuf::FileFormatKind::Parquet => {
+ not_impl_err!("Parquet support requires the 'parquet' feature")
+ }
+ protobuf::FileFormatKind::Unspecified => {
+ not_impl_err!("Unspecified file format kind")
+ }
+ }
+ }
+
+ fn try_encode_file_format(
+ &self,
+ buf: &mut Vec<u8>,
+ node: Arc<dyn FileFormatFactory>,
+ ) -> Result<()> {
+ let mut encoded_file_format = Vec::new();
+
+ let kind = if
node.as_any().downcast_ref::<CsvFormatFactory>().is_some() {
+ file_formats::CsvLogicalExtensionCodec
+ .try_encode_file_format(&mut encoded_file_format,
Arc::clone(&node))?;
+ protobuf::FileFormatKind::Csv
+ } else if node.as_any().downcast_ref::<JsonFormatFactory>().is_some() {
+ file_formats::JsonLogicalExtensionCodec
+ .try_encode_file_format(&mut encoded_file_format,
Arc::clone(&node))?;
+ protobuf::FileFormatKind::Json
+ } else if node.as_any().downcast_ref::<ArrowFormatFactory>().is_some()
{
+ file_formats::ArrowLogicalExtensionCodec
+ .try_encode_file_format(&mut encoded_file_format,
Arc::clone(&node))?;
+ protobuf::FileFormatKind::Arrow
+ } else {
+ #[cfg(feature = "parquet")]
+ {
+ if node
+ .as_any()
+ .downcast_ref::<ParquetFormatFactory>()
+ .is_some()
+ {
+
file_formats::ParquetLogicalExtensionCodec.try_encode_file_format(
+ &mut encoded_file_format,
+ Arc::clone(&node),
+ )?;
+ protobuf::FileFormatKind::Parquet
+ } else {
+ return not_impl_err!(
+ "Unsupported FileFormatFactory type for
DefaultLogicalExtensionCodec"
+ );
+ }
+ }
+ #[cfg(not(feature = "parquet"))]
+ {
+ return not_impl_err!(
+ "Unsupported FileFormatFactory type for
DefaultLogicalExtensionCodec"
+ );
+ }
+ };
+
+ let proto = protobuf::FileFormatProto {
+ kind: kind as i32,
+ encoded_file_format,
+ };
+ proto.encode(buf).map_err(|e| {
+ internal_datafusion_err!("Failed to encode FileFormatProto: {e}")
+ })?;
+ Ok(())
+ }
}
#[macro_export]
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9407cbf9a0..63ad00c92e 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -743,6 +743,192 @@ async fn roundtrip_logical_plan_copy_to_parquet() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn roundtrip_default_codec_csv() -> Result<()> {
+ let ctx = SessionContext::new();
+ let input = create_csv_scan(&ctx).await?;
+
+ let table_options =
+
TableOptions::default_from_session_config(ctx.state().config_options());
+ let mut csv_format = table_options.csv;
+ csv_format.delimiter = b'|';
+ csv_format.has_header = Some(true);
+ csv_format.compression = CompressionTypeVariant::GZIP;
+
+ let file_type =
format_as_file_type(Arc::new(CsvFormatFactory::new_with_options(
+ csv_format.clone(),
+ )));
+
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.csv".to_string(),
+ vec![],
+ file_type,
+ Default::default(),
+ ));
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+ match roundtrip {
+ LogicalPlan::Copy(copy_to) => {
+ assert_eq!("test.csv", copy_to.output_url);
+ assert_eq!("csv", copy_to.file_type.get_ext());
+ let dt = copy_to
+ .file_type
+ .as_ref()
+ .as_any()
+ .downcast_ref::<DefaultFileType>()
+ .unwrap();
+ let csv = dt
+ .as_format_factory()
+ .as_ref()
+ .as_any()
+ .downcast_ref::<CsvFormatFactory>()
+ .unwrap();
+ let decoded = csv.options.as_ref().unwrap();
+ assert_eq!(csv_format.delimiter, decoded.delimiter);
+ assert_eq!(csv_format.has_header, decoded.has_header);
+ assert_eq!(csv_format.compression, decoded.compression);
+ }
+ _ => panic!("Expected CopyTo plan"),
+ }
+ Ok(())
+}
+
+#[tokio::test]
+async fn roundtrip_default_codec_json() -> Result<()> {
+ let ctx = SessionContext::new();
+ let input = create_json_scan(&ctx).await?;
+
+ let table_options =
+
TableOptions::default_from_session_config(ctx.state().config_options());
+ let mut json_format = table_options.json;
+ json_format.compression = CompressionTypeVariant::GZIP;
+ json_format.schema_infer_max_rec = Some(500);
+
+ let file_type =
format_as_file_type(Arc::new(JsonFormatFactory::new_with_options(
+ json_format.clone(),
+ )));
+
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.json".to_string(),
+ vec![],
+ file_type,
+ Default::default(),
+ ));
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+ match roundtrip {
+ LogicalPlan::Copy(copy_to) => {
+ assert_eq!("test.json", copy_to.output_url);
+ assert_eq!("json", copy_to.file_type.get_ext());
+ let dt = copy_to
+ .file_type
+ .as_ref()
+ .as_any()
+ .downcast_ref::<DefaultFileType>()
+ .unwrap();
+ let json = dt
+ .as_format_factory()
+ .as_ref()
+ .as_any()
+ .downcast_ref::<JsonFormatFactory>()
+ .unwrap();
+ let decoded = json.options.as_ref().unwrap();
+ assert_eq!(json_format.compression, decoded.compression);
+ assert_eq!(
+ json_format.schema_infer_max_rec,
+ decoded.schema_infer_max_rec
+ );
+ }
+ _ => panic!("Expected CopyTo plan"),
+ }
+ Ok(())
+}
+
+#[tokio::test]
+async fn roundtrip_default_codec_parquet() -> Result<()> {
+ let ctx = SessionContext::new();
+ let input = create_parquet_scan(&ctx).await?;
+
+ let table_options =
+
TableOptions::default_from_session_config(ctx.state().config_options());
+ let mut parquet_format = table_options.parquet;
+ parquet_format.global.bloom_filter_on_read = true;
+ parquet_format.global.created_by = "DefaultCodecTest".to_string();
+
+ let file_type = format_as_file_type(Arc::new(
+ ParquetFormatFactory::new_with_options(parquet_format.clone()),
+ ));
+
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.parquet".to_string(),
+ vec![],
+ file_type,
+ Default::default(),
+ ));
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+ match roundtrip {
+ LogicalPlan::Copy(copy_to) => {
+ assert_eq!("test.parquet", copy_to.output_url);
+ assert_eq!("parquet", copy_to.file_type.get_ext());
+ let dt = copy_to
+ .file_type
+ .as_ref()
+ .as_any()
+ .downcast_ref::<DefaultFileType>()
+ .unwrap();
+ let pq = dt
+ .as_format_factory()
+ .as_ref()
+ .as_any()
+ .downcast_ref::<ParquetFormatFactory>()
+ .unwrap();
+ let decoded = pq.options.as_ref().unwrap();
+ assert!(decoded.global.bloom_filter_on_read);
+ assert_eq!("DefaultCodecTest", decoded.global.created_by);
+ }
+ _ => panic!("Expected CopyTo plan"),
+ }
+ Ok(())
+}
+
+#[tokio::test]
+async fn roundtrip_default_codec_arrow() -> Result<()> {
+ let ctx = SessionContext::new();
+ let input = create_csv_scan(&ctx).await?;
+
+ let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));
+
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.arrow".to_string(),
+ vec![],
+ file_type,
+ Default::default(),
+ ));
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let roundtrip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
+
+ match roundtrip {
+ LogicalPlan::Copy(copy_to) => {
+ assert_eq!("test.arrow", copy_to.output_url);
+ assert_eq!("arrow", copy_to.file_type.get_ext());
+ }
+ _ => panic!("Expected CopyTo plan"),
+ }
+ Ok(())
+}
+
async fn create_csv_scan(ctx: &SessionContext) -> Result<LogicalPlan,
DataFusionError> {
ctx.register_csv("t1", "tests/testdata/test.csv",
CsvReadOptions::default())
.await?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]