This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c5108aef4 Add missing protobuf serialisation functionality GetIndexedFieldExpr. (#5324)
c5108aef4 is described below

commit c5108aef4d2660cce950976af14d33444f27075e
Author: Ahmed Riza <[email protected]>
AuthorDate: Fri Feb 17 19:14:36 2023 +0000

    Add missing protobuf serialisation functionality GetIndexedFieldExpr. (#5324)
    
    This is required in order for `ballista` to serialise the indexing
    expressions for nested lists. Otherwise, `ballista` cannot execute
    any SQL that contains indexing expressions.
---
 datafusion/proto/proto/datafusion.proto          |   9 +-
 datafusion/proto/src/generated/pbjson.rs         | 122 +++++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs          |  14 ++-
 datafusion/proto/src/physical_plan/from_proto.rs |  12 +++
 datafusion/proto/src/physical_plan/mod.rs        |  28 +++++-
 datafusion/proto/src/physical_plan/to_proto.rs   |  15 ++-
 6 files changed, 195 insertions(+), 5 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 3bcd7ad76..5bbe1e5e6 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1023,6 +1023,8 @@ message PhysicalExprNode {
     PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
 
     PhysicalLikeExprNode like_expr = 18;
+
+    PhysicalGetIndexedFieldExprNode get_indexed_field_expr = 19;
   }
 }
 
@@ -1345,4 +1347,9 @@ message ColumnStats {
   ScalarValue max_value = 2;
   uint32 null_count = 3;
   uint32 distinct_count = 4;
-}
\ No newline at end of file
+}
+
+message PhysicalGetIndexedFieldExprNode {
+  PhysicalExprNode arg = 1;
+  ScalarValue key = 2;
+}
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 7437fc238..1c84da002 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -13209,6 +13209,9 @@ impl serde::Serialize for PhysicalExprNode {
                 physical_expr_node::ExprType::LikeExpr(v) => {
                     struct_ser.serialize_field("likeExpr", v)?;
                 }
+                physical_expr_node::ExprType::GetIndexedFieldExpr(v) => {
+                    struct_ser.serialize_field("getIndexedFieldExpr", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -13252,6 +13255,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
             "dateTimeIntervalExpr",
             "like_expr",
             "likeExpr",
+            "get_indexed_field_expr",
+            "getIndexedFieldExpr",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -13274,6 +13279,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
             ScalarUdf,
             DateTimeIntervalExpr,
             LikeExpr,
+            GetIndexedFieldExpr,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -13313,6 +13319,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
                             "scalarUdf" | "scalar_udf" => 
Ok(GeneratedField::ScalarUdf),
                             "dateTimeIntervalExpr" | "date_time_interval_expr" 
=> Ok(GeneratedField::DateTimeIntervalExpr),
                             "likeExpr" | "like_expr" => 
Ok(GeneratedField::LikeExpr),
+                            "getIndexedFieldExpr" | "get_indexed_field_expr" 
=> Ok(GeneratedField::GetIndexedFieldExpr),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -13459,6 +13466,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode 
{
                                 return 
Err(serde::de::Error::duplicate_field("likeExpr"));
                             }
                             expr_type__ = 
map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::LikeExpr)
+;
+                        }
+                        GeneratedField::GetIndexedFieldExpr => {
+                            if expr_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("getIndexedFieldExpr"));
+                            }
+                            expr_type__ = 
map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::GetIndexedFieldExpr)
 ;
                         }
                     }
