This is an automated email from the ASF dual-hosted git repository.

findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new b122a1643a fix: prevent UnionExec panic with empty inputs (#17449) b122a1643a is described below commit b122a1643adf9f89c01adda5afbfb4bdc0284e85 Author: EeshanBembi <33062610+eeshanbe...@users.noreply.github.com> AuthorDate: Tue Sep 16 00:55:58 2025 +0530 fix: prevent UnionExec panic with empty inputs (#17449) * fix: prevent UnionExec panic with empty inputs This commit fixes a panic in UnionExec when constructed with empty inputs. Previously, UnionExec::new(vec![]) would cause an index out of bounds panic at union.rs:542 when trying to access inputs[0]. Changes: - Made UnionExec::new() return Result<Self> with proper validation - Made union_schema() return Result<SchemaRef> with empty input checks - Added descriptive error messages for empty input cases - Updated all call sites to handle the new Result return type - Added comprehensive tests for edge cases Error messages: - "UnionExec requires at least one input" - "Cannot create union schema from empty inputs" The fix maintains backward compatibility for valid inputs while preventing crashes and providing clear error messages for invalid usage. Fixes #17052 * refactor: address PR review comments for UnionExec empty inputs fix - Add new try_new method that returns Result<Arc<dyn ExecutionPlan>> - Deprecate existing new method in favor of try_new - Optimize single-input case: try_new returns the input directly - Remove redundant assert!(result.is_err()) from tests - Rename test_union_multiple_inputs_still_works to test_union_schema_multiple_inputs - Update all call sites to use appropriate API (try_new for new code, deprecated new for tests) This maintains backward compatibility while providing better error handling and optimization for single-input cases. 
* Fix cargo fmt and clippy warnings - Add proper feature gates for parquet_encryption in datasource-parquet - Format code to pass cargo fmt checks - All tests passing * Fix clippy --------- Co-authored-by: Eeshan <eeshan@Eeshans-MacBook-Pro.local> Co-authored-by: ebembi-crdb <ebe...@cockroachlabs.com> Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion/core/src/physical_planner.rs | 2 +- .../physical_optimizer/enforce_distribution.rs | 2 + .../physical_optimizer/partition_statistics.rs | 1 + .../physical_optimizer/projection_pushdown.rs | 1 + .../core/tests/physical_optimizer/test_utils.rs | 1 + datafusion/datasource-parquet/src/opener.rs | 11 ++ datafusion/datasource-parquet/src/source.rs | 3 + datafusion/physical-plan/src/repartition/mod.rs | 2 + datafusion/physical-plan/src/union.rs | 123 +++++++++++++++++++-- datafusion/proto/src/physical_plan/mod.rs | 1 + .../proto/tests/cases/roundtrip_physical_plan.rs | 1 + 11 files changed, 138 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d7f30609a4..23d19d4ca4 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1250,7 +1250,7 @@ impl DefaultPhysicalPlanner { } // N Children - LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())), + LogicalPlan::Union(_) => UnionExec::try_new(children.vec())?, LogicalPlan::Extension(Extension { node }) => { let mut maybe_plan = None; let children = children.vec(); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index e0826c90dd..1ddeb3c611 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -1783,6 +1783,7 @@ fn union_to_interleave() -> Result<()> { ); // Union + #[allow(deprecated)] let plan = Arc::new(UnionExec::new(vec![left, 
right])); // final agg @@ -1827,6 +1828,7 @@ fn union_not_to_interleave() -> Result<()> { ); // Union + #[allow(deprecated)] let plan = Arc::new(UnionExec::new(vec![left, right])); // final agg diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 2332d79ac9..26f179a6cd 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -356,6 +356,7 @@ mod test { #[tokio::test] async fn test_statistic_by_partition_of_union() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; + #[allow(deprecated)] let union_exec: Arc<dyn ExecutionPlan> = Arc::new(UnionExec::new(vec![scan.clone(), scan])); let statistics = (0..union_exec.output_partitioning().partition_count()) diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 7160ed4184..ab753d00b4 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -1535,6 +1535,7 @@ fn test_sort_preserving_after_projection() -> Result<()> { #[test] fn test_union_after_projection() -> Result<()> { let csv = create_simple_csv_exec(); + #[allow(deprecated)] let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])); let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new( vec![ diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 69dbe04927..49efe24fb8 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -304,6 +304,7 @@ pub fn sort_preserving_merge_exec_with_fetch( } pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> { + 
#[allow(deprecated)] Arc::new(UnionExec::new(input)) } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 93a3d4af54..44fad2d2dd 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -98,6 +98,7 @@ pub(super) struct ParquetOpener { /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option<TimeUnit>, /// Optional parquet FileDecryptionProperties + #[cfg(feature = "parquet_encryption")] pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>, /// Rewrite expressions in the context of the file schema pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>, @@ -151,9 +152,11 @@ impl FileOpener for ParquetOpener { let mut predicate_file_schema = Arc::clone(&self.logical_file_schema); let enable_page_index = self.enable_page_index; + #[cfg(feature = "parquet_encryption")] let encryption_context = self.get_encryption_context(); Ok(Box::pin(async move { + #[cfg(feature = "parquet_encryption")] let file_decryption_properties = encryption_context .get_file_decryption_properties(&file_location) .await?; @@ -502,6 +505,7 @@ where } #[derive(Default)] +#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))] struct EncryptionContext { #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option<Arc<FileDecryptionProperties>>, @@ -544,6 +548,7 @@ impl EncryptionContext { } #[cfg(not(feature = "parquet_encryption"))] +#[allow(dead_code)] impl EncryptionContext { async fn get_file_decryption_properties( &self, @@ -563,6 +568,7 @@ impl ParquetOpener { } #[cfg(not(feature = "parquet_encryption"))] + #[allow(dead_code)] fn get_encryption_context(&self) -> EncryptionContext { EncryptionContext::default() } @@ -819,6 +825,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] 
file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -907,6 +914,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -1011,6 +1019,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -1125,6 +1134,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -1240,6 +1250,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 007c239ef4..644cea85ca 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -52,6 +52,7 @@ use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; +#[cfg(feature = "parquet_encryption")] use 
datafusion_common::encryption::map_config_decryption_to_decryption; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; @@ -541,6 +542,7 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); + #[cfg(feature = "parquet_encryption")] let file_decryption_properties = self .table_parquet_options() .crypto @@ -576,6 +578,7 @@ impl FileSource for ParquetSource { enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, schema_adapter_factory, coerce_int96, + #[cfg(feature = "parquet_encryption")] file_decryption_properties, expr_adapter_factory, #[cfg(feature = "parquet_encryption")] diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 3cd6ee6c1a..cd188a648f 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1783,6 +1783,7 @@ mod test { let source1 = sorted_memory_exec(&schema, sort_exprs.clone()); let source2 = sorted_memory_exec(&schema, sort_exprs); // output has multiple partitions, and is sorted + #[allow(deprecated)] let union = UnionExec::new(vec![source1, source2]); let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10)) @@ -1825,6 +1826,7 @@ mod test { let source1 = memory_exec(&schema); let source2 = memory_exec(&schema); // output has multiple partitions, but is not sorted + #[allow(deprecated)] let union = UnionExec::new(vec![source1, source2]); let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10)) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index aca03c57b1..f1e9ee53ac 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -101,8 +101,10 @@ pub struct UnionExec { impl UnionExec { /// Create a new UnionExec + #[deprecated(since = "44.0.0", note = "Use 
UnionExec::try_new instead")] pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self { - let schema = union_schema(&inputs); + let schema = + union_schema(&inputs).expect("UnionExec::new called with empty inputs"); // The schema of the inputs and the union schema is consistent when: // - They have the same number of fields, and // - Their fields have same types at the same indices. @@ -116,6 +118,37 @@ impl UnionExec { } } + /// Try to create a new UnionExec. + /// + /// # Errors + /// Returns an error if: + /// - `inputs` is empty + /// + /// # Optimization + /// If there is only one input, returns that input directly rather than wrapping it in a UnionExec + pub fn try_new( + inputs: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + match inputs.len() { + 0 => exec_err!("UnionExec requires at least one input"), + 1 => Ok(inputs.into_iter().next().unwrap()), + _ => { + let schema = union_schema(&inputs)?; + // The schema of the inputs and the union schema is consistent when: + // - They have the same number of fields, and + // - Their fields have same types at the same indices. + // Here, we know that schemas are consistent and the call below can + // not return an error. 
+ let cache = Self::compute_properties(&inputs, schema).unwrap(); + Ok(Arc::new(UnionExec { + inputs, + metrics: ExecutionPlanMetricsSet::new(), + cache, + })) + } + } + } + /// Get inputs of the execution plan pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> { &self.inputs @@ -220,7 +253,7 @@ impl ExecutionPlan for UnionExec { self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - Ok(Arc::new(UnionExec::new(children))) + UnionExec::try_new(children) } fn execute( @@ -319,6 +352,7 @@ impl ExecutionPlan for UnionExec { .map(|child| make_with_child(projection, child)) .collect::<Result<Vec<_>>>()?; + #[allow(deprecated)] Ok(Some(Arc::new(UnionExec::new(new_children)))) } } @@ -373,7 +407,7 @@ impl InterleaveExec { "Not all InterleaveExec children have a consistent hash partitioning" ); } - let cache = Self::compute_properties(&inputs); + let cache = Self::compute_properties(&inputs)?; Ok(InterleaveExec { inputs, metrics: ExecutionPlanMetricsSet::new(), @@ -387,17 +421,17 @@ impl InterleaveExec { } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
- fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> PlanProperties { - let schema = union_schema(inputs); + fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<PlanProperties> { + let schema = union_schema(inputs)?; let eq_properties = EquivalenceProperties::new(schema); // Get output partitioning: let output_partitioning = inputs[0].output_partitioning().clone(); - PlanProperties::new( + Ok(PlanProperties::new( eq_properties, output_partitioning, emission_type_from_children(inputs), boundedness_from_children(inputs), - ) + )) } } @@ -538,7 +572,11 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>( .all(|partition| partition == *reference) } -fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef { +fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<SchemaRef> { + if inputs.is_empty() { + return exec_err!("Cannot create union schema from empty inputs"); + } + let first_schema = inputs[0].schema(); let fields = (0..first_schema.fields().len()) @@ -581,7 +619,10 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef { .flat_map(|i| i.schema().metadata().clone().into_iter()) .collect(); - Arc::new(Schema::new_with_metadata(fields, all_metadata_merged)) + Ok(Arc::new(Schema::new_with_metadata( + fields, + all_metadata_merged, + ))) } /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one @@ -710,6 +751,7 @@ mod tests { let csv = test::scan_partitioned(4); let csv2 = test::scan_partitioned(5); + #[allow(deprecated)] let union_exec = Arc::new(UnionExec::new(vec![csv, csv2])); // Should have 9 partitions and 9 output batches @@ -892,6 +934,7 @@ mod tests { let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema)); union_expected_eq.add_orderings(union_expected_orderings); + #[allow(deprecated)] let union = UnionExec::new(vec![child1, child2]); let union_eq_properties = union.properties().equivalence_properties(); let err_msg = format!( @@ -916,4 
+959,66 @@ mod tests { assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg); } } + + #[test] + fn test_union_empty_inputs() { + // Test that UnionExec::try_new fails with empty inputs + let result = UnionExec::try_new(vec![]); + assert!(result + .unwrap_err() + .to_string() + .contains("UnionExec requires at least one input")); + } + + #[test] + fn test_union_schema_empty_inputs() { + // Test that union_schema fails with empty inputs + let result = union_schema(&[]); + assert!(result + .unwrap_err() + .to_string() + .contains("Cannot create union schema from empty inputs")); + } + + #[test] + fn test_union_single_input() -> Result<()> { + // Test that UnionExec::try_new returns the single input directly + let schema = create_test_schema()?; + let memory_exec: Arc<dyn ExecutionPlan> = + Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?); + let memory_exec_clone = Arc::clone(&memory_exec); + let result = UnionExec::try_new(vec![memory_exec])?; + + // Check that the result is the same as the input (no UnionExec wrapper) + assert_eq!(result.schema(), schema); + // Verify it's the same execution plan + assert!(Arc::ptr_eq(&result, &memory_exec_clone)); + + Ok(()) + } + + #[test] + fn test_union_schema_multiple_inputs() -> Result<()> { + // Test that existing functionality with multiple inputs still works + let schema = create_test_schema()?; + let memory_exec1 = + Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?); + let memory_exec2 = + Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?); + + let union_plan = UnionExec::try_new(vec![memory_exec1, memory_exec2])?; + + // Downcast to verify it's a UnionExec + let union = union_plan + .as_any() + .downcast_ref::<UnionExec>() + .expect("Expected UnionExec"); + + // Check that schema is correct + assert_eq!(union.schema(), schema); + // Check that we have 2 inputs + assert_eq!(union.inputs().len(), 2); + + Ok(()) + } } diff --git 
a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index e577de5b1d..0d6bdeb08b 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1405,6 +1405,7 @@ impl protobuf::PhysicalPlanNode { for input in &union.inputs { inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?); } + #[allow(deprecated)] Ok(Arc::new(UnionExec::new(inputs))) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a5357a132e..f408ec1a91 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1649,6 +1649,7 @@ fn roundtrip_union() -> Result<()> { let left = EmptyExec::new(Arc::new(schema_left)); let right = EmptyExec::new(Arc::new(schema_right)); let inputs: Vec<Arc<dyn ExecutionPlan>> = vec![Arc::new(left), Arc::new(right)]; + #[allow(deprecated)] let union = UnionExec::new(inputs); roundtrip_test(Arc::new(union)) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org