This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a9712efd7 minor: unnest protobuf serde support (#10681)
5a9712efd7 is described below

commit 5a9712efd7a2b8a98f404fb0627557183fe3789f
Author: Andrey Koshchiy <[email protected]>
AuthorDate: Tue May 28 16:04:07 2024 +0300

    minor: unnest protobuf serde support (#10681)
    
    Signed-off-by: Andrey Koshchiy <[email protected]>
---
 datafusion/proto/proto/datafusion.proto            |  15 +
 datafusion/proto/src/generated/pbjson.rs           | 311 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  28 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |  10 +-
 datafusion/proto/src/logical_plan/mod.rs           |  67 ++++-
 datafusion/proto/src/logical_plan/to_proto.rs      |  10 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  25 ++
 7 files changed, 459 insertions(+), 7 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index cb0ae0f551..6b4e2aae29 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -58,6 +58,7 @@ message LogicalPlanNode {
     DropViewNode drop_view = 27;
     DistinctOnNode distinct_on = 28;
     CopyToNode copy_to = 29;
+    UnnestNode unnest = 30;
   }
 }
 
@@ -260,6 +261,20 @@ message CopyToNode {
     repeated string partition_by = 7;
 }
 
+message UnnestNode {
+    LogicalPlanNode input = 1;
+    repeated datafusion_common.Column exec_columns = 2;
+    repeated uint64 list_type_columns = 3;
+    repeated uint64 struct_type_columns = 4;
+    repeated uint64 dependency_indices = 5;
+    datafusion_common.DfSchema schema = 6;
+    UnnestOptions options = 7;
+}
+
+message UnnestOptions {
+    bool preserve_nulls = 1;
+}
+
 message UnionNode {
   repeated LogicalPlanNode inputs = 1;
 }
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 2edbae2429..bbee3311b7 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -10360,6 +10360,9 @@ impl serde::Serialize for LogicalPlanNode {
                 logical_plan_node::LogicalPlanType::CopyTo(v) => {
                     struct_ser.serialize_field("copyTo", v)?;
                 }
+                logical_plan_node::LogicalPlanType::Unnest(v) => {
+                    struct_ser.serialize_field("unnest", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -10413,6 +10416,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
             "distinctOn",
             "copy_to",
             "copyTo",
+            "unnest",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -10445,6 +10449,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
             DropView,
             DistinctOn,
             CopyTo,
+            Unnest,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -10494,6 +10499,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
                             "dropView" | "drop_view" => 
Ok(GeneratedField::DropView),
                             "distinctOn" | "distinct_on" => 
Ok(GeneratedField::DistinctOn),
                             "copyTo" | "copy_to" => Ok(GeneratedField::CopyTo),
+                            "unnest" => Ok(GeneratedField::Unnest),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -10710,6 +10716,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
                                 return 
Err(serde::de::Error::duplicate_field("copyTo"));
                             }
                             logical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::CopyTo)
+;
+                        }
+                        GeneratedField::Unnest => {
+                            if logical_plan_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("unnest"));
+                            }
+                            logical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::Unnest)
 ;
                         }
                     }
