This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new cce3f3f3b5 fix: column indices in FFI partition evaluator (#16480)
cce3f3f3b5 is described below

commit cce3f3f3b50f6c5e4f8e9594c2b5b09d220cb8cb
Author: Tim Saucer <timsau...@gmail.com>
AuthorDate: Fri Jun 27 16:59:20 2025 -0400

    fix: column indices in FFI partition evaluator (#16480)

    * Column indices were not computed correctly, causing a panic
    * Add unit tests
---
 datafusion/ffi/src/udwf/mod.rs                     | 68 +++++++++++++++++++++-
 .../ffi/src/udwf/partition_evaluator_args.rs       | 27 +++++----
 2 files changed, 84 insertions(+), 11 deletions(-)

diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs
index aaa3f5c992..504bf7a411 100644
--- a/datafusion/ffi/src/udwf/mod.rs
+++ b/datafusion/ffi/src/udwf/mod.rs
@@ -363,4 +363,70 @@ impl From<&FFI_SortOptions> for SortOptions {
 }
 
 #[cfg(test)]
-mod tests {}
+#[cfg(feature = "integration-tests")]
+mod tests {
+    use crate::tests::create_record_batch;
+    use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF};
+    use arrow::array::{create_array, ArrayRef};
+    use datafusion::functions_window::lead_lag::{lag_udwf, WindowShift};
+    use datafusion::logical_expr::expr::Sort;
+    use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF, WindowUDFImpl};
+    use datafusion::prelude::SessionContext;
+    use std::sync::Arc;
+
+    fn create_test_foreign_udwf(
+        original_udwf: impl WindowUDFImpl + 'static,
+    ) -> datafusion::common::Result<WindowUDF> {
+        let original_udwf = Arc::new(WindowUDF::from(original_udwf));
+
+        let local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into();
+
+        let foreign_udwf: ForeignWindowUDF = (&local_udwf).try_into()?;
+        Ok(foreign_udwf.into())
+    }
+
+    #[test]
+    fn test_round_trip_udwf() -> datafusion::common::Result<()> {
+        let original_udwf = lag_udwf();
+        let original_name = original_udwf.name().to_owned();
+
+        // Convert to FFI format
+        let local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into();
+
+        // Convert back to native format
+        let foreign_udwf: ForeignWindowUDF = (&local_udwf).try_into()?;
+        let foreign_udwf: WindowUDF = foreign_udwf.into();
+
+        assert_eq!(original_name, foreign_udwf.name());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_lag_udwf() -> datafusion::common::Result<()> {
+        let udwf = create_test_foreign_udwf(WindowShift::lag())?;
+
+        let ctx = SessionContext::default();
+        let df = ctx.read_batch(create_record_batch(-5, 5))?;
+
+        let df = df.select(vec![
+            col("a"),
+            udwf.call(vec![col("a")])
+                .order_by(vec![Sort::new(col("a"), true, true)])
+                .build()
+                .unwrap()
+                .alias("lag_a"),
+        ])?;
+
+        df.clone().show().await?;
+
+        let result = df.collect().await?;
+        let expected =
+            create_array!(Int32, [None, Some(-5), Some(-4), Some(-3), Some(-2)])
+                as ArrayRef;
+
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].column(1), &expected);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/ffi/src/udwf/partition_evaluator_args.rs b/datafusion/ffi/src/udwf/partition_evaluator_args.rs
index e74d47aa1a..dffeb23741 100644
--- a/datafusion/ffi/src/udwf/partition_evaluator_args.rs
+++ b/datafusion/ffi/src/udwf/partition_evaluator_args.rs
@@ -75,17 +75,24 @@ impl TryFrom<PartitionEvaluatorArgs<'_>> for FFI_PartitionEvaluatorArgs {
             })
             .collect();
 
-        let max_column = required_columns.keys().max().unwrap_or(&0).to_owned();
-        let fields: Vec<_> = (0..max_column)
-            .map(|idx| match required_columns.get(&idx) {
-                Some((name, data_type)) => Field::new(*name, (*data_type).clone(), true),
-                None => Field::new(
-                    format!("ffi_partition_evaluator_col_{idx}"),
-                    DataType::Null,
-                    true,
-                ),
+        let max_column = required_columns.keys().max();
+        let fields: Vec<_> = max_column
+            .map(|max_column| {
+                (0..(max_column + 1))
+                    .map(|idx| match required_columns.get(&idx) {
+                        Some((name, data_type)) => {
+                            Field::new(*name, (*data_type).clone(), true)
+                        }
+                        None => Field::new(
+                            format!("ffi_partition_evaluator_col_{idx}"),
+                            DataType::Null,
+                            true,
+                        ),
+                    })
+                    .collect()
             })
-            .collect();
+            .unwrap_or_default();
+
         let schema = Arc::new(Schema::new(fields));
 
         let codec = DefaultPhysicalExtensionCodec {};

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org