This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bd8751ecf Separate proto partitioning (#10708)
5bd8751ecf is described below

commit 5bd8751ecf55d028e991f51a3fb37cb89bcbd1be
Author: 张林伟 <[email protected]>
AuthorDate: Wed May 29 18:12:17 2024 +0800

    Separate proto partitioning (#10708)
---
 datafusion/proto/proto/datafusion.proto          |  15 +-
 datafusion/proto/src/generated/pbjson.rs         | 179 +++++++++++++++++------
 datafusion/proto/src/generated/prost.rs          |  25 +++-
 datafusion/proto/src/physical_plan/from_proto.rs |  32 ++++
 datafusion/proto/src/physical_plan/mod.rs        |  81 ++--------
 datafusion/proto/src/physical_plan/to_proto.rs   |  32 +++-
 6 files changed, 244 insertions(+), 120 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 448d9f0582..c065948d3b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1196,10 +1196,19 @@ message PhysicalHashRepartition {
 
 message RepartitionExecNode{
   PhysicalPlanNode input = 1;
+  // oneof partition_method {
+  //   uint64 round_robin = 2;
+  //   PhysicalHashRepartition hash = 3;
+  //   uint64 unknown = 4;
+  // }
+  Partitioning partitioning = 5;
+}
+
+message Partitioning {
   oneof partition_method {
-    uint64 round_robin = 2;
-    PhysicalHashRepartition hash = 3;
-    uint64 unknown = 4;
+    uint64 round_robin = 1;
+    PhysicalHashRepartition hash = 2;
+    uint64 unknown = 3;
   }
 }
 
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 76a367b402..7e7a14a5d1 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -12717,6 +12717,129 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
         deserializer.deserialize_struct("datafusion.PartitionedFile", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for Partitioning {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.partition_method.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.Partitioning", len)?;
+        if let Some(v) = self.partition_method.as_ref() {
+            match v {
+                partitioning::PartitionMethod::RoundRobin(v) => {
+                    #[allow(clippy::needless_borrow)]
+                    struct_ser.serialize_field("roundRobin", 
ToString::to_string(&v).as_str())?;
+                }
+                partitioning::PartitionMethod::Hash(v) => {
+                    struct_ser.serialize_field("hash", v)?;
+                }
+                partitioning::PartitionMethod::Unknown(v) => {
+                    #[allow(clippy::needless_borrow)]
+                    struct_ser.serialize_field("unknown", 
ToString::to_string(&v).as_str())?;
+                }
+            }
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for Partitioning {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "round_robin",
+            "roundRobin",
+            "hash",
+            "unknown",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            RoundRobin,
+            Hash,
+            Unknown,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "roundRobin" | "round_robin" => 
Ok(GeneratedField::RoundRobin),
+                            "hash" => Ok(GeneratedField::Hash),
+                            "unknown" => Ok(GeneratedField::Unknown),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = Partitioning;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.Partitioning")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<Partitioning, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut partition_method__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::RoundRobin => {
+                            if partition_method__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("roundRobin"));
+                            }
+                            partition_method__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 partitioning::PartitionMethod::RoundRobin(x.0));
+                        }
+                        GeneratedField::Hash => {
+                            if partition_method__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("hash"));
+                            }
+                            partition_method__ = 
map_.next_value::<::std::option::Option<_>>()?.map(partitioning::PartitionMethod::Hash)
+;
+                        }
+                        GeneratedField::Unknown => {
+                            if partition_method__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("unknown"));
+                            }
+                            partition_method__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 partitioning::PartitionMethod::Unknown(x.0));
+                        }
+                    }
+                }
+                Ok(Partitioning {
+                    partition_method: partition_method__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.Partitioning", FIELDS, 
GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PhysicalAggregateExprNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
@@ -16885,27 +17008,15 @@ impl serde::Serialize for RepartitionExecNode {
         if self.input.is_some() {
             len += 1;
         }
-        if self.partition_method.is_some() {
+        if self.partitioning.is_some() {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.RepartitionExecNode", len)?;
         if let Some(v) = self.input.as_ref() {
             struct_ser.serialize_field("input", v)?;
         }
-        if let Some(v) = self.partition_method.as_ref() {
-            match v {
-                repartition_exec_node::PartitionMethod::RoundRobin(v) => {
-                    #[allow(clippy::needless_borrow)]
-                    struct_ser.serialize_field("roundRobin", 
ToString::to_string(&v).as_str())?;
-                }
-                repartition_exec_node::PartitionMethod::Hash(v) => {
-                    struct_ser.serialize_field("hash", v)?;
-                }
-                repartition_exec_node::PartitionMethod::Unknown(v) => {
-                    #[allow(clippy::needless_borrow)]
-                    struct_ser.serialize_field("unknown", 
ToString::to_string(&v).as_str())?;
-                }
-            }
+        if let Some(v) = self.partitioning.as_ref() {
+            struct_ser.serialize_field("partitioning", v)?;
         }
         struct_ser.end()
     }
@@ -16918,18 +17029,13 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
     {
         const FIELDS: &[&str] = &[
             "input",
-            "round_robin",
-            "roundRobin",
-            "hash",
-            "unknown",
+            "partitioning",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Input,
-            RoundRobin,
-            Hash,
-            Unknown,
+            Partitioning,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -16952,9 +17058,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
                     {
                         match value {
                             "input" => Ok(GeneratedField::Input),
-                            "roundRobin" | "round_robin" => 
Ok(GeneratedField::RoundRobin),
-                            "hash" => Ok(GeneratedField::Hash),
-                            "unknown" => Ok(GeneratedField::Unknown),
+                            "partitioning" => Ok(GeneratedField::Partitioning),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -16975,7 +17079,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
                     V: serde::de::MapAccess<'de>,
             {
                 let mut input__ = None;
-                let mut partition_method__ = None;
+                let mut partitioning__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Input => {
@@ -16984,30 +17088,17 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
                             }
                             input__ = map_.next_value()?;
                         }
-                        GeneratedField::RoundRobin => {
-                            if partition_method__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("roundRobin"));
-                            }
-                            partition_method__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 repartition_exec_node::PartitionMethod::RoundRobin(x.0));
-                        }
-                        GeneratedField::Hash => {
-                            if partition_method__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("hash"));
-                            }
-                            partition_method__ = 
map_.next_value::<::std::option::Option<_>>()?.map(repartition_exec_node::PartitionMethod::Hash)
-;
-                        }
-                        GeneratedField::Unknown => {
-                            if partition_method__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("unknown"));
+                        GeneratedField::Partitioning => {
+                            if partitioning__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("partitioning"));
                             }
-                            partition_method__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 repartition_exec_node::PartitionMethod::Unknown(x.0));
+                            partitioning__ = map_.next_value()?;
                         }
                     }
                 }
                 Ok(RepartitionExecNode {
                     input: input__,
-                    partition_method: partition_method__,
+                    partitioning: partitioning__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 5e0f6613f3..f9138da3ab 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1850,19 +1850,30 @@ pub struct PhysicalHashRepartition {
 pub struct RepartitionExecNode {
     #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(oneof = "repartition_exec_node::PartitionMethod", tags = "2, 3, 
4")]
-    pub partition_method: 
::core::option::Option<repartition_exec_node::PartitionMethod>,
+    /// oneof partition_method {
+    ///    uint64 round_robin = 2;
+    ///    PhysicalHashRepartition hash = 3;
+    ///    uint64 unknown = 4;
+    /// }
+    #[prost(message, optional, tag = "5")]
+    pub partitioning: ::core::option::Option<Partitioning>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Partitioning {
+    #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3")]
+    pub partition_method: 
::core::option::Option<partitioning::PartitionMethod>,
 }
-/// Nested message and enum types in `RepartitionExecNode`.
-pub mod repartition_exec_node {
+/// Nested message and enum types in `Partitioning`.
+pub mod partitioning {
     #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum PartitionMethod {
-        #[prost(uint64, tag = "2")]
+        #[prost(uint64, tag = "1")]
         RoundRobin(u64),
-        #[prost(message, tag = "3")]
+        #[prost(message, tag = "2")]
         Hash(super::PhysicalHashRepartition),
-        #[prost(uint64, tag = "4")]
+        #[prost(uint64, tag = "3")]
         Unknown(u64),
     }
 }
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index c36f60ee44..cf935e6b83 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -438,6 +438,38 @@ pub fn parse_protobuf_hash_partitioning(
     }
 }
 
+pub fn parse_protobuf_partitioning(
+    partitioning: Option<&protobuf::Partitioning>,
+    registry: &dyn FunctionRegistry,
+    input_schema: &Schema,
+    codec: &dyn PhysicalExtensionCodec,
+) -> Result<Option<Partitioning>> {
+    match partitioning {
+        Some(protobuf::Partitioning { partition_method }) => match 
partition_method {
+            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
+                partition_count,
+            )) => Ok(Some(Partitioning::RoundRobinBatch(
+                *partition_count as usize,
+            ))),
+            
Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
+                parse_protobuf_hash_partitioning(
+                    Some(hash_repartition),
+                    registry,
+                    input_schema,
+                    codec,
+                )
+            }
+            
Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
+                Ok(Some(Partitioning::UnknownPartitioning(
+                    *partition_count as usize,
+                )))
+            }
+            None => Ok(None),
+        },
+        None => Ok(None),
+    }
+}
+
 pub fn parse_protobuf_file_scan_config(
     proto: &protobuf::FileScanExecConf,
     registry: &dyn FunctionRegistry,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 91ed3b7f5e..a9965e1c81 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -58,8 +58,7 @@ use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMerge
 use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
 use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use datafusion::physical_plan::{
-    udaf, AggregateExpr, ExecutionPlan, InputOrderMode, Partitioning, 
PhysicalExpr,
-    WindowExpr,
+    udaf, AggregateExpr, ExecutionPlan, InputOrderMode, PhysicalExpr, 
WindowExpr,
 };
 use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
 use datafusion_expr::ScalarUDF;
@@ -77,10 +76,10 @@ use crate::physical_plan::to_proto::{
 use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
 use crate::protobuf::physical_expr_node::ExprType;
 use crate::protobuf::physical_plan_node::PhysicalPlanType;
-use crate::protobuf::repartition_exec_node::PartitionMethod;
 use crate::protobuf::{self, proto_error, window_agg_exec_node};
 
-use self::to_proto::serialize_physical_expr;
+use self::from_proto::parse_protobuf_partitioning;
+use self::to_proto::{serialize_partitioning, serialize_physical_expr};
 
 pub mod from_proto;
 pub mod to_proto;
@@ -263,47 +262,16 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                     runtime,
                     extension_codec,
                 )?;
-                match repart.partition_method {
-                    Some(PartitionMethod::Hash(ref hash_part)) => {
-                        let expr = hash_part
-                            .hash_expr
-                            .iter()
-                            .map(|e| {
-                                parse_physical_expr(
-                                    e,
-                                    registry,
-                                    input.schema().as_ref(),
-                                    extension_codec,
-                                )
-                            })
-                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, 
_>>()?;
-
-                        Ok(Arc::new(RepartitionExec::try_new(
-                            input,
-                            Partitioning::Hash(
-                                expr,
-                                hash_part.partition_count.try_into().unwrap(),
-                            ),
-                        )?))
-                    }
-                    Some(PartitionMethod::RoundRobin(partition_count)) => {
-                        Ok(Arc::new(RepartitionExec::try_new(
-                            input,
-                            Partitioning::RoundRobinBatch(
-                                partition_count.try_into().unwrap(),
-                            ),
-                        )?))
-                    }
-                    Some(PartitionMethod::Unknown(partition_count)) => {
-                        Ok(Arc::new(RepartitionExec::try_new(
-                            input,
-                            Partitioning::UnknownPartitioning(
-                                partition_count.try_into().unwrap(),
-                            ),
-                        )?))
-                    }
-                    _ => internal_err!("Invalid partitioning scheme"),
-                }
+                let partitioning = parse_protobuf_partitioning(
+                    repart.partitioning.as_ref(),
+                    registry,
+                    input.schema().as_ref(),
+                    extension_codec,
+                )?;
+                Ok(Arc::new(RepartitionExec::try_new(
+                    input,
+                    partitioning.unwrap(),
+                )?))
             }
             PhysicalPlanType::GlobalLimit(limit) => {
                 let input: Arc<dyn ExecutionPlan> =
@@ -1648,31 +1616,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                 extension_codec,
             )?;
 
