vrongmeal commented on code in PR #7557:
URL: https://github.com/apache/arrow-datafusion/pull/7557#discussion_r1332778771
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -196,6 +216,209 @@ impl TryFrom<Arc<dyn AggregateExpr>> for
protobuf::PhysicalExprNode {
}
}
+impl TryFrom<Arc<dyn WindowExpr>> for protobuf::PhysicalWindowExprNode {
+ type Error = DataFusionError;
+
+ fn try_from(
+ window_expr: Arc<dyn WindowExpr>,
+ ) -> std::result::Result<Self, Self::Error> {
+ let expr = window_expr.as_any();
+
+ let mut args = window_expr.expressions().to_vec();
+
+ let window_function = if let Some(built_in_window_expr) =
+ expr.downcast_ref::<BuiltInWindowExpr>()
+ {
+ let expr = built_in_window_expr.get_built_in_func_expr();
+ let built_in_fn_expr = expr.as_any();
+
+ let builtin_fn = if
built_in_fn_expr.downcast_ref::<RowNumber>().is_some() {
+ protobuf::BuiltInWindowFunction::RowNumber
+ } else if let Some(rank_expr) =
built_in_fn_expr.downcast_ref::<Rank>() {
+ match rank_expr.get_type() {
+ RankType::Basic => protobuf::BuiltInWindowFunction::Rank,
+ RankType::Dense =>
protobuf::BuiltInWindowFunction::DenseRank,
+ RankType::Percent =>
protobuf::BuiltInWindowFunction::PercentRank,
+ }
+ } else if built_in_fn_expr.downcast_ref::<CumeDist>().is_some() {
+ protobuf::BuiltInWindowFunction::CumeDist
+ } else if let Some(ntile_expr) =
built_in_fn_expr.downcast_ref::<Ntile>() {
+ args.insert(
+ 0,
+
Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some(
+ ntile_expr.get_n() as i64,
+ )))),
+ );
+ protobuf::BuiltInWindowFunction::Ntile
+ } else if let Some(window_shift_expr) =
+ built_in_fn_expr.downcast_ref::<WindowShift>()
+ {
+ args.insert(
+ 1,
+
Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some(
+ window_shift_expr.get_shift_offset(),
+ )))),
+ );
+ if let Some(default_value) =
window_shift_expr.get_default_value() {
+ args.insert(2, Arc::new(Literal::new(default_value)));
+ }
+ if window_shift_expr.get_shift_offset() >= 0 {
+ protobuf::BuiltInWindowFunction::Lag
+ } else {
+ protobuf::BuiltInWindowFunction::Lead
+ }
+ } else if let Some(nth_value_expr) =
+ built_in_fn_expr.downcast_ref::<NthValue>()
+ {
+ match nth_value_expr.get_kind() {
+ NthValueKind::First =>
protobuf::BuiltInWindowFunction::FirstValue,
+ NthValueKind::Last =>
protobuf::BuiltInWindowFunction::LastValue,
+ NthValueKind::Nth(n) => {
+ args.insert(
+ 1,
+ Arc::new(Literal::new(
+ datafusion_common::ScalarValue::Int64(Some(n
as i64)),
+ )),
+ );
+ protobuf::BuiltInWindowFunction::NthValue
+ }
+ }
+ } else {
+ return not_impl_err!("BuiltIn function not supported:
{expr:?}");
+ };
+
+
physical_window_expr_node::WindowFunction::BuiltInFunction(builtin_fn as i32)
+ } else if let Some(plain_aggr_window_expr) =
+ expr.downcast_ref::<PlainAggregateWindowExpr>()
+ {
+
aggr_expr_to_window_fn(plain_aggr_window_expr.get_aggregate_expr().as_ref())?
+ } else if let Some(sliding_aggr_window_expr) =
+ expr.downcast_ref::<SlidingAggregateWindowExpr>()
+ {
+ aggr_expr_to_window_fn(
+ sliding_aggr_window_expr.get_aggregate_expr().as_ref(),
+ )?
+ } else {
+ return not_impl_err!("WindowExpr not supported: {window_expr:?}");
+ };
+
+ let args = args
+ .into_iter()
+ .map(|e| e.try_into())
+ .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
+
+ let partition_by = window_expr
+ .partition_by()
+ .iter()
+ .map(|p| p.clone().try_into())
+ .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
+
+ let order_by = window_expr
+ .order_by()
+ .iter()
+ .map(|o| o.clone().try_into())
+ .collect::<Result<Vec<protobuf::PhysicalSortExprNode>>>()?;
+
+ let window_frame: protobuf::WindowFrame = window_expr
+ .get_window_frame()
+ .as_ref()
+ .try_into()
+ .map_err(|e| DataFusionError::Internal(format!("{e}")))?;
+
+ let name = window_expr.name().to_string();
+
+ Ok(protobuf::PhysicalWindowExprNode {
+ args,
+ partition_by,
+ order_by,
+ window_frame: Some(window_frame),
+ window_function: Some(window_function),
+ name,
+ })
+ }
+}
+
+fn aggr_expr_to_window_fn(
Review Comment:
Yeah, I saw that. I will refactor and combine both functions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]