This is an automated email from the ASF dual-hosted git repository.
linwei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4ac9b55fc0 Fix `CoalescePartitionsExec` proto serialization (#15824)
4ac9b55fc0 is described below
commit 4ac9b55fc0a1657e9d412ac3a76a4dfa196b2908
Author: 张林伟 <[email protected]>
AuthorDate: Fri Apr 25 13:34:04 2025 +0800
Fix `CoalescePartitionsExec` proto serialization (#15824)
* add fetch to CoalescePartitionsExecNode
* gen proto code
* Add test
* fix
* fix build
* Fix test build
* remove comments
---
.../core/tests/physical_optimizer/enforce_sorting.rs | 5 ++---
.../replace_with_order_preserving_variants.rs | 5 ++---
.../physical-optimizer/src/enforce_distribution.rs | 7 +++----
.../physical-optimizer/src/enforce_sorting/mod.rs | 8 ++++----
datafusion/physical-plan/src/coalesce_partitions.rs | 6 ++++++
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 19 +++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 2 ++
datafusion/proto/src/physical_plan/mod.rs | 6 +++++-
.../proto/tests/cases/roundtrip_physical_plan.rs | 19 ++++++++++++++++++-
10 files changed, 62 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 052db454ef..f7668c8aab 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -3447,9 +3447,8 @@ fn test_parallelize_sort_preserves_fetch() -> Result<()> {
let schema = create_test_schema3()?;
let parquet_exec = parquet_exec(&schema);
let coalesced =
Arc::new(CoalescePartitionsExec::new(parquet_exec.clone()));
- let top_coalesced = CoalescePartitionsExec::new(coalesced.clone())
- .with_fetch(Some(10))
- .unwrap();
+ let top_coalesced =
+
Arc::new(CoalescePartitionsExec::new(coalesced.clone()).with_fetch(Some(10)));
let requirements = PlanWithCorrespondingCoalescePartitions::new(
top_coalesced.clone(),
diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
index ada3f06d39..71b9757604 100644
--- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -1272,9 +1272,8 @@ fn test_plan_with_order_preserving_variants_preserves_fetch() -> Result<()> {
"a", &schema,
)];
let parquet_exec = parquet_exec_sorted(&schema, parquet_sort_exprs);
- let coalesced = CoalescePartitionsExec::new(parquet_exec.clone())
- .with_fetch(Some(10))
- .unwrap();
+ let coalesced =
+
Arc::new(CoalescePartitionsExec::new(parquet_exec.clone()).with_fetch(Some(10)));
// Test sort's fetch is greater than coalesce fetch, return error because
it's not reasonable
let requirements = OrderPreservationContext::new(
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 523762401d..947fd3eba2 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -1035,10 +1035,9 @@ pub fn replace_order_preserving_variants(
if is_sort_preserving_merge(&context.plan) {
let child_plan = Arc::clone(&context.children[0].plan);
- // It's safe to unwrap because `CoalescePartitionsExec` supports
`fetch`.
- context.plan = CoalescePartitionsExec::new(child_plan)
- .with_fetch(context.plan.fetch())
- .unwrap();
+ context.plan = Arc::new(
+
CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
+ );
return Ok(context);
} else if let Some(repartition) =
context.plan.as_any().downcast_ref::<RepartitionExec>()
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
index b606aa85c1..04c1a67740 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -409,10 +409,10 @@ pub fn parallelize_sorts(
Ok(Transformed::yes(
PlanWithCorrespondingCoalescePartitions::new(
- // Safe to unwrap, because `CoalescePartitionsExec` has a fetch
- CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
- .with_fetch(fetch)
- .unwrap(),
+ Arc::new(
+ CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
+ .with_fetch(fetch),
+ ),
false,
vec![requirements],
),
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 95a0c8f6ce..715dd159e7 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -59,6 +59,12 @@ impl CoalescePartitionsExec {
}
}
+    /// Returns `self` with its `fetch` replaced by the argument (overwrites any previous value; `None` clears it).
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
+    }
+
/// Input execution plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 39236da3b9..4c8b6c588d 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1217,6 +1217,7 @@ message CoalesceBatchesExecNode {
message CoalescePartitionsExecNode {
PhysicalPlanNode input = 1;
+ optional uint32 fetch = 2;
}
message PhysicalHashRepartition {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 6166b6ec47..9324229445 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -2050,10 +2050,16 @@ impl serde::Serialize for CoalescePartitionsExecNode {
if self.input.is_some() {
len += 1;
}
+ if self.fetch.is_some() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.CoalescePartitionsExecNode", len)?;
if let Some(v) = self.input.as_ref() {
struct_ser.serialize_field("input", v)?;
}
+ if let Some(v) = self.fetch.as_ref() {
+ struct_ser.serialize_field("fetch", v)?;
+ }
struct_ser.end()
}
}
@@ -2065,11 +2071,13 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
{
const FIELDS: &[&str] = &[
"input",
+ "fetch",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Input,
+ Fetch,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -2092,6 +2100,7 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
{
match value {
"input" => Ok(GeneratedField::Input),
+ "fetch" => Ok(GeneratedField::Fetch),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -2112,6 +2121,7 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
V: serde::de::MapAccess<'de>,
{
let mut input__ = None;
+ let mut fetch__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Input => {
@@ -2120,10 +2130,19 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
}
input__ = map_.next_value()?;
}
+ GeneratedField::Fetch => {
+ if fetch__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("fetch"));
+ }
+ fetch__ =
+
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
x.0)
+ ;
+ }
}
}
Ok(CoalescePartitionsExecNode {
input: input__,
+ fetch: fetch__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 41c60b22e3..c2f4e93cef 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1824,6 +1824,8 @@ pub struct CoalesceBatchesExecNode {
pub struct CoalescePartitionsExecNode {
#[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(uint32, optional, tag = "2")]
+ pub fetch: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalHashRepartition {
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 90d071ab23..4429c8fdd8 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -792,7 +792,10 @@ impl protobuf::PhysicalPlanNode {
) -> Result<Arc<dyn ExecutionPlan>> {
let input: Arc<dyn ExecutionPlan> =
into_physical_plan(&merge.input, registry, runtime,
extension_codec)?;
- Ok(Arc::new(CoalescePartitionsExec::new(input)))
+ Ok(Arc::new(
+ CoalescePartitionsExec::new(input)
+ .with_fetch(merge.fetch.map(|f| f as usize)),
+ ))
}
fn try_into_repartition_physical_plan(
@@ -2354,6 +2357,7 @@ impl protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
protobuf::CoalescePartitionsExecNode {
input: Some(Box::new(input)),
+ fetch: exec.fetch().map(|f| f as u32),
},
))),
})
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 6dddbb5ea0..94fcd5ca82 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -66,6 +66,7 @@ use datafusion::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion::physical_plan::analyze::AnalyzeExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::{
binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr,
PhysicalSortExpr,
@@ -709,7 +710,7 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
}
#[test]
-fn roundtrip_coalesce_with_fetch() -> Result<()> {
+fn roundtrip_coalesce_batches_with_fetch() -> Result<()> {
let field_a = Field::new("a", DataType::Boolean, false);
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -725,6 +726,22 @@ fn roundtrip_coalesce_with_fetch() -> Result<()> {
))
}
+#[test]
+fn roundtrip_coalesce_partitions_with_fetch() -> Result<()> {
+    let field_a = Field::new("a", DataType::Boolean, false);
+    let field_b = Field::new("b", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+    // Case 1: no fetch set, so the proto `fetch` field is left unset (`None`).
+    roundtrip_test(Arc::new(CoalescePartitionsExec::new(Arc::new(
+        EmptyExec::new(schema.clone()),
+    ))))?;
+    // Case 2: `fetch = Some(10)`, exercising the new `fetch` field of `CoalescePartitionsExecNode`.
+    roundtrip_test(Arc::new(
+        CoalescePartitionsExec::new(Arc::new(EmptyExec::new(schema)))
+            .with_fetch(Some(10)),
+    ))
+}
+
#[test]
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
let file_schema =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]