-            let pb_partition_method = match exec.partitioning() {
-                Partitioning::Hash(exprs, partition_count) => {
-                    PartitionMethod::Hash(protobuf::PhysicalHashRepartition {
-                        hash_expr: exprs
-                            .iter()
-                            .map(|expr| {
-                                serialize_physical_expr(expr.clone(), 
extension_codec)
-                            })
-                            .collect::<Result<Vec<_>>>()?,
-                        partition_count: *partition_count as u64,
-                    })
-                }
-                Partitioning::RoundRobinBatch(partition_count) => {
-                    PartitionMethod::RoundRobin(*partition_count as u64)
-                }
-                Partitioning::UnknownPartitioning(partition_count) => {
-                    PartitionMethod::Unknown(*partition_count as u64)
-                }
-            };
+            let pb_partitioning =
+                serialize_partitioning(exec.partitioning(), extension_codec)?;
 
             return Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: 
Some(PhysicalPlanType::Repartition(Box::new(
                     protobuf::RepartitionExecNode {
                         input: Some(Box::new(input)),
-                        partition_method: Some(pb_partition_method),
+                        partitioning: Some(pb_partitioning),
                     },
                 ))),
             });
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index d0af2f8338..3135d09593 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -33,7 +33,7 @@ use datafusion::physical_plan::expressions::{
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, 
PlainAggregateWindowExpr};
-use datafusion::physical_plan::{AggregateExpr, PhysicalExpr, WindowExpr};
+use datafusion::physical_plan::{AggregateExpr, Partitioning, PhysicalExpr, 
WindowExpr};
 use datafusion::{
     datasource::{
         file_format::{csv::CsvSink, json::JsonSink},
@@ -552,6 +552,36 @@ pub fn serialize_physical_expr(
     }
 }
 
+pub fn serialize_partitioning(
+    partitioning: &Partitioning,
+    codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::Partitioning> {
+    let serialized_partitioning = match partitioning {
+        Partitioning::RoundRobinBatch(partition_count) => 
protobuf::Partitioning {
+            partition_method: 
Some(protobuf::partitioning::PartitionMethod::RoundRobin(
+                *partition_count as u64,
+            )),
+        },
+        Partitioning::Hash(exprs, partition_count) => {
+            let serialized_exprs = serialize_physical_exprs(exprs.clone(), 
codec)?;
+            protobuf::Partitioning {
+                partition_method: 
Some(protobuf::partitioning::PartitionMethod::Hash(
+                    protobuf::PhysicalHashRepartition {
+                        hash_expr: serialized_exprs,
+                        partition_count: *partition_count as u64,
+                    },
+                )),
+            }
+        }
+        Partitioning::UnknownPartitioning(partition_count) => 
protobuf::Partitioning {
+            partition_method: 
Some(protobuf::partitioning::PartitionMethod::Unknown(
+                *partition_count as u64,
+            )),
+        },
+    };
+    Ok(serialized_partitioning)
+}
+
 fn serialize_when_then_expr(
     when_expr: &Arc<dyn PhysicalExpr>,
     then_expr: &Arc<dyn PhysicalExpr>,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to