@@ -13581,6 +13595,114 @@ impl<'de> serde::Deserialize<'de> for 
PhysicalExtensionNode {
         deserializer.deserialize_struct("datafusion.PhysicalExtensionNode", 
FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for PhysicalGetIndexedFieldExprNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.arg.is_some() {
+            len += 1;
+        }
+        if self.key.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.PhysicalGetIndexedFieldExprNode", len)?;
+        if let Some(v) = self.arg.as_ref() {
+            struct_ser.serialize_field("arg", v)?;
+        }
+        if let Some(v) = self.key.as_ref() {
+            struct_ser.serialize_field("key", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PhysicalGetIndexedFieldExprNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "arg",
+            "key",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Arg,
+            Key,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "arg" => Ok(GeneratedField::Arg),
+                            "key" => Ok(GeneratedField::Key),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PhysicalGetIndexedFieldExprNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct 
datafusion.PhysicalGetIndexedFieldExprNode")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<PhysicalGetIndexedFieldExprNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut arg__ = None;
+                let mut key__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Arg => {
+                            if arg__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("arg"));
+                            }
+                            arg__ = map.next_value()?;
+                        }
+                        GeneratedField::Key => {
+                            if key__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("key"));
+                            }
+                            key__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(PhysicalGetIndexedFieldExprNode {
+                    arg: arg__,
+                    key: key__,
+                })
+            }
+        }
+        
deserializer.deserialize_struct("datafusion.PhysicalGetIndexedFieldExprNode", 
FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PhysicalHashRepartition {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 67bddc23a..5497a8783 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1394,7 +1394,7 @@ pub struct PhysicalExtensionNode {
 pub struct PhysicalExprNode {
     #[prost(
         oneof = "physical_expr_node::ExprType",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19"
     )]
     pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
 }
@@ -1446,6 +1446,10 @@ pub mod physical_expr_node {
         ),
         #[prost(message, tag = "18")]
         LikeExpr(::prost::alloc::boxed::Box<super::PhysicalLikeExprNode>),
+        #[prost(message, tag = "19")]
+        GetIndexedFieldExpr(
+            ::prost::alloc::boxed::Box<super::PhysicalGetIndexedFieldExprNode>,
+        ),
     }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -1956,6 +1960,14 @@ pub struct ColumnStats {
     #[prost(uint32, tag = "4")]
     pub distinct_count: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalGetIndexedFieldExprNode {
+    #[prost(message, optional, boxed, tag = "1")]
+    pub arg: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+    #[prost(message, optional, tag = "2")]
+    pub key: ::core::option::Option<ScalarValue>,
+}
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, 
::prost::Enumeration)]
 #[repr(i32)]
 pub enum JoinType {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 5669410a8..40ab07175 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -29,6 +29,7 @@ use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::window_function::WindowFunction;
 use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
+use datafusion::physical_plan::expressions::GetIndexedFieldExpr;
 use datafusion::physical_plan::expressions::LikeExpr;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{
@@ -282,6 +283,17 @@ pub fn parse_physical_expr(
                 input_schema,
             )?,
         )),
+        ExprType::GetIndexedFieldExpr(get_indexed_field_expr) => {
+            Arc::new(GetIndexedFieldExpr::new(
+                parse_required_physical_expr(
+                    get_indexed_field_expr.arg.as_deref(),
+                    registry,
+                    "arg",
+                    input_schema,
+                )?,
+                convert_required!(get_indexed_field_expr.key)?,
+            ))
+        }
     };
 
     Ok(pexpr)
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 9647dbdd2..0898ec416 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1218,7 +1218,7 @@ mod roundtrip_tests {
     use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
     use datafusion::physical_expr::ScalarFunctionExpr;
     use datafusion::physical_plan::aggregates::PhysicalGroupBy;
-    use datafusion::physical_plan::expressions::like;
+    use datafusion::physical_plan::expressions::{like, GetIndexedFieldExpr};
     use datafusion::physical_plan::functions;
     use datafusion::physical_plan::functions::make_scalar_function;
     use datafusion::physical_plan::projection::ProjectionExec;
@@ -1628,4 +1628,30 @@ mod roundtrip_tests {
         )?);
         roundtrip_test(plan)
     }
+
+    #[test]
+    fn roundtrip_get_indexed_field() -> Result<()> {
+        let fields = vec![
+            Field::new("id", DataType::Int64, true),
+            Field::new(
+                "a",
+                DataType::List(Box::new(Field::new("item", DataType::Float64, 
true))),
+                true,
+            ),
+        ];
+
+        let schema = Schema::new(fields);
+        let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+
+        let col_a = col("a", &schema)?;
+        let key = ScalarValue::Int64(Some(1));
+        let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(col_a, 
key));
+
+        let plan = Arc::new(ProjectionExec::try_new(
+            vec![(get_indexed_field_expr, "result".to_string())],
+            input,
+        )?);
+
+        roundtrip_test(plan)
+    }
 }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index a987e7fee..622f9d780 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -42,9 +42,9 @@ use datafusion::physical_plan::expressions::{
 use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 
 use crate::protobuf;
-use crate::protobuf::PhysicalSortExprNode;
+use crate::protobuf::{PhysicalSortExprNode, ScalarValue};
 use datafusion::logical_expr::BuiltinScalarFunction;
-use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
+use datafusion::physical_expr::expressions::{DateTimeIntervalExpr, 
GetIndexedFieldExpr};
 use datafusion::physical_expr::ScalarFunctionExpr;
 use datafusion::physical_plan::joins::utils::JoinSide;
 use datafusion_common::DataFusionError;
@@ -342,6 +342,17 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for 
protobuf::PhysicalExprNode {
                     }),
                 )),
             })
+        } else if let Some(expr) = expr.downcast_ref::<GetIndexedFieldExpr>() {
+            Ok(protobuf::PhysicalExprNode {
+                expr_type: Some(
+                    
protobuf::physical_expr_node::ExprType::GetIndexedFieldExpr(
+                        Box::new(protobuf::PhysicalGetIndexedFieldExprNode {
+                            arg: 
Some(Box::new(expr.arg().to_owned().try_into()?)),
+                            key: Some(ScalarValue::try_from(expr.key())?),
+                        }),
+                    ),
+                ),
+            })
         } else {
             Err(DataFusionError::Internal(format!(
                 "physical_plan::to_proto() unsupported expression {value:?}"

Reply via email to