This is an automated email from the ASF dual-hosted git repository.

linwei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ac9b55fc0 Fix `CoalescePartitionsExec` proto serialization (#15824)
4ac9b55fc0 is described below

commit 4ac9b55fc0a1657e9d412ac3a76a4dfa196b2908
Author: 张林伟 <[email protected]>
AuthorDate: Fri Apr 25 13:34:04 2025 +0800

    Fix `CoalescePartitionsExec` proto serialization (#15824)
    
    * add fetch to CoalescePartitionsExecNode
    
    * gen proto code
    
    * Add test
    
    * fix
    
    * fix build
    
    * Fix test build
    
    * remove comments
---
 .../core/tests/physical_optimizer/enforce_sorting.rs  |  5 ++---
 .../replace_with_order_preserving_variants.rs         |  5 ++---
 .../physical-optimizer/src/enforce_distribution.rs    |  7 +++----
 .../physical-optimizer/src/enforce_sorting/mod.rs     |  8 ++++----
 datafusion/physical-plan/src/coalesce_partitions.rs   |  6 ++++++
 datafusion/proto/proto/datafusion.proto               |  1 +
 datafusion/proto/src/generated/pbjson.rs              | 19 +++++++++++++++++++
 datafusion/proto/src/generated/prost.rs               |  2 ++
 datafusion/proto/src/physical_plan/mod.rs             |  6 +++++-
 .../proto/tests/cases/roundtrip_physical_plan.rs      | 19 ++++++++++++++++++-
 10 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 052db454ef..f7668c8aab 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -3447,9 +3447,8 @@ fn test_parallelize_sort_preserves_fetch() -> Result<()> {
     let schema = create_test_schema3()?;
     let parquet_exec = parquet_exec(&schema);
     let coalesced = 
Arc::new(CoalescePartitionsExec::new(parquet_exec.clone()));
-    let top_coalesced = CoalescePartitionsExec::new(coalesced.clone())
-        .with_fetch(Some(10))
-        .unwrap();
+    let top_coalesced =
+        
Arc::new(CoalescePartitionsExec::new(coalesced.clone()).with_fetch(Some(10)));
 
     let requirements = PlanWithCorrespondingCoalescePartitions::new(
         top_coalesced.clone(),
diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
index ada3f06d39..71b9757604 100644
--- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -1272,9 +1272,8 @@ fn test_plan_with_order_preserving_variants_preserves_fetch() -> Result<()> {
         "a", &schema,
     )];
     let parquet_exec = parquet_exec_sorted(&schema, parquet_sort_exprs);
-    let coalesced = CoalescePartitionsExec::new(parquet_exec.clone())
-        .with_fetch(Some(10))
-        .unwrap();
+    let coalesced =
+        
Arc::new(CoalescePartitionsExec::new(parquet_exec.clone()).with_fetch(Some(10)));
 
     // Test sort's fetch is greater than coalesce fetch, return error because 
it's not reasonable
     let requirements = OrderPreservationContext::new(
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 523762401d..947fd3eba2 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -1035,10 +1035,9 @@ pub fn replace_order_preserving_variants(
 
     if is_sort_preserving_merge(&context.plan) {
         let child_plan = Arc::clone(&context.children[0].plan);
-        // It's safe to unwrap because `CoalescePartitionsExec` supports 
`fetch`.
-        context.plan = CoalescePartitionsExec::new(child_plan)
-            .with_fetch(context.plan.fetch())
-            .unwrap();
+        context.plan = Arc::new(
+            
CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
+        );
         return Ok(context);
     } else if let Some(repartition) =
         context.plan.as_any().downcast_ref::<RepartitionExec>()
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
index b606aa85c1..04c1a67740 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -409,10 +409,10 @@ pub fn parallelize_sorts(
 
         Ok(Transformed::yes(
             PlanWithCorrespondingCoalescePartitions::new(
-                // Safe to unwrap, because `CoalescePartitionsExec` has a fetch
-                CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
-                    .with_fetch(fetch)
-                    .unwrap(),
+                Arc::new(
+                    CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
+                        .with_fetch(fetch),
+                ),
                 false,
                 vec![requirements],
             ),
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 95a0c8f6ce..715dd159e7 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -59,6 +59,12 @@ impl CoalescePartitionsExec {
         }
     }
 
+    /// Update fetch with the argument
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
+    }
+
     /// Input execution plan
     pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
         &self.input
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 39236da3b9..4c8b6c588d 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1217,6 +1217,7 @@ message CoalesceBatchesExecNode {
 
 message CoalescePartitionsExecNode {
   PhysicalPlanNode input = 1;
+  optional uint32 fetch = 2;
 }
 
 message PhysicalHashRepartition {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 6166b6ec47..9324229445 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -2050,10 +2050,16 @@ impl serde::Serialize for CoalescePartitionsExecNode {
         if self.input.is_some() {
             len += 1;
         }
+        if self.fetch.is_some() {
+            len += 1;
+        }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.CoalescePartitionsExecNode", len)?;
         if let Some(v) = self.input.as_ref() {
             struct_ser.serialize_field("input", v)?;
         }
+        if let Some(v) = self.fetch.as_ref() {
+            struct_ser.serialize_field("fetch", v)?;
+        }
         struct_ser.end()
     }
 }
@@ -2065,11 +2071,13 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
     {
         const FIELDS: &[&str] = &[
             "input",
+            "fetch",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Input,
+            Fetch,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -2092,6 +2100,7 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
                     {
                         match value {
                             "input" => Ok(GeneratedField::Input),
+                            "fetch" => Ok(GeneratedField::Fetch),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -2112,6 +2121,7 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
                     V: serde::de::MapAccess<'de>,
             {
                 let mut input__ = None;
+                let mut fetch__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Input => {
@@ -2120,10 +2130,19 @@ impl<'de> serde::Deserialize<'de> for CoalescePartitionsExecNode {
                             }
                             input__ = map_.next_value()?;
                         }
+                        GeneratedField::Fetch => {
+                            if fetch__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("fetch"));
+                            }
+                            fetch__ = 
+                                
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 x.0)
+                            ;
+                        }
                     }
                 }
                 Ok(CoalescePartitionsExecNode {
                     input: input__,
+                    fetch: fetch__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 41c60b22e3..c2f4e93cef 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1824,6 +1824,8 @@ pub struct CoalesceBatchesExecNode {
 pub struct CoalescePartitionsExecNode {
     #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+    #[prost(uint32, optional, tag = "2")]
+    pub fetch: ::core::option::Option<u32>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalHashRepartition {
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 90d071ab23..4429c8fdd8 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -792,7 +792,10 @@ impl protobuf::PhysicalPlanNode {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let input: Arc<dyn ExecutionPlan> =
             into_physical_plan(&merge.input, registry, runtime, 
extension_codec)?;
-        Ok(Arc::new(CoalescePartitionsExec::new(input)))
+        Ok(Arc::new(
+            CoalescePartitionsExec::new(input)
+                .with_fetch(merge.fetch.map(|f| f as usize)),
+        ))
     }
 
     fn try_into_repartition_physical_plan(
@@ -2354,6 +2357,7 @@ impl protobuf::PhysicalPlanNode {
             physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
                 protobuf::CoalescePartitionsExecNode {
                     input: Some(Box::new(input)),
+                    fetch: exec.fetch().map(|f| f as u32),
                 },
             ))),
         })
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 6dddbb5ea0..94fcd5ca82 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -66,6 +66,7 @@ use datafusion::physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
 };
 use datafusion::physical_plan::analyze::AnalyzeExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::{
     binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, 
PhysicalSortExpr,
@@ -709,7 +710,7 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
 }
 
 #[test]
-fn roundtrip_coalesce_with_fetch() -> Result<()> {
+fn roundtrip_coalesce_batches_with_fetch() -> Result<()> {
     let field_a = Field::new("a", DataType::Boolean, false);
     let field_b = Field::new("b", DataType::Int64, false);
     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -725,6 +726,22 @@ fn roundtrip_coalesce_with_fetch() -> Result<()> {
     ))
 }
 
+#[test]
+fn roundtrip_coalesce_partitions_with_fetch() -> Result<()> {
+    let field_a = Field::new("a", DataType::Boolean, false);
+    let field_b = Field::new("b", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+    roundtrip_test(Arc::new(CoalescePartitionsExec::new(Arc::new(
+        EmptyExec::new(schema.clone()),
+    ))))?;
+
+    roundtrip_test(Arc::new(
+        CoalescePartitionsExec::new(Arc::new(EmptyExec::new(schema)))
+            .with_fetch(Some(10)),
+    ))
+}
+
 #[test]
 fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
     let file_schema =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to