This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4bac1cf878 impl ser/de for preserve_order in RepartitionExec (#20798)
4bac1cf878 is described below

commit 4bac1cf8786f42951d855bbbe54043d9309bb04a
Author: Huaijin <[email protected]>
AuthorDate: Wed Mar 11 18:12:54 2026 +0800

    impl ser/de for preserve_order in RepartitionExec (#20798)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #20797
    
    ## Rationale for this change
    
    - see #20797
    
    ## What changes are included in this PR?
    
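    Implement protobuf serialization/deserialization for the `preserve_order`
    flag of `RepartitionExec`: add a `preserve_order` field to
    `RepartitionExecNode` and carry it through both directions of the
    physical plan conversion.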
    
    ## Are these changes tested?
    
    Yes, this adds one round-trip test case
    (`roundtrip_repartition_preserve_order`).
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 datafusion/proto/proto/datafusion.proto            |  1 +
 datafusion/proto/src/generated/pbjson.rs           | 18 ++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  2 ++
 datafusion/proto/src/physical_plan/mod.rs          | 10 +++++---
 .../proto/tests/cases/roundtrip_physical_plan.rs   | 29 ++++++++++++++++++++++
 5 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 8de3224c91..197be2631a 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1353,6 +1353,7 @@ message RepartitionExecNode{
   //   uint64 unknown = 4;
   // }
   Partitioning partitioning = 5;
+  bool preserve_order = 6;
 }
 
 message Partitioning {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 2b8089f563..7a4964c4c4 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -20929,6 +20929,9 @@ impl serde::Serialize for RepartitionExecNode {
         if self.partitioning.is_some() {
             len += 1;
         }
+        if self.preserve_order {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.RepartitionExecNode", len)?;
         if let Some(v) = self.input.as_ref() {
             struct_ser.serialize_field("input", v)?;
@@ -20936,6 +20939,9 @@ impl serde::Serialize for RepartitionExecNode {
         if let Some(v) = self.partitioning.as_ref() {
             struct_ser.serialize_field("partitioning", v)?;
         }
+        if self.preserve_order {
+            struct_ser.serialize_field("preserveOrder", &self.preserve_order)?;
+        }
         struct_ser.end()
     }
 }
@@ -20948,12 +20954,15 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
         const FIELDS: &[&str] = &[
             "input",
             "partitioning",
+            "preserve_order",
+            "preserveOrder",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Input,
             Partitioning,
+            PreserveOrder,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -20977,6 +20986,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
                         match value {
                             "input" => Ok(GeneratedField::Input),
                             "partitioning" => Ok(GeneratedField::Partitioning),
+                            "preserveOrder" | "preserve_order" => Ok(GeneratedField::PreserveOrder),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -20998,6 +21008,7 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
             {
                 let mut input__ = None;
                 let mut partitioning__ = None;
+                let mut preserve_order__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Input => {
@@ -21012,11 +21023,18 @@ impl<'de> serde::Deserialize<'de> for RepartitionExecNode {
                             }
                             partitioning__ = map_.next_value()?;
                         }
+                        GeneratedField::PreserveOrder => {
+                            if preserve_order__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("preserveOrder"));
+                            }
+                            preserve_order__ = Some(map_.next_value()?);
+                        }
                     }
                 }
                 Ok(RepartitionExecNode {
                     input: input__,
                     partitioning: partitioning__,
+                    preserve_order: preserve_order__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 6daf81bf1a..c708c61328 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2001,6 +2001,8 @@ pub struct RepartitionExecNode {
     /// }
     #[prost(message, optional, tag = "5")]
     pub partitioning: ::core::option::Option<Partitioning>,
+    #[prost(bool, tag = "6")]
+    pub preserve_order: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Partitioning {
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 85406e31da..99cd39d9f2 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1011,10 +1011,11 @@ impl protobuf::PhysicalPlanNode {
             codec,
             proto_converter,
         )?;
-        Ok(Arc::new(RepartitionExec::try_new(
-            input,
-            partitioning.unwrap(),
-        )?))
+        let mut repart_exec = RepartitionExec::try_new(input, partitioning.unwrap())?;
+        if repart.preserve_order {
+            repart_exec = repart_exec.with_preserve_order();
+        }
+        Ok(Arc::new(repart_exec))
     }
 
     fn try_into_global_limit_physical_plan(
@@ -3056,6 +3057,7 @@ impl protobuf::PhysicalPlanNode {
                 protobuf::RepartitionExecNode {
                     input: Some(Box::new(input)),
                     partitioning: Some(pb_partitioning),
+                    preserve_order: exec.preserve_order(),
                 },
             ))),
         })
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 66ca903e4e..31626ef9fd 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1781,6 +1781,35 @@ fn roundtrip_union() -> Result<()> {
     roundtrip_test(union)
 }
 
+#[test]
+fn roundtrip_repartition_preserve_order() -> Result<()> {
+    let field_a = Field::new("a", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a]));
+    let sort_exprs: LexOrdering = [PhysicalSortExpr {
+        expr: col("a", &schema)?,
+        options: SortOptions::default(),
+    }]
+    .into();
+
+    // Create two sorted single-partition inputs, then union them to get
+    // a sorted input with 2 partitions.
+    let source1 = SortExec::new(
+        sort_exprs.clone(),
+        Arc::new(EmptyExec::new(Arc::clone(&schema))),
+    );
+    let source2 = SortExec::new(sort_exprs, Arc::new(EmptyExec::new(schema)));
+    let union = UnionExec::try_new(vec![
+        Arc::new(source1) as Arc<dyn ExecutionPlan>,
+        Arc::new(source2) as Arc<dyn ExecutionPlan>,
+    ])?;
+
+    let repartition = RepartitionExec::try_new(union, Partitioning::RoundRobinBatch(10))?
+        .with_preserve_order();
+    assert!(repartition.preserve_order());
+
+    roundtrip_test(Arc::new(repartition))
+}
+
 #[test]
 fn roundtrip_interleave() -> Result<()> {
     let field_a = Field::new("col", DataType::Int64, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to