This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 2c0a38be0e [branch-53] ser/de fetch in FilterExec (#20738) (#20883)
2c0a38be0e is described below

commit 2c0a38be0e18b5e8a2a6416721673e16e6d250d5
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 08:53:13 2026 -0400

    [branch-53] ser/de fetch in FilterExec (#20738) (#20883)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes https://github.com/apache/datafusion/issues/20737 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20738 from
    @haohuaijin to the branch-53 line
    
    Co-authored-by: Huaijin <[email protected]>
---
 datafusion/physical-plan/src/filter.rs                |  4 ++++
 datafusion/proto/proto/datafusion.proto               |  1 +
 datafusion/proto/src/generated/pbjson.rs              | 19 +++++++++++++++++++
 datafusion/proto/src/generated/prost.rs               |  2 ++
 datafusion/proto/src/physical_plan/mod.rs             |  2 ++
 .../proto/tests/cases/roundtrip_physical_plan.rs      | 13 +++++++++++++
 6 files changed, 41 insertions(+)

diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index af7bcc8e3b..fac6fa1e7c 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -718,6 +718,10 @@ impl ExecutionPlan for FilterExec {
         })
     }
 
+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
     fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         Some(Arc::new(Self {
             predicate: Arc::clone(&self.predicate),
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 7c02688676..37b31a84de 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1030,6 +1030,7 @@ message FilterExecNode {
   uint32 default_filter_selectivity = 3;
   repeated uint32 projection = 9;
   uint32 batch_size = 10;
+  optional uint32 fetch = 11;
 }
 
 message FileGroup {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 5b2b9133ce..419105c40c 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6894,6 +6894,9 @@ impl serde::Serialize for FilterExecNode {
         if self.batch_size != 0 {
             len += 1;
         }
+        if self.fetch.is_some() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.FilterExecNode", len)?;
         if let Some(v) = self.input.as_ref() {
             struct_ser.serialize_field("input", v)?;
@@ -6910,6 +6913,9 @@ impl serde::Serialize for FilterExecNode {
         if self.batch_size != 0 {
             struct_ser.serialize_field("batchSize", &self.batch_size)?;
         }
+        if let Some(v) = self.fetch.as_ref() {
+            struct_ser.serialize_field("fetch", v)?;
+        }
         struct_ser.end()
     }
 }
@@ -6927,6 +6933,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
             "projection",
             "batch_size",
             "batchSize",
+            "fetch",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -6936,6 +6943,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
             DefaultFilterSelectivity,
             Projection,
             BatchSize,
+            Fetch,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -6962,6 +6970,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
                             "defaultFilterSelectivity" | "default_filter_selectivity" => Ok(GeneratedField::DefaultFilterSelectivity),
                             "projection" => Ok(GeneratedField::Projection),
                             "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize),
+                            "fetch" => Ok(GeneratedField::Fetch),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -6986,6 +6995,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
                 let mut default_filter_selectivity__ = None;
                 let mut projection__ = None;
                 let mut batch_size__ = None;
+                let mut fetch__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Input => {
@@ -7025,6 +7035,14 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
                                 
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
                             ;
                         }
+                        GeneratedField::Fetch => {
+                            if fetch__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("fetch"));
+                            }
+                            fetch__ = 
+                                map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
+                            ;
+                        }
                     }
                 }
                 Ok(FilterExecNode {
@@ -7033,6 +7051,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
                     default_filter_selectivity: default_filter_selectivity__.unwrap_or_default(),
                     projection: projection__.unwrap_or_default(),
                     batch_size: batch_size__.unwrap_or_default(),
+                    fetch: fetch__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index d9602665c2..a0d4ef9e97 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1563,6 +1563,8 @@ pub struct FilterExecNode {
     pub projection: ::prost::alloc::vec::Vec<u32>,
     #[prost(uint32, tag = "10")]
     pub batch_size: u32,
+    #[prost(uint32, optional, tag = "11")]
+    pub fetch: ::core::option::Option<u32>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FileGroup {
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index bfba715b91..47fa1319c5 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -691,6 +691,7 @@ impl protobuf::PhysicalPlanNode {
         let filter = FilterExecBuilder::new(predicate, input)
             .apply_projection(projection)?
             .with_batch_size(filter.batch_size as usize)
+            .with_fetch(filter.fetch.map(|f| f as usize))
             .build()?;
         match filter_selectivity {
             Ok(filter_selectivity) => Ok(Arc::new(
@@ -2320,6 +2321,7 @@ impl protobuf::PhysicalPlanNode {
                         v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                     }),
                     batch_size: exec.batch_size() as u32,
+                    fetch: exec.fetch().map(|f| f as u32),
                 },
             ))),
         })
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 230727c8c1..2b8c1056f3 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -794,6 +794,19 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
     )?))
 }
 
+#[test]
+fn roundtrip_filter_with_fetch() -> Result<()> {
+    let field_a = Field::new("a", DataType::Boolean, false);
+    let field_b = Field::new("b", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+    let predicate = col("a", &schema)?;
+    let filter = FilterExecBuilder::new(predicate, Arc::new(EmptyExec::new(schema)))
+        .with_fetch(Some(10))
+        .build()?;
+    assert_eq!(filter.fetch(), Some(10));
+    roundtrip_test(Arc::new(filter))
+}
+
 #[test]
 fn roundtrip_sort() -> Result<()> {
     let field_a = Field::new("a", DataType::Boolean, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to