@@ -19229,6 +19242,304 @@ impl<'de> serde::Deserialize<'de> for Unnest {
         deserializer.deserialize_struct("datafusion.Unnest", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for UnnestNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.input.is_some() {
+            len += 1;
+        }
+        if !self.exec_columns.is_empty() {
+            len += 1;
+        }
+        if !self.list_type_columns.is_empty() {
+            len += 1;
+        }
+        if !self.struct_type_columns.is_empty() {
+            len += 1;
+        }
+        if !self.dependency_indices.is_empty() {
+            len += 1;
+        }
+        if self.schema.is_some() {
+            len += 1;
+        }
+        if self.options.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.UnnestNode", len)?;
+        if let Some(v) = self.input.as_ref() {
+            struct_ser.serialize_field("input", v)?;
+        }
+        if !self.exec_columns.is_empty() {
+            struct_ser.serialize_field("execColumns", &self.exec_columns)?;
+        }
+        if !self.list_type_columns.is_empty() {
+            struct_ser.serialize_field("listTypeColumns", 
&self.list_type_columns.iter().map(ToString::to_string).collect::<Vec<_>>())?;
+        }
+        if !self.struct_type_columns.is_empty() {
+            struct_ser.serialize_field("structTypeColumns", 
&self.struct_type_columns.iter().map(ToString::to_string).collect::<Vec<_>>())?;
+        }
+        if !self.dependency_indices.is_empty() {
+            struct_ser.serialize_field("dependencyIndices", 
&self.dependency_indices.iter().map(ToString::to_string).collect::<Vec<_>>())?;
+        }
+        if let Some(v) = self.schema.as_ref() {
+            struct_ser.serialize_field("schema", v)?;
+        }
+        if let Some(v) = self.options.as_ref() {
+            struct_ser.serialize_field("options", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for UnnestNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "input",
+            "exec_columns",
+            "execColumns",
+            "list_type_columns",
+            "listTypeColumns",
+            "struct_type_columns",
+            "structTypeColumns",
+            "dependency_indices",
+            "dependencyIndices",
+            "schema",
+            "options",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Input,
+            ExecColumns,
+            ListTypeColumns,
+            StructTypeColumns,
+            DependencyIndices,
+            Schema,
+            Options,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "input" => Ok(GeneratedField::Input),
+                            "execColumns" | "exec_columns" => 
Ok(GeneratedField::ExecColumns),
+                            "listTypeColumns" | "list_type_columns" => 
Ok(GeneratedField::ListTypeColumns),
+                            "structTypeColumns" | "struct_type_columns" => 
Ok(GeneratedField::StructTypeColumns),
+                            "dependencyIndices" | "dependency_indices" => 
Ok(GeneratedField::DependencyIndices),
+                            "schema" => Ok(GeneratedField::Schema),
+                            "options" => Ok(GeneratedField::Options),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = UnnestNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.UnnestNode")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<UnnestNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut input__ = None;
+                let mut exec_columns__ = None;
+                let mut list_type_columns__ = None;
+                let mut struct_type_columns__ = None;
+                let mut dependency_indices__ = None;
+                let mut schema__ = None;
+                let mut options__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Input => {
+                            if input__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("input"));
+                            }
+                            input__ = map_.next_value()?;
+                        }
+                        GeneratedField::ExecColumns => {
+                            if exec_columns__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("execColumns"));
+                            }
+                            exec_columns__ = Some(map_.next_value()?);
+                        }
+                        GeneratedField::ListTypeColumns => {
+                            if list_type_columns__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("listTypeColumns"));
+                            }
+                            list_type_columns__ = 
+                                
Some(map_.next_value::<Vec<::pbjson::private::NumberDeserialize<_>>>()?
+                                    .into_iter().map(|x| x.0).collect())
+                            ;
+                        }
+                        GeneratedField::StructTypeColumns => {
+                            if struct_type_columns__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("structTypeColumns"));
+                            }
+                            struct_type_columns__ = 
+                                
Some(map_.next_value::<Vec<::pbjson::private::NumberDeserialize<_>>>()?
+                                    .into_iter().map(|x| x.0).collect())
+                            ;
+                        }
+                        GeneratedField::DependencyIndices => {
+                            if dependency_indices__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("dependencyIndices"));
+                            }
+                            dependency_indices__ = 
+                                
Some(map_.next_value::<Vec<::pbjson::private::NumberDeserialize<_>>>()?
+                                    .into_iter().map(|x| x.0).collect())
+                            ;
+                        }
+                        GeneratedField::Schema => {
+                            if schema__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("schema"));
+                            }
+                            schema__ = map_.next_value()?;
+                        }
+                        GeneratedField::Options => {
+                            if options__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("options"));
+                            }
+                            options__ = map_.next_value()?;
+                        }
+                    }
+                }
+                Ok(UnnestNode {
+                    input: input__,
+                    exec_columns: exec_columns__.unwrap_or_default(),
+                    list_type_columns: list_type_columns__.unwrap_or_default(),
+                    struct_type_columns: 
struct_type_columns__.unwrap_or_default(),
+                    dependency_indices: 
dependency_indices__.unwrap_or_default(),
+                    schema: schema__,
+                    options: options__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.UnnestNode", FIELDS, 
GeneratedVisitor)
+    }
+}
+impl serde::Serialize for UnnestOptions {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.preserve_nulls {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.UnnestOptions", len)?;
+        if self.preserve_nulls {
+            struct_ser.serialize_field("preserveNulls", &self.preserve_nulls)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for UnnestOptions {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "preserve_nulls",
+            "preserveNulls",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            PreserveNulls,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "preserveNulls" | "preserve_nulls" => 
Ok(GeneratedField::PreserveNulls),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = UnnestOptions;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.UnnestOptions")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<UnnestOptions, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut preserve_nulls__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::PreserveNulls => {
+                            if preserve_nulls__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("preserveNulls"));
+                            }
+                            preserve_nulls__ = Some(map_.next_value()?);
+                        }
+                    }
+                }
+                Ok(UnnestOptions {
+                    preserve_nulls: preserve_nulls__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.UnnestOptions", FIELDS, 
GeneratedVisitor)
+    }
+}
 impl serde::Serialize for ValuesNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index e9407cc65b..0354ead9e7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -6,7 +6,7 @@
 pub struct LogicalPlanNode {
     #[prost(
         oneof = "logical_plan_node::LogicalPlanType",
-        tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29"
+        tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30"
     )]
     pub logical_plan_type: 
