This is an automated email from the ASF dual-hosted git repository.

findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b122a1643a fix: prevent UnionExec panic with empty inputs (#17449)
b122a1643a is described below

commit b122a1643adf9f89c01adda5afbfb4bdc0284e85
Author: EeshanBembi <33062610+eeshanbe...@users.noreply.github.com>
AuthorDate: Tue Sep 16 00:55:58 2025 +0530

    fix: prevent UnionExec panic with empty inputs (#17449)
    
    * fix: prevent UnionExec panic with empty inputs
    
    This commit fixes a panic in UnionExec when it is constructed with empty
    inputs. Previously, UnionExec::new(vec![]) caused an index-out-of-bounds
    panic at union.rs:542 when trying to access inputs[0].
    
    Changes:
    - Made UnionExec::new() return Result<Self> with proper validation
    - Made union_schema() return Result<SchemaRef> with empty input checks
    - Added descriptive error messages for empty input cases
    - Updated all call sites to handle the new Result return type
    - Added comprehensive tests for edge cases
    
    Error messages:
    - "UnionExec requires at least one input"
    - "Cannot create union schema from empty inputs"
    
    The fix maintains backward compatibility for valid inputs while preventing
    crashes and providing clear error messages for invalid usage.
    
    Fixes #17052
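    
    As a rough illustration of the call-site migration (not code from this
    commit; the function and `children` vector are hypothetical):
    
        use std::sync::Arc;
        use datafusion_common::Result;
        use datafusion_physical_plan::{union::UnionExec, ExecutionPlan};
    
        fn plan_union(children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
            // Before the fix: Arc::new(UnionExec::new(children)) panicked on
            // empty input; now the failure is an ordinary planning error
            // that the caller can propagate with `?`.
            UnionExec::try_new(children)
        }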
    
    * refactor: address PR review comments for UnionExec empty inputs fix
    
    - Add new try_new method that returns Result<Arc<dyn ExecutionPlan>>
    - Deprecate existing new method in favor of try_new
    - Optimize single-input case: try_new returns the input directly
    - Remove redundant assert!(result.is_err()) from tests
    - Rename test_union_multiple_inputs_still_works to test_union_schema_multiple_inputs
    - Update all call sites to use appropriate API (try_new for new code, deprecated new for tests)
    
    This maintains backward compatibility while providing better error handling
    and optimization for single-input cases.
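    
    For example (a sketch; `left` and `right` stand for arbitrary input plans):
    
        // New code uses the fallible constructor, which also unwraps a
        // single-input union into the input itself:
        let plan = UnionExec::try_new(vec![Arc::clone(&left), Arc::clone(&right)])?;
    
        // Tests that must assert on a concrete UnionExec node keep the
        // deprecated constructor and silence the lint:
        #[allow(deprecated)]
        let union = UnionExec::new(vec![left, right]);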
    
    * Fix cargo fmt and clippy warnings
    
    - Add proper feature gates for parquet_encryption in datasource-parquet
    - Format code to pass cargo fmt checks
    - All tests passing
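    
    The gating pattern, condensed from the opener.rs hunks below:
    
        // With the feature off, the fallback EncryptionContext has no
        // reachable users, so cfg_attr(allow(dead_code)) keeps the build
        // warning-free under clippy in both feature configurations.
        #[derive(Default)]
        #[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))]
        struct EncryptionContext {
            #[cfg(feature = "parquet_encryption")]
            file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        }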
    
    * Fix clippy
    
    ---------
    
    Co-authored-by: Eeshan <eeshan@Eeshans-MacBook-Pro.local>
    Co-authored-by: ebembi-crdb <ebe...@cockroachlabs.com>
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 datafusion/core/src/physical_planner.rs            |   2 +-
 .../physical_optimizer/enforce_distribution.rs     |   2 +
 .../physical_optimizer/partition_statistics.rs     |   1 +
 .../physical_optimizer/projection_pushdown.rs      |   1 +
 .../core/tests/physical_optimizer/test_utils.rs    |   1 +
 datafusion/datasource-parquet/src/opener.rs        |  11 ++
 datafusion/datasource-parquet/src/source.rs        |   3 +
 datafusion/physical-plan/src/repartition/mod.rs    |   2 +
 datafusion/physical-plan/src/union.rs              | 123 +++++++++++++++++++--
 datafusion/proto/src/physical_plan/mod.rs          |   1 +
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   1 +
 11 files changed, 138 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index d7f30609a4..23d19d4ca4 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1250,7 +1250,7 @@ impl DefaultPhysicalPlanner {
             }
 
             // N Children
-            LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())),
+            LogicalPlan::Union(_) => UnionExec::try_new(children.vec())?,
             LogicalPlan::Extension(Extension { node }) => {
                 let mut maybe_plan = None;
                 let children = children.vec();
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index e0826c90dd..1ddeb3c611 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -1783,6 +1783,7 @@ fn union_to_interleave() -> Result<()> {
     );
 
     //  Union
+    #[allow(deprecated)]
     let plan = Arc::new(UnionExec::new(vec![left, right]));
 
     // final agg
@@ -1827,6 +1828,7 @@ fn union_not_to_interleave() -> Result<()> {
     );
 
     //  Union
+    #[allow(deprecated)]
     let plan = Arc::new(UnionExec::new(vec![left, right]));
 
     // final agg
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 2332d79ac9..26f179a6cd 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -356,6 +356,7 @@ mod test {
     #[tokio::test]
     async fn test_statistic_by_partition_of_union() -> Result<()> {
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        #[allow(deprecated)]
         let union_exec: Arc<dyn ExecutionPlan> =
             Arc::new(UnionExec::new(vec![scan.clone(), scan]));
         let statistics = (0..union_exec.output_partitioning().partition_count())
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 7160ed4184..ab753d00b4 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -1535,6 +1535,7 @@ fn test_sort_preserving_after_projection() -> Result<()> {
 #[test]
 fn test_union_after_projection() -> Result<()> {
     let csv = create_simple_csv_exec();
+    #[allow(deprecated)]
     let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv]));
     let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
         vec![
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 69dbe04927..49efe24fb8 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -304,6 +304,7 @@ pub fn sort_preserving_merge_exec_with_fetch(
 }
 
 pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+    #[allow(deprecated)]
     Arc::new(UnionExec::new(input))
 }
 
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 93a3d4af54..44fad2d2dd 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -98,6 +98,7 @@ pub(super) struct ParquetOpener {
     /// Coerce INT96 timestamps to specific TimeUnit
     pub coerce_int96: Option<TimeUnit>,
     /// Optional parquet FileDecryptionProperties
+    #[cfg(feature = "parquet_encryption")]
     pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
     /// Rewrite expressions in the context of the file schema
     pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
@@ -151,9 +152,11 @@ impl FileOpener for ParquetOpener {
         let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);
 
         let enable_page_index = self.enable_page_index;
+        #[cfg(feature = "parquet_encryption")]
         let encryption_context = self.get_encryption_context();
 
         Ok(Box::pin(async move {
+            #[cfg(feature = "parquet_encryption")]
             let file_decryption_properties = encryption_context
                 .get_file_decryption_properties(&file_location)
                 .await?;
@@ -502,6 +505,7 @@ where
 }
 
 #[derive(Default)]
+#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))]
 struct EncryptionContext {
     #[cfg(feature = "parquet_encryption")]
     file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -544,6 +548,7 @@ impl EncryptionContext {
 }
 
 #[cfg(not(feature = "parquet_encryption"))]
+#[allow(dead_code)]
 impl EncryptionContext {
     async fn get_file_decryption_properties(
         &self,
@@ -563,6 +568,7 @@ impl ParquetOpener {
     }
 
     #[cfg(not(feature = "parquet_encryption"))]
+    #[allow(dead_code)]
     fn get_encryption_context(&self) -> EncryptionContext {
         EncryptionContext::default()
     }
@@ -819,6 +825,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                #[cfg(feature = "parquet_encryption")]
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                 #[cfg(feature = "parquet_encryption")]
@@ -907,6 +914,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                #[cfg(feature = "parquet_encryption")]
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                 #[cfg(feature = "parquet_encryption")]
@@ -1011,6 +1019,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                #[cfg(feature = "parquet_encryption")]
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                 #[cfg(feature = "parquet_encryption")]
@@ -1125,6 +1134,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: false, // note that this is false!
                 coerce_int96: None,
+                #[cfg(feature = "parquet_encryption")]
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                 #[cfg(feature = "parquet_encryption")]
@@ -1240,6 +1250,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                #[cfg(feature = "parquet_encryption")]
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                 #[cfg(feature = "parquet_encryption")]
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 007c239ef4..644cea85ca 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -52,6 +52,7 @@ use datafusion_physical_plan::metrics::Count;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
+#[cfg(feature = "parquet_encryption")]
 use datafusion_common::encryption::map_config_decryption_to_decryption;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
@@ -541,6 +542,7 @@ impl FileSource for ParquetSource {
                 Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
             });
 
+        #[cfg(feature = "parquet_encryption")]
         let file_decryption_properties = self
             .table_parquet_options()
             .crypto
@@ -576,6 +578,7 @@ impl FileSource for ParquetSource {
             enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
             schema_adapter_factory,
             coerce_int96,
+            #[cfg(feature = "parquet_encryption")]
             file_decryption_properties,
             expr_adapter_factory,
             #[cfg(feature = "parquet_encryption")]
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 3cd6ee6c1a..cd188a648f 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -1783,6 +1783,7 @@ mod test {
         let source1 = sorted_memory_exec(&schema, sort_exprs.clone());
         let source2 = sorted_memory_exec(&schema, sort_exprs);
         // output has multiple partitions, and is sorted
+        #[allow(deprecated)]
         let union = UnionExec::new(vec![source1, source2]);
         let exec =
             RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
@@ -1825,6 +1826,7 @@ mod test {
         let source1 = memory_exec(&schema);
         let source2 = memory_exec(&schema);
         // output has multiple partitions, but is not sorted
+        #[allow(deprecated)]
         let union = UnionExec::new(vec![source1, source2]);
         let exec =
             RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index aca03c57b1..f1e9ee53ac 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -101,8 +101,10 @@ pub struct UnionExec {
 
 impl UnionExec {
     /// Create a new UnionExec
+    #[deprecated(since = "44.0.0", note = "Use UnionExec::try_new instead")]
     pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
-        let schema = union_schema(&inputs);
+        let schema =
+            union_schema(&inputs).expect("UnionExec::new called with empty inputs");
         // The schema of the inputs and the union schema is consistent when:
         // - They have the same number of fields, and
         // - Their fields have same types at the same indices.
@@ -116,6 +118,37 @@ impl UnionExec {
         }
     }
 
+    /// Try to create a new UnionExec.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - `inputs` is empty
+    ///
+    /// # Optimization
+    /// If there is only one input, returns that input directly rather than wrapping it in a UnionExec
+    pub fn try_new(
+        inputs: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match inputs.len() {
+            0 => exec_err!("UnionExec requires at least one input"),
+            1 => Ok(inputs.into_iter().next().unwrap()),
+            _ => {
+                let schema = union_schema(&inputs)?;
+                // The schema of the inputs and the union schema is consistent when:
+                // - They have the same number of fields, and
+                // - Their fields have same types at the same indices.
+                // Here, we know that schemas are consistent and the call below can
+                // not return an error.
+                let cache = Self::compute_properties(&inputs, schema).unwrap();
+                Ok(Arc::new(UnionExec {
+                    inputs,
+                    metrics: ExecutionPlanMetricsSet::new(),
+                    cache,
+                }))
+            }
+        }
+    }
+
     /// Get inputs of the execution plan
     pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
         &self.inputs
@@ -220,7 +253,7 @@ impl ExecutionPlan for UnionExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(UnionExec::new(children)))
+        UnionExec::try_new(children)
     }
 
     fn execute(
@@ -319,6 +352,7 @@ impl ExecutionPlan for UnionExec {
             .map(|child| make_with_child(projection, child))
             .collect::<Result<Vec<_>>>()?;
 
+        #[allow(deprecated)]
         Ok(Some(Arc::new(UnionExec::new(new_children))))
     }
 }
@@ -373,7 +407,7 @@ impl InterleaveExec {
                 "Not all InterleaveExec children have a consistent hash 
partitioning"
             );
         }
-        let cache = Self::compute_properties(&inputs);
+        let cache = Self::compute_properties(&inputs)?;
         Ok(InterleaveExec {
             inputs,
             metrics: ExecutionPlanMetricsSet::new(),
@@ -387,17 +421,17 @@ impl InterleaveExec {
     }
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
-    fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> PlanProperties {
-        let schema = union_schema(inputs);
+    fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<PlanProperties> {
+        let schema = union_schema(inputs)?;
         let eq_properties = EquivalenceProperties::new(schema);
         // Get output partitioning:
         let output_partitioning = inputs[0].output_partitioning().clone();
-        PlanProperties::new(
+        Ok(PlanProperties::new(
             eq_properties,
             output_partitioning,
             emission_type_from_children(inputs),
             boundedness_from_children(inputs),
-        )
+        ))
     }
 }
 
@@ -538,7 +572,11 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
             .all(|partition| partition == *reference)
 }
 
-fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
+fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<SchemaRef> {
+    if inputs.is_empty() {
+        return exec_err!("Cannot create union schema from empty inputs");
+    }
+
     let first_schema = inputs[0].schema();
 
     let fields = (0..first_schema.fields().len())
@@ -581,7 +619,10 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
         .flat_map(|i| i.schema().metadata().clone().into_iter())
         .collect();
 
-    Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))
+    Ok(Arc::new(Schema::new_with_metadata(
+        fields,
+        all_metadata_merged,
+    )))
 }
 
 /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
@@ -710,6 +751,7 @@ mod tests {
         let csv = test::scan_partitioned(4);
         let csv2 = test::scan_partitioned(5);
 
+        #[allow(deprecated)]
         let union_exec = Arc::new(UnionExec::new(vec![csv, csv2]));
 
         // Should have 9 partitions and 9 output batches
@@ -892,6 +934,7 @@ mod tests {
             let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema));
             union_expected_eq.add_orderings(union_expected_orderings);
 
+            #[allow(deprecated)]
             let union = UnionExec::new(vec![child1, child2]);
             let union_eq_properties = union.properties().equivalence_properties();
             let err_msg = format!(
@@ -916,4 +959,66 @@ mod tests {
             assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
         }
     }
+
+    #[test]
+    fn test_union_empty_inputs() {
+        // Test that UnionExec::try_new fails with empty inputs
+        let result = UnionExec::try_new(vec![]);
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("UnionExec requires at least one input"));
+    }
+
+    #[test]
+    fn test_union_schema_empty_inputs() {
+        // Test that union_schema fails with empty inputs
+        let result = union_schema(&[]);
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot create union schema from empty inputs"));
+    }
+
+    #[test]
+    fn test_union_single_input() -> Result<()> {
+        // Test that UnionExec::try_new returns the single input directly
+        let schema = create_test_schema()?;
+        let memory_exec: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let memory_exec_clone = Arc::clone(&memory_exec);
+        let result = UnionExec::try_new(vec![memory_exec])?;
+
+        // Check that the result is the same as the input (no UnionExec wrapper)
+        assert_eq!(result.schema(), schema);
+        // Verify it's the same execution plan
+        assert!(Arc::ptr_eq(&result, &memory_exec_clone));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_union_schema_multiple_inputs() -> Result<()> {
+        // Test that existing functionality with multiple inputs still works
+        let schema = create_test_schema()?;
+        let memory_exec1 =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let memory_exec2 =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+
+        let union_plan = UnionExec::try_new(vec![memory_exec1, memory_exec2])?;
+
+        // Downcast to verify it's a UnionExec
+        let union = union_plan
+            .as_any()
+            .downcast_ref::<UnionExec>()
+            .expect("Expected UnionExec");
+
+        // Check that schema is correct
+        assert_eq!(union.schema(), schema);
+        // Check that we have 2 inputs
+        assert_eq!(union.inputs().len(), 2);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index e577de5b1d..0d6bdeb08b 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1405,6 +1405,7 @@ impl protobuf::PhysicalPlanNode {
         for input in &union.inputs {
             inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?);
         }
+        #[allow(deprecated)]
         Ok(Arc::new(UnionExec::new(inputs)))
     }
 
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index a5357a132e..f408ec1a91 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1649,6 +1649,7 @@ fn roundtrip_union() -> Result<()> {
     let left = EmptyExec::new(Arc::new(schema_left));
     let right = EmptyExec::new(Arc::new(schema_right));
     let inputs: Vec<Arc<dyn ExecutionPlan>> = vec![Arc::new(left), Arc::new(right)];
+    #[allow(deprecated)]
     let union = UnionExec::new(inputs);
     roundtrip_test(Arc::new(union))
 }

