This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5bd8751ecf Separate proto partitioning (#10708)
5bd8751ecf is described below
commit 5bd8751ecf55d028e991f51a3fb37cb89bcbd1be
Author: 张林伟 <[email protected]>
AuthorDate: Wed May 29 18:12:17 2024 +0800
Separate proto partitioning (#10708)
---
datafusion/proto/proto/datafusion.proto | 15 +-
datafusion/proto/src/generated/pbjson.rs | 179 +++++++++++++++++------
datafusion/proto/src/generated/prost.rs | 25 +++-
datafusion/proto/src/physical_plan/from_proto.rs | 32 ++++
datafusion/proto/src/physical_plan/mod.rs | 81 ++--------
datafusion/proto/src/physical_plan/to_proto.rs | 32 +++-
6 files changed, 244 insertions(+), 120 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 448d9f0582..c065948d3b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1196,10 +1196,19 @@ message PhysicalHashRepartition {
message RepartitionExecNode{
PhysicalPlanNode input = 1;
+ // oneof partition_method {
+ // uint64 round_robin = 2;
+ // PhysicalHashRepartition hash = 3;
+ // uint64 unknown = 4;
+ // }
+ Partitioning partitioning = 5;
+}
+
+message Partitioning {
oneof partition_method {
- uint64 round_robin = 2;
- PhysicalHashRepartition hash = 3;
- uint64 unknown = 4;
+ uint64 round_robin = 1;
+ PhysicalHashRepartition hash = 2;
+ uint64 unknown = 3;
}
}
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 76a367b402..7e7a14a5d1 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -12717,6 +12717,129 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile
{
deserializer.deserialize_struct("datafusion.PartitionedFile", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for Partitioning {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.partition_method.is_some() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.Partitioning", len)?;
+ if let Some(v) = self.partition_method.as_ref() {
+ match v {
+ partitioning::PartitionMethod::RoundRobin(v) => {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("roundRobin",
ToString::to_string(&v).as_str())?;
+ }
+ partitioning::PartitionMethod::Hash(v) => {
+ struct_ser.serialize_field("hash", v)?;
+ }
+ partitioning::PartitionMethod::Unknown(v) => {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("unknown",
ToString::to_string(&v).as_str())?;
+ }
+ }
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for Partitioning {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "round_robin",
+ "roundRobin",
+ "hash",
+ "unknown",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ RoundRobin,
+ Hash,
+ Unknown,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "roundRobin" | "round_robin" =>
Ok(GeneratedField::RoundRobin),
+ "hash" => Ok(GeneratedField::Hash),
+ "unknown" => Ok(GeneratedField::Unknown),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = Partitioning;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.Partitioning")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<Partitioning, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut partition_method__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::RoundRobin => {
+ if partition_method__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("roundRobin"));
+ }
+ partition_method__ =
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
partitioning::PartitionMethod::RoundRobin(x.0));
+ }
+ GeneratedField::Hash => {
+ if partition_method__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("hash"));
+ }
+ partition_method__ =
map_.next_value::<::std::option::Option<_>>()?.map(partitioning::PartitionMethod::Hash)
+;
+ }
+ GeneratedField::Unknown => {
+ if partition_method__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("unknown"));
+ }
+ partition_method__ =
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
partitioning::PartitionMethod::Unknown(x.0));
+ }
+ }
+ }
+ Ok(Partitioning {
+ partition_method: partition_method__,
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.Partitioning", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for PhysicalAggregateExprNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
@@ -16885,27 +17008,15 @@ impl serde::Serialize for RepartitionExecNode {
if self.input.is_some() {
len += 1;
}
- if self.partition_method.is_some() {
+ if self.partitioning.is_some() {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.RepartitionExecNode", len)?;
if let Some(v) = self.input.as_ref() {
struct_ser.serialize_field("input", v)?;
}
- if let Some(v) = self.partition_method.as_ref() {
- match v {
- repartition_exec_node::PartitionMethod::RoundRobin(v) => {
- #[allow(clippy::needless_borrow)]
- struct_ser.serialize_field("roundRobin",
ToString::to_string(&v).as_str())?;
- }
- repartition_exec_node::PartitionMethod::Hash(v) => {
- struct_ser.serialize_field("hash", v)?;
- }
- repartition_exec_node::PartitionMethod::Unknown(v) => {
- #[allow(clippy::needless_borrow)]
- struct_ser.serialize_field("unknown",
ToString::to_string(&v).as_str())?;
- }
- }
+ if let Some(v) = self.partitioning.as_ref() {
+ struct_ser.serialize_field("partitioning", v)?;
}
struct_ser.end()
}
@@ -16918,18 +17029,13 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
{
const FIELDS: &[&str] = &[
"input",
- "round_robin",
- "roundRobin",
- "hash",
- "unknown",
+ "partitioning",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Input,
- RoundRobin,
- Hash,
- Unknown,
+ Partitioning,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -16952,9 +17058,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
{
match value {
"input" => Ok(GeneratedField::Input),
- "roundRobin" | "round_robin" =>
Ok(GeneratedField::RoundRobin),
- "hash" => Ok(GeneratedField::Hash),
- "unknown" => Ok(GeneratedField::Unknown),
+ "partitioning" => Ok(GeneratedField::Partitioning),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -16975,7 +17079,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
V: serde::de::MapAccess<'de>,
{
let mut input__ = None;
- let mut partition_method__ = None;
+ let mut partitioning__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Input => {
@@ -16984,30 +17088,17 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
}
input__ = map_.next_value()?;
}
- GeneratedField::RoundRobin => {
- if partition_method__.is_some() {
- return
Err(serde::de::Error::duplicate_field("roundRobin"));
- }
- partition_method__ =
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
repartition_exec_node::PartitionMethod::RoundRobin(x.0));
- }
- GeneratedField::Hash => {
- if partition_method__.is_some() {
- return
Err(serde::de::Error::duplicate_field("hash"));
- }
- partition_method__ =
map_.next_value::<::std::option::Option<_>>()?.map(repartition_exec_node::PartitionMethod::Hash)
-;
- }
- GeneratedField::Unknown => {
- if partition_method__.is_some() {
- return
Err(serde::de::Error::duplicate_field("unknown"));
+ GeneratedField::Partitioning => {
+ if partitioning__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("partitioning"));
}
- partition_method__ =
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
repartition_exec_node::PartitionMethod::Unknown(x.0));
+ partitioning__ = map_.next_value()?;
}
}
}
Ok(RepartitionExecNode {
input: input__,
- partition_method: partition_method__,
+ partitioning: partitioning__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 5e0f6613f3..f9138da3ab 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1850,19 +1850,30 @@ pub struct PhysicalHashRepartition {
pub struct RepartitionExecNode {
#[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(oneof = "repartition_exec_node::PartitionMethod", tags = "2, 3,
4")]
- pub partition_method:
::core::option::Option<repartition_exec_node::PartitionMethod>,
+ /// oneof partition_method {
+ /// uint64 round_robin = 2;
+ /// PhysicalHashRepartition hash = 3;
+ /// uint64 unknown = 4;
+ /// }
+ #[prost(message, optional, tag = "5")]
+ pub partitioning: ::core::option::Option<Partitioning>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Partitioning {
+ #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3")]
+ pub partition_method:
::core::option::Option<partitioning::PartitionMethod>,
}
-/// Nested message and enum types in `RepartitionExecNode`.
-pub mod repartition_exec_node {
+/// Nested message and enum types in `Partitioning`.
+pub mod partitioning {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum PartitionMethod {
- #[prost(uint64, tag = "2")]
+ #[prost(uint64, tag = "1")]
RoundRobin(u64),
- #[prost(message, tag = "3")]
+ #[prost(message, tag = "2")]
Hash(super::PhysicalHashRepartition),
- #[prost(uint64, tag = "4")]
+ #[prost(uint64, tag = "3")]
Unknown(u64),
}
}
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index c36f60ee44..cf935e6b83 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -438,6 +438,38 @@ pub fn parse_protobuf_hash_partitioning(
}
}
+pub fn parse_protobuf_partitioning(
+ partitioning: Option<&protobuf::Partitioning>,
+ registry: &dyn FunctionRegistry,
+ input_schema: &Schema,
+ codec: &dyn PhysicalExtensionCodec,
+) -> Result<Option<Partitioning>> {
+ match partitioning {
+ Some(protobuf::Partitioning { partition_method }) => match
partition_method {
+ Some(protobuf::partitioning::PartitionMethod::RoundRobin(
+ partition_count,
+ )) => Ok(Some(Partitioning::RoundRobinBatch(
+ *partition_count as usize,
+ ))),
+
Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
+ parse_protobuf_hash_partitioning(
+ Some(hash_repartition),
+ registry,
+ input_schema,
+ codec,
+ )
+ }
+
Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
+ Ok(Some(Partitioning::UnknownPartitioning(
+ *partition_count as usize,
+ )))
+ }
+ None => Ok(None),
+ },
+ None => Ok(None),
+ }
+}
+
pub fn parse_protobuf_file_scan_config(
proto: &protobuf::FileScanExecConf,
registry: &dyn FunctionRegistry,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 91ed3b7f5e..a9965e1c81 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -58,8 +58,7 @@ use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMerge
use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use datafusion::physical_plan::{
- udaf, AggregateExpr, ExecutionPlan, InputOrderMode, Partitioning,
PhysicalExpr,
- WindowExpr,
+ udaf, AggregateExpr, ExecutionPlan, InputOrderMode, PhysicalExpr,
WindowExpr,
};
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::ScalarUDF;
@@ -77,10 +76,10 @@ use crate::physical_plan::to_proto::{
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::physical_plan_node::PhysicalPlanType;
-use crate::protobuf::repartition_exec_node::PartitionMethod;
use crate::protobuf::{self, proto_error, window_agg_exec_node};
-use self::to_proto::serialize_physical_expr;
+use self::from_proto::parse_protobuf_partitioning;
+use self::to_proto::{serialize_partitioning, serialize_physical_expr};
pub mod from_proto;
pub mod to_proto;
@@ -263,47 +262,16 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
runtime,
extension_codec,
)?;
- match repart.partition_method {
- Some(PartitionMethod::Hash(ref hash_part)) => {
- let expr = hash_part
- .hash_expr
- .iter()
- .map(|e| {
- parse_physical_expr(
- e,
- registry,
- input.schema().as_ref(),
- extension_codec,
- )
- })
- .collect::<Result<Vec<Arc<dyn PhysicalExpr>>,
_>>()?;
-
- Ok(Arc::new(RepartitionExec::try_new(
- input,
- Partitioning::Hash(
- expr,
- hash_part.partition_count.try_into().unwrap(),
- ),
- )?))
- }
- Some(PartitionMethod::RoundRobin(partition_count)) => {
- Ok(Arc::new(RepartitionExec::try_new(
- input,
- Partitioning::RoundRobinBatch(
- partition_count.try_into().unwrap(),
- ),
- )?))
- }
- Some(PartitionMethod::Unknown(partition_count)) => {
- Ok(Arc::new(RepartitionExec::try_new(
- input,
- Partitioning::UnknownPartitioning(
- partition_count.try_into().unwrap(),
- ),
- )?))
- }
- _ => internal_err!("Invalid partitioning scheme"),
- }
+ let partitioning = parse_protobuf_partitioning(
+ repart.partitioning.as_ref(),
+ registry,
+ input.schema().as_ref(),
+ extension_codec,
+ )?;
+ Ok(Arc::new(RepartitionExec::try_new(
+ input,
+ partitioning.unwrap(),
+ )?))
}
PhysicalPlanType::GlobalLimit(limit) => {
let input: Arc<dyn ExecutionPlan> =
@@ -1648,31 +1616,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
extension_codec,
)?;
- let pb_partition_method = match exec.partitioning() {
- Partitioning::Hash(exprs, partition_count) => {
- PartitionMethod::Hash(protobuf::PhysicalHashRepartition {
- hash_expr: exprs
- .iter()
- .map(|expr| {
- serialize_physical_expr(expr.clone(),
extension_codec)
- })
- .collect::<Result<Vec<_>>>()?,
- partition_count: *partition_count as u64,
- })
- }
- Partitioning::RoundRobinBatch(partition_count) => {
- PartitionMethod::RoundRobin(*partition_count as u64)
- }
- Partitioning::UnknownPartitioning(partition_count) => {
- PartitionMethod::Unknown(*partition_count as u64)
- }
- };
+ let pb_partitioning =
+ serialize_partitioning(exec.partitioning(), extension_codec)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type:
Some(PhysicalPlanType::Repartition(Box::new(
protobuf::RepartitionExecNode {
input: Some(Box::new(input)),
- partition_method: Some(pb_partition_method),
+ partitioning: Some(pb_partitioning),
},
))),
});
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index d0af2f8338..3135d09593 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -33,7 +33,7 @@ use datafusion::physical_plan::expressions::{
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
-use datafusion::physical_plan::{AggregateExpr, PhysicalExpr, WindowExpr};
+use datafusion::physical_plan::{AggregateExpr, Partitioning, PhysicalExpr,
WindowExpr};
use datafusion::{
datasource::{
file_format::{csv::CsvSink, json::JsonSink},
@@ -552,6 +552,36 @@ pub fn serialize_physical_expr(
}
}
+pub fn serialize_partitioning(
+ partitioning: &Partitioning,
+ codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::Partitioning> {
+ let serialized_partitioning = match partitioning {
+ Partitioning::RoundRobinBatch(partition_count) =>
protobuf::Partitioning {
+ partition_method:
Some(protobuf::partitioning::PartitionMethod::RoundRobin(
+ *partition_count as u64,
+ )),
+ },
+ Partitioning::Hash(exprs, partition_count) => {
+ let serialized_exprs = serialize_physical_exprs(exprs.clone(),
codec)?;
+ protobuf::Partitioning {
+ partition_method:
Some(protobuf::partitioning::PartitionMethod::Hash(
+ protobuf::PhysicalHashRepartition {
+ hash_expr: serialized_exprs,
+ partition_count: *partition_count as u64,
+ },
+ )),
+ }
+ }
+ Partitioning::UnknownPartitioning(partition_count) =>
protobuf::Partitioning {
+ partition_method:
Some(protobuf::partitioning::PartitionMethod::Unknown(
+ *partition_count as u64,
+ )),
+ },
+ };
+ Ok(serialized_partitioning)
+}
+
fn serialize_when_then_expr(
when_expr: &Arc<dyn PhysicalExpr>,
then_expr: &Arc<dyn PhysicalExpr>,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]