::core::option::Option<logical_plan_node::LogicalPlanType>,
 }
@@ -71,6 +71,8 @@ pub mod logical_plan_node {
         DistinctOn(::prost::alloc::boxed::Box<super::DistinctOnNode>),
         #[prost(message, tag = "29")]
         CopyTo(::prost::alloc::boxed::Box<super::CopyToNode>),
+        #[prost(message, tag = "30")]
+        Unnest(::prost::alloc::boxed::Box<super::UnnestNode>),
     }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -433,6 +435,30 @@ pub mod copy_to_node {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UnnestNode {
+    #[prost(message, optional, boxed, tag = "1")]
+    pub input: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
+    #[prost(message, repeated, tag = "2")]
+    pub exec_columns: 
::prost::alloc::vec::Vec<super::datafusion_common::Column>,
+    #[prost(uint64, repeated, tag = "3")]
+    pub list_type_columns: ::prost::alloc::vec::Vec<u64>,
+    #[prost(uint64, repeated, tag = "4")]
+    pub struct_type_columns: ::prost::alloc::vec::Vec<u64>,
+    #[prost(uint64, repeated, tag = "5")]
+    pub dependency_indices: ::prost::alloc::vec::Vec<u64>,
+    #[prost(message, optional, tag = "6")]
+    pub schema: ::core::option::Option<super::datafusion_common::DfSchema>,
+    #[prost(message, optional, tag = "7")]
+    pub options: ::core::option::Option<UnnestOptions>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UnnestOptions {
+    #[prost(bool, tag = "1")]
+    pub preserve_nulls: bool,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnionNode {
     #[prost(message, repeated, tag = "1")]
     pub inputs: ::prost::alloc::vec::Vec<LogicalPlanNode>,
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 905c6654cf..e2a2f875ea 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use datafusion::execution::registry::FunctionRegistry;
 use datafusion_common::{
     internal_err, plan_datafusion_err, DataFusionError, Result, ScalarValue,
-    TableReference,
+    TableReference, UnnestOptions,
 };
 use datafusion_expr::expr::Unnest;
 use datafusion_expr::expr::{Alias, Placeholder};
@@ -50,6 +50,14 @@ use crate::protobuf::{
 
 use super::LogicalExtensionCodec;
 
+impl From<&protobuf::UnnestOptions> for UnnestOptions {
+    fn from(opts: &protobuf::UnnestOptions) -> Self {
+        Self {
+            preserve_nulls: opts.preserve_nulls,
+        }
+    }
+}
+
 impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
     fn from(units: protobuf::WindowFrameUnits) -> Self {
         match units {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 939bda9f24..ef37150a35 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
 use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection};
 use crate::{
-    convert_required,
+    convert_required, into_required,
     protobuf::{
         self, listing_table_scan_node::FileFormatType,
         logical_plan_node::LogicalPlanType, LogicalExtensionNode, 
LogicalPlanNode,
@@ -47,6 +47,7 @@ use datafusion_common::{
     context, internal_datafusion_err, internal_err, not_impl_err, 
DataFusionError,
     Result, TableReference,
 };
+use datafusion_expr::Unnest;
 use datafusion_expr::{
     dml,
     logical_plan::{
@@ -838,6 +839,31 @@ impl AsLogicalPlan for LogicalPlanNode {
                     },
                 ))
             }
+            LogicalPlanType::Unnest(unnest) => {
+                let input: LogicalPlan =
+                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
+                Ok(datafusion_expr::LogicalPlan::Unnest(Unnest {
+                    input: Arc::new(input),
+                    exec_columns: unnest.exec_columns.iter().map(|c| 
c.into()).collect(),
+                    list_type_columns: unnest
+                        .list_type_columns
+                        .iter()
+                        .map(|c| *c as usize)
+                        .collect(),
+                    struct_type_columns: unnest
+                        .struct_type_columns
+                        .iter()
+                        .map(|c| *c as usize)
+                        .collect(),
+                    dependency_indices: unnest
+                        .dependency_indices
+                        .iter()
+                        .map(|c| *c as usize)
+                        .collect(),
+                    schema: Arc::new(convert_required!(unnest.schema)?),
+                    options: into_required!(unnest.options)?,
+                }))
+            }
         }
     }
 
@@ -1510,9 +1536,42 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))),
                 })
             }
