This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c5108aef4 Add missing protobuf serialisation functionality for
GetIndexedFieldExpr. (#5324)
c5108aef4 is described below
commit c5108aef4d2660cce950976af14d33444f27075e
Author: Ahmed Riza <[email protected]>
AuthorDate: Fri Feb 17 19:14:36 2023 +0000
Add missing protobuf serialisation functionality for GetIndexedFieldExpr.
(#5324)
This is required in order for `ballista` to serialise the indexing
expressions for nested lists. Otherwise, `ballista` cannot execute
any SQL that contains indexing expressions.
---
datafusion/proto/proto/datafusion.proto | 9 +-
datafusion/proto/src/generated/pbjson.rs | 122 +++++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 14 ++-
datafusion/proto/src/physical_plan/from_proto.rs | 12 +++
datafusion/proto/src/physical_plan/mod.rs | 28 +++++-
datafusion/proto/src/physical_plan/to_proto.rs | 15 ++-
6 files changed, 195 insertions(+), 5 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 3bcd7ad76..5bbe1e5e6 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1023,6 +1023,8 @@ message PhysicalExprNode {
PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
PhysicalLikeExprNode like_expr = 18;
+
+ PhysicalGetIndexedFieldExprNode get_indexed_field_expr = 19;
}
}
@@ -1345,4 +1347,9 @@ message ColumnStats {
ScalarValue max_value = 2;
uint32 null_count = 3;
uint32 distinct_count = 4;
-}
\ No newline at end of file
+}
+
+message PhysicalGetIndexedFieldExprNode {
+ PhysicalExprNode arg = 1;
+ ScalarValue key = 2;
+}
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 7437fc238..1c84da002 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -13209,6 +13209,9 @@ impl serde::Serialize for PhysicalExprNode {
physical_expr_node::ExprType::LikeExpr(v) => {
struct_ser.serialize_field("likeExpr", v)?;
}
+ physical_expr_node::ExprType::GetIndexedFieldExpr(v) => {
+ struct_ser.serialize_field("getIndexedFieldExpr", v)?;
+ }
}
}
struct_ser.end()
@@ -13252,6 +13255,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
"dateTimeIntervalExpr",
"like_expr",
"likeExpr",
+ "get_indexed_field_expr",
+ "getIndexedFieldExpr",
];
#[allow(clippy::enum_variant_names)]
@@ -13274,6 +13279,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
ScalarUdf,
DateTimeIntervalExpr,
LikeExpr,
+ GetIndexedFieldExpr,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -13313,6 +13319,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
"scalarUdf" | "scalar_udf" =>
Ok(GeneratedField::ScalarUdf),
"dateTimeIntervalExpr" | "date_time_interval_expr"
=> Ok(GeneratedField::DateTimeIntervalExpr),
"likeExpr" | "like_expr" =>
Ok(GeneratedField::LikeExpr),
+ "getIndexedFieldExpr" | "get_indexed_field_expr"
=> Ok(GeneratedField::GetIndexedFieldExpr),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -13459,6 +13466,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode
{
return
Err(serde::de::Error::duplicate_field("likeExpr"));
}
expr_type__ =
map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::LikeExpr)
+;
+ }
+ GeneratedField::GetIndexedFieldExpr => {
+ if expr_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("getIndexedFieldExpr"));
+ }
+ expr_type__ =
map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::GetIndexedFieldExpr)
;
}
}
@@ -13581,6 +13595,114 @@ impl<'de> serde::Deserialize<'de> for
PhysicalExtensionNode {
deserializer.deserialize_struct("datafusion.PhysicalExtensionNode",
FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for PhysicalGetIndexedFieldExprNode {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.arg.is_some() {
+ len += 1;
+ }
+ if self.key.is_some() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.PhysicalGetIndexedFieldExprNode", len)?;
+ if let Some(v) = self.arg.as_ref() {
+ struct_ser.serialize_field("arg", v)?;
+ }
+ if let Some(v) = self.key.as_ref() {
+ struct_ser.serialize_field("key", v)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for PhysicalGetIndexedFieldExprNode {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "arg",
+ "key",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Arg,
+ Key,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "arg" => Ok(GeneratedField::Arg),
+ "key" => Ok(GeneratedField::Key),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = PhysicalGetIndexedFieldExprNode;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct
datafusion.PhysicalGetIndexedFieldExprNode")
+ }
+
+ fn visit_map<V>(self, mut map: V) ->
std::result::Result<PhysicalGetIndexedFieldExprNode, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut arg__ = None;
+ let mut key__ = None;
+ while let Some(k) = map.next_key()? {
+ match k {
+ GeneratedField::Arg => {
+ if arg__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("arg"));
+ }
+ arg__ = map.next_value()?;
+ }
+ GeneratedField::Key => {
+ if key__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("key"));
+ }
+ key__ = map.next_value()?;
+ }
+ }
+ }
+ Ok(PhysicalGetIndexedFieldExprNode {
+ arg: arg__,
+ key: key__,
+ })
+ }
+ }
+
deserializer.deserialize_struct("datafusion.PhysicalGetIndexedFieldExprNode",
FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for PhysicalHashRepartition {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 67bddc23a..5497a8783 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1394,7 +1394,7 @@ pub struct PhysicalExtensionNode {
pub struct PhysicalExprNode {
#[prost(
oneof = "physical_expr_node::ExprType",
- tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18"
+ tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19"
)]
pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
}
@@ -1446,6 +1446,10 @@ pub mod physical_expr_node {
),
#[prost(message, tag = "18")]
LikeExpr(::prost::alloc::boxed::Box<super::PhysicalLikeExprNode>),
+ #[prost(message, tag = "19")]
+ GetIndexedFieldExpr(
+ ::prost::alloc::boxed::Box<super::PhysicalGetIndexedFieldExprNode>,
+ ),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -1956,6 +1960,14 @@ pub struct ColumnStats {
#[prost(uint32, tag = "4")]
pub distinct_count: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalGetIndexedFieldExprNode {
+ #[prost(message, optional, boxed, tag = "1")]
+ pub arg:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(message, optional, tag = "2")]
+ pub key: ::core::option::Option<ScalarValue>,
+}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord,
::prost::Enumeration)]
#[repr(i32)]
pub enum JoinType {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index 5669410a8..40ab07175 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -29,6 +29,7 @@ use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::window_function::WindowFunction;
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
+use datafusion::physical_plan::expressions::GetIndexedFieldExpr;
use datafusion::physical_plan::expressions::LikeExpr;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{
@@ -282,6 +283,17 @@ pub fn parse_physical_expr(
input_schema,
)?,
)),
+ ExprType::GetIndexedFieldExpr(get_indexed_field_expr) => {
+ Arc::new(GetIndexedFieldExpr::new(
+ parse_required_physical_expr(
+ get_indexed_field_expr.arg.as_deref(),
+ registry,
+ "arg",
+ input_schema,
+ )?,
+ convert_required!(get_indexed_field_expr.key)?,
+ ))
+ }
};
Ok(pexpr)
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 9647dbdd2..0898ec416 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1218,7 +1218,7 @@ mod roundtrip_tests {
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::aggregates::PhysicalGroupBy;
- use datafusion::physical_plan::expressions::like;
+ use datafusion::physical_plan::expressions::{like, GetIndexedFieldExpr};
use datafusion::physical_plan::functions;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::physical_plan::projection::ProjectionExec;
@@ -1628,4 +1628,30 @@ mod roundtrip_tests {
)?);
roundtrip_test(plan)
}
+
+ #[test]
+ fn roundtrip_get_indexed_field() -> Result<()> {
+ let fields = vec![
+ Field::new("id", DataType::Int64, true),
+ Field::new(
+ "a",
+ DataType::List(Box::new(Field::new("item", DataType::Float64,
true))),
+ true,
+ ),
+ ];
+
+ let schema = Schema::new(fields);
+ let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+
+ let col_a = col("a", &schema)?;
+ let key = ScalarValue::Int64(Some(1));
+ let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(col_a,
key));
+
+ let plan = Arc::new(ProjectionExec::try_new(
+ vec![(get_indexed_field_expr, "result".to_string())],
+ input,
+ )?);
+
+ roundtrip_test(plan)
+ }
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index a987e7fee..622f9d780 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -42,9 +42,9 @@ use datafusion::physical_plan::expressions::{
use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
use crate::protobuf;
-use crate::protobuf::PhysicalSortExprNode;
+use crate::protobuf::{PhysicalSortExprNode, ScalarValue};
use datafusion::logical_expr::BuiltinScalarFunction;
-use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
+use datafusion::physical_expr::expressions::{DateTimeIntervalExpr,
GetIndexedFieldExpr};
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion_common::DataFusionError;
@@ -342,6 +342,17 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for
protobuf::PhysicalExprNode {
}),
)),
})
+ } else if let Some(expr) = expr.downcast_ref::<GetIndexedFieldExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+
protobuf::physical_expr_node::ExprType::GetIndexedFieldExpr(
+ Box::new(protobuf::PhysicalGetIndexedFieldExprNode {
+ arg:
Some(Box::new(expr.arg().to_owned().try_into()?)),
+ key: Some(ScalarValue::try_from(expr.key())?),
+ }),
+ ),
+ ),
+ })
} else {
Err(DataFusionError::Internal(format!(
"physical_plan::to_proto() unsupported expression {value:?}"