This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5a9712efd7 minor: unnest protobuf serde support (#10681)
5a9712efd7 is described below
commit 5a9712efd7a2b8a98f404fb0627557183fe3789f
Author: Andrey Koshchiy <[email protected]>
AuthorDate: Tue May 28 16:04:07 2024 +0300
minor: unnest protobuf serde support (#10681)
Signed-off-by: Andrey Koshchiy <[email protected]>
---
datafusion/proto/proto/datafusion.proto | 15 +
datafusion/proto/src/generated/pbjson.rs | 311 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 28 +-
datafusion/proto/src/logical_plan/from_proto.rs | 10 +-
datafusion/proto/src/logical_plan/mod.rs | 67 ++++-
datafusion/proto/src/logical_plan/to_proto.rs | 10 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 25 ++
7 files changed, 459 insertions(+), 7 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index cb0ae0f551..6b4e2aae29 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -58,6 +58,7 @@ message LogicalPlanNode {
DropViewNode drop_view = 27;
DistinctOnNode distinct_on = 28;
CopyToNode copy_to = 29;
+ UnnestNode unnest = 30;
}
}
@@ -260,6 +261,20 @@ message CopyToNode {
repeated string partition_by = 7;
}
+message UnnestNode {
+ LogicalPlanNode input = 1;
+ repeated datafusion_common.Column exec_columns = 2;
+ repeated uint64 list_type_columns = 3;
+ repeated uint64 struct_type_columns = 4;
+ repeated uint64 dependency_indices = 5;
+ datafusion_common.DfSchema schema = 6;
+ UnnestOptions options = 7;
+}
+
+message UnnestOptions {
+ bool preserve_nulls = 1;
+}
+
message UnionNode {
repeated LogicalPlanNode inputs = 1;
}
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 2edbae2429..bbee3311b7 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -10360,6 +10360,9 @@ impl serde::Serialize for LogicalPlanNode {
logical_plan_node::LogicalPlanType::CopyTo(v) => {
struct_ser.serialize_field("copyTo", v)?;
}
+ logical_plan_node::LogicalPlanType::Unnest(v) => {
+ struct_ser.serialize_field("unnest", v)?;
+ }
}
}
struct_ser.end()
@@ -10413,6 +10416,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
"distinctOn",
"copy_to",
"copyTo",
+ "unnest",
];
#[allow(clippy::enum_variant_names)]
@@ -10445,6 +10449,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
DropView,
DistinctOn,
CopyTo,
+ Unnest,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -10494,6 +10499,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
"dropView" | "drop_view" =>
Ok(GeneratedField::DropView),
"distinctOn" | "distinct_on" =>
Ok(GeneratedField::DistinctOn),
"copyTo" | "copy_to" => Ok(GeneratedField::CopyTo),
+ "unnest" => Ok(GeneratedField::Unnest),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -10710,6 +10716,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
return
Err(serde::de::Error::duplicate_field("copyTo"));
}
logical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::CopyTo)
+;
+ }
+ GeneratedField::Unnest => {
+ if logical_plan_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("unnest"));
+ }
+ logical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::Unnest)
;
}
}
@@ -19229,6 +19242,304 @@ impl<'de> serde::Deserialize<'de> for Unnest {
deserializer.deserialize_struct("datafusion.Unnest", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for UnnestNode {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.input.is_some() {
+ len += 1;
+ }
+ if !self.exec_columns.is_empty() {
+ len += 1;
+ }
+ if !self.list_type_columns.is_empty() {
+ len += 1;
+ }
+ if !self.struct_type_columns.is_empty() {
+ len += 1;
+ }
+ if !self.dependency_indices.is_empty() {
+ len += 1;
+ }
+ if self.schema.is_some() {
+ len += 1;
+ }
+ if self.options.is_some() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.UnnestNode", len)?;
+ if let Some(v) = self.input.as_ref() {
+ struct_ser.serialize_field("input", v)?;
+ }
+ if !self.exec_columns.is_empty() {
+ struct_ser.serialize_field("execColumns", &self.exec_columns)?;
+ }
+ if !self.list_type_columns.is_empty() {
+ struct_ser.serialize_field("listTypeColumns",
&self.list_type_columns.iter().map(ToString::to_string).collect::<Vec<_>>())?;
+ }
+ if !self.struct_type_columns.is_empty() {
+ struct_ser.serialize_field("structTypeColumns",
&self.struct_type_columns.iter().map(ToString::to_string).collect::<Vec<_>>())?;
+ }
+ if !self.dependency_indices.is_empty() {
+ struct_ser.serialize_field("dependencyIndices",
&self.dependency_indices.iter().map(ToString::to_string).collect::<Vec<_>>())?;
+ }
+ if let Some(v) = self.schema.as_ref() {
+ struct_ser.serialize_field("schema", v)?;
+ }
+ if let Some(v) = self.options.as_ref() {
+ struct_ser.serialize_field("options", v)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for UnnestNode {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "input",
+ "exec_columns",
+ "execColumns",
+ "list_type_columns",
+ "listTypeColumns",
+ "struct_type_columns",
+ "structTypeColumns",
+ "dependency_indices",
+ "dependencyIndices",
+ "schema",
+ "options",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Input,
+ ExecColumns,
+ ListTypeColumns,
+ StructTypeColumns,
+ DependencyIndices,
+ Schema,
+ Options,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "input" => Ok(GeneratedField::Input),
+ "execColumns" | "exec_columns" =>
Ok(GeneratedField::ExecColumns),
+ "listTypeColumns" | "list_type_columns" =>
Ok(GeneratedField::ListTypeColumns),
+ "structTypeColumns" | "struct_type_columns" =>
Ok(GeneratedField::StructTypeColumns),
+ "dependencyIndices" | "dependency_indices" =>
Ok(GeneratedField::DependencyIndices),
+ "schema" => Ok(GeneratedField::Schema),
+ "options" => Ok(GeneratedField::Options),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = UnnestNode;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.UnnestNode")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<UnnestNode, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut input__ = None;
+ let mut exec_columns__ = None;
+ let mut list_type_columns__ = None;
+ let mut struct_type_columns__ = None;
+ let mut dependency_indices__ = None;
+ let mut schema__ = None;
+ let mut options__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Input => {
+ if input__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("input"));
+ }
+ input__ = map_.next_value()?;
+ }
+ GeneratedField::ExecColumns => {
+ if exec_columns__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("execColumns"));
+ }
+ exec_columns__ = Some(map_.next_value()?);
+ }
+ GeneratedField::ListTypeColumns => {
+ if list_type_columns__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("listTypeColumns"));
+ }
+ list_type_columns__ =
+
Some(map_.next_value::<Vec<::pbjson::private::NumberDeserialize<_>>>()?
+ .into_iter().map(|x| x.0).collect())
+ ;
+ }
+ GeneratedField::StructTypeColumns => {
+ if struct_type_columns__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("structTypeColumns"));
+ }
+ struct_type_columns__ =
+
Some(map_.next_value::<Vec<::pbjson::private::NumberDeserialize<_>>>()?
+ .into_iter().map(|x| x.0).collect())
+ ;
+ }
+ GeneratedField::DependencyIndices => {
+ if dependency_indices__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("dependencyIndices"));
+ }
+ dependency_indices__ =
+
Some(map_.next_value::<Vec<::pbjson::private::NumberDeserialize<_>>>()?
+ .into_iter().map(|x| x.0).collect())
+ ;
+ }
+ GeneratedField::Schema => {
+ if schema__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("schema"));
+ }
+ schema__ = map_.next_value()?;
+ }
+ GeneratedField::Options => {
+ if options__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("options"));
+ }
+ options__ = map_.next_value()?;
+ }
+ }
+ }
+ Ok(UnnestNode {
+ input: input__,
+ exec_columns: exec_columns__.unwrap_or_default(),
+ list_type_columns: list_type_columns__.unwrap_or_default(),
+ struct_type_columns:
struct_type_columns__.unwrap_or_default(),
+ dependency_indices:
dependency_indices__.unwrap_or_default(),
+ schema: schema__,
+ options: options__,
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.UnnestNode", FIELDS,
GeneratedVisitor)
+ }
+}
+impl serde::Serialize for UnnestOptions {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.preserve_nulls {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.UnnestOptions", len)?;
+ if self.preserve_nulls {
+ struct_ser.serialize_field("preserveNulls", &self.preserve_nulls)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for UnnestOptions {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "preserve_nulls",
+ "preserveNulls",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ PreserveNulls,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "preserveNulls" | "preserve_nulls" =>
Ok(GeneratedField::PreserveNulls),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = UnnestOptions;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.UnnestOptions")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<UnnestOptions, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut preserve_nulls__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::PreserveNulls => {
+ if preserve_nulls__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("preserveNulls"));
+ }
+ preserve_nulls__ = Some(map_.next_value()?);
+ }
+ }
+ }
+ Ok(UnnestOptions {
+ preserve_nulls: preserve_nulls__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.UnnestOptions", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for ValuesNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index e9407cc65b..0354ead9e7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -6,7 +6,7 @@
pub struct LogicalPlanNode {
#[prost(
oneof = "logical_plan_node::LogicalPlanType",
- tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29"
+ tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30"
)]
pub logical_plan_type:
::core::option::Option<logical_plan_node::LogicalPlanType>,
}
@@ -71,6 +71,8 @@ pub mod logical_plan_node {
DistinctOn(::prost::alloc::boxed::Box<super::DistinctOnNode>),
#[prost(message, tag = "29")]
CopyTo(::prost::alloc::boxed::Box<super::CopyToNode>),
+ #[prost(message, tag = "30")]
+ Unnest(::prost::alloc::boxed::Box<super::UnnestNode>),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -433,6 +435,30 @@ pub mod copy_to_node {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UnnestNode {
+ #[prost(message, optional, boxed, tag = "1")]
+ pub input:
::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
+ #[prost(message, repeated, tag = "2")]
+ pub exec_columns:
::prost::alloc::vec::Vec<super::datafusion_common::Column>,
+ #[prost(uint64, repeated, tag = "3")]
+ pub list_type_columns: ::prost::alloc::vec::Vec<u64>,
+ #[prost(uint64, repeated, tag = "4")]
+ pub struct_type_columns: ::prost::alloc::vec::Vec<u64>,
+ #[prost(uint64, repeated, tag = "5")]
+ pub dependency_indices: ::prost::alloc::vec::Vec<u64>,
+ #[prost(message, optional, tag = "6")]
+ pub schema: ::core::option::Option<super::datafusion_common::DfSchema>,
+ #[prost(message, optional, tag = "7")]
+ pub options: ::core::option::Option<UnnestOptions>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UnnestOptions {
+ #[prost(bool, tag = "1")]
+ pub preserve_nulls: bool,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnionNode {
#[prost(message, repeated, tag = "1")]
pub inputs: ::prost::alloc::vec::Vec<LogicalPlanNode>,
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 905c6654cf..e2a2f875ea 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use datafusion::execution::registry::FunctionRegistry;
use datafusion_common::{
internal_err, plan_datafusion_err, DataFusionError, Result, ScalarValue,
- TableReference,
+ TableReference, UnnestOptions,
};
use datafusion_expr::expr::Unnest;
use datafusion_expr::expr::{Alias, Placeholder};
@@ -50,6 +50,14 @@ use crate::protobuf::{
use super::LogicalExtensionCodec;
+impl From<&protobuf::UnnestOptions> for UnnestOptions {
+ fn from(opts: &protobuf::UnnestOptions) -> Self {
+ Self {
+ preserve_nulls: opts.preserve_nulls,
+ }
+ }
+}
+
impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
fn from(units: protobuf::WindowFrameUnits) -> Self {
match units {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 939bda9f24..ef37150a35 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection};
use crate::{
- convert_required,
+ convert_required, into_required,
protobuf::{
self, listing_table_scan_node::FileFormatType,
logical_plan_node::LogicalPlanType, LogicalExtensionNode,
LogicalPlanNode,
@@ -47,6 +47,7 @@ use datafusion_common::{
context, internal_datafusion_err, internal_err, not_impl_err,
DataFusionError,
Result, TableReference,
};
+use datafusion_expr::Unnest;
use datafusion_expr::{
dml,
logical_plan::{
@@ -838,6 +839,31 @@ impl AsLogicalPlan for LogicalPlanNode {
},
))
}
+ LogicalPlanType::Unnest(unnest) => {
+ let input: LogicalPlan =
+ into_logical_plan!(unnest.input, ctx, extension_codec)?;
+ Ok(datafusion_expr::LogicalPlan::Unnest(Unnest {
+ input: Arc::new(input),
+ exec_columns: unnest.exec_columns.iter().map(|c|
c.into()).collect(),
+ list_type_columns: unnest
+ .list_type_columns
+ .iter()
+ .map(|c| *c as usize)
+ .collect(),
+ struct_type_columns: unnest
+ .struct_type_columns
+ .iter()
+ .map(|c| *c as usize)
+ .collect(),
+ dependency_indices: unnest
+ .dependency_indices
+ .iter()
+ .map(|c| *c as usize)
+ .collect(),
+ schema: Arc::new(convert_required!(unnest.schema)?),
+ options: into_required!(unnest.options)?,
+ }))
+ }
}
}
@@ -1510,9 +1536,42 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
- LogicalPlan::Unnest(_) => Err(proto_error(
- "LogicalPlan serde is not yet implemented for Unnest",
- )),
+ LogicalPlan::Unnest(Unnest {
+ input,
+ exec_columns,
+ list_type_columns,
+ struct_type_columns,
+ dependency_indices,
+ schema,
+ options,
+ }) => {
+ let input = protobuf::LogicalPlanNode::try_from_logical_plan(
+ input,
+ extension_codec,
+ )?;
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
+ protobuf::UnnestNode {
+ input: Some(Box::new(input)),
+ exec_columns: exec_columns.iter().map(|c|
c.into()).collect(),
+ list_type_columns: list_type_columns
+ .iter()
+ .map(|c| *c as u64)
+ .collect(),
+ struct_type_columns: struct_type_columns
+ .iter()
+ .map(|c| *c as u64)
+ .collect(),
+ dependency_indices: dependency_indices
+ .iter()
+ .map(|c| *c as u64)
+ .collect(),
+ schema: Some(schema.try_into()?),
+ options: Some(options.into()),
+ },
+ ))),
+ })
+ }
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for CreateMemoryTable",
)),
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index b0059aff61..d2783305f6 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -19,7 +19,7 @@
//! DataFusion logical plans to be serialized and transmitted between
//! processes.
-use datafusion_common::TableReference;
+use datafusion_common::{TableReference, UnnestOptions};
use datafusion_expr::expr::{
self, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Cast,
GroupingSet,
InList, Like, Placeholder, ScalarFunction, Sort, Unnest,
@@ -45,6 +45,14 @@ use crate::protobuf::{
use super::LogicalExtensionCodec;
+impl From<&UnnestOptions> for protobuf::UnnestOptions {
+ fn from(opts: &UnnestOptions) -> Self {
+ Self {
+ preserve_nulls: opts.preserve_nulls,
+ }
+ }
+}
+
impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
fn from(stringified_plan: &StringifiedPlan) -> Self {
Self {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index d2721dfafd..b756d4688d 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -522,6 +522,31 @@ async fn roundtrip_logical_plan_with_extension() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn roundtrip_logical_plan_unnest() -> Result<()> {
+ let ctx = SessionContext::new();
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int64, true),
+ Field::new(
+ "b",
+ DataType::List(Arc::new(Field::new("item", DataType::Int32,
false))),
+ true,
+ ),
+ ]);
+ ctx.register_csv(
+ "t1",
+ "tests/testdata/test.csv",
+ CsvReadOptions::default().schema(&schema),
+ )
+ .await?;
+ let query = "SELECT unnest(b) FROM t1";
+ let plan = ctx.sql(query).await?.into_optimized_plan()?;
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+ Ok(())
+}
+
#[tokio::test]
async fn roundtrip_expr_api() -> Result<()> {
let ctx = SessionContext::new();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]