-            LogicalPlan::Unnest(_) => Err(proto_error(
-                "LogicalPlan serde is not yet implemented for Unnest",
-            )),
+            LogicalPlan::Unnest(Unnest {
+                input,
+                exec_columns,
+                list_type_columns,
+                struct_type_columns,
+                dependency_indices,
+                schema,
+                options,
+            }) => {
+                let input = protobuf::LogicalPlanNode::try_from_logical_plan(
+                    input,
+                    extension_codec,
+                )?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
+                        protobuf::UnnestNode {
+                            input: Some(Box::new(input)),
+                            exec_columns: exec_columns.iter().map(|c| 
c.into()).collect(),
+                            list_type_columns: list_type_columns
+                                .iter()
+                                .map(|c| *c as u64)
+                                .collect(),
+                            struct_type_columns: struct_type_columns
+                                .iter()
+                                .map(|c| *c as u64)
+                                .collect(),
+                            dependency_indices: dependency_indices
+                                .iter()
+                                .map(|c| *c as u64)
+                                .collect(),
+                            schema: Some(schema.try_into()?),
+                            options: Some(options.into()),
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => 
Err(proto_error(
                 "LogicalPlan serde is not yet implemented for 
CreateMemoryTable",
             )),
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index b0059aff61..d2783305f6 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -19,7 +19,7 @@
 //! DataFusion logical plans to be serialized and transmitted between
 //! processes.
 
-use datafusion_common::TableReference;
+use datafusion_common::{TableReference, UnnestOptions};
 use datafusion_expr::expr::{
     self, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Cast, 
GroupingSet,
     InList, Like, Placeholder, ScalarFunction, Sort, Unnest,
@@ -45,6 +45,14 @@ use crate::protobuf::{
 
 use super::LogicalExtensionCodec;
 
+impl From<&UnnestOptions> for protobuf::UnnestOptions {
+    fn from(opts: &UnnestOptions) -> Self {
+        Self {
+            preserve_nulls: opts.preserve_nulls,
+        }
+    }
+}
+
 impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
     fn from(stringified_plan: &StringifiedPlan) -> Self {
         Self {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index d2721dfafd..b756d4688d 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -522,6 +522,31 @@ async fn roundtrip_logical_plan_with_extension() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn roundtrip_logical_plan_unnest() -> Result<()> {
+    let ctx = SessionContext::new();
+    let schema = Schema::new(vec![
+        Field::new("a", DataType::Int64, true),
+        Field::new(
+            "b",
+            DataType::List(Arc::new(Field::new("item", DataType::Int32, 
false))),
+            true,
+        ),
+    ]);
+    ctx.register_csv(
+        "t1",
+        "tests/testdata/test.csv",
+        CsvReadOptions::default().schema(&schema),
+    )
+    .await?;
+    let query = "SELECT unnest(b) FROM t1";
+    let plan = ctx.sql(query).await?.into_optimized_plan()?;
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+    Ok(())
+}
+
 #[tokio::test]
 async fn roundtrip_expr_api() -> Result<()> {
     let ctx = SessionContext::new();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to