Copilot commented on code in PR #22702:
URL: https://github.com/apache/datafusion/pull/22702#discussion_r3336003243


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2527,6 +2528,78 @@ impl DataFrame {
             .collect()
     }
 
+    /// Fill NaN values in specified columns with a given value
+    /// If no columns are specified (empty vector), applies to all columns
+    /// Only fills if the value can be cast to the column's type
+    ///
+    /// # Arguments
+    /// * `value` - Value to fill NaNs with
+    /// * `columns` - List of column names to fill. If empty, fills all 
columns.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # use datafusion_common::ScalarValue;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let df = ctx
+    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
+    ///     .await?;
+    /// // Fill NaN in only columns "a" and "c":
+    /// let df = df.fill_nan(ScalarValue::from(0.0), vec!["a".to_owned(), 
"c".to_owned()])?;
+    /// // Fill NaN across all columns:
+    /// let df = df.fill_nan(ScalarValue::from(0.0), vec![])?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    #[expect(clippy::needless_pass_by_value)]
+    pub fn fill_nan(
+        &self,
+        value: ScalarValue,
+        columns: Vec<String>,
+    ) -> Result<DataFrame> {
+        let cols = if columns.is_empty() {
+            self.logical_plan()
+                .schema()
+                .fields()
+                .iter()
+                .map(Arc::clone)
+                .collect()
+        } else {
+            self.find_columns(&columns)?
+        };
+
+        let projections = self
+            .logical_plan()
+            .schema()
+            .fields()
+            .iter()
+            .map(|field| {
+                if cols.contains(field) && field.data_type().is_floating() {
+                    // Try to cast fill value to column type. If the cast 
fails, fallback to the original column.
+                    match value.clone().cast_to(field.data_type()) {
+                        Ok(fill_value) => Expr::Alias(Alias {
+                            expr: Box::new(Expr::ScalarFunction(ScalarFunction 
{
+                                func: nanvl(),
+                                args: vec![col(field.name()), lit(fill_value)],
+                            })),
+                            relation: None,
+                            name: field.name().to_string(),
+                            metadata: None,
+                        }),
+                        Err(_) => col(field.name()),
+                    }

Review Comment:
   Silently falling back to the original column when the cast fails can mask 
user errors (e.g., passing a non-numeric string scalar for a Float32 column they 
expected to fill). Consider returning an error from `fill_nan` when the 
user-provided value cannot be cast to a targeted column's type, which gives 
clearer feedback than silently leaving NaNs in place. At minimum, this fallback 
behavior should be documented in the doc comment above.



##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2527,6 +2528,78 @@ impl DataFrame {
             .collect()
     }
 
+    /// Fill NaN values in specified columns with a given value
+    /// If no columns are specified (empty vector), applies to all columns
+    /// Only fills if the value can be cast to the column's type
+    ///
+    /// # Arguments
+    /// * `value` - Value to fill NaNs with
+    /// * `columns` - List of column names to fill. If empty, fills all 
columns.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # use datafusion_common::ScalarValue;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let df = ctx
+    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
+    ///     .await?;
+    /// // Fill NaN in only columns "a" and "c":
+    /// let df = df.fill_nan(ScalarValue::from(0.0), vec!["a".to_owned(), 
"c".to_owned()])?;
+    /// // Fill NaN across all columns:
+    /// let df = df.fill_nan(ScalarValue::from(0.0), vec![])?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    #[expect(clippy::needless_pass_by_value)]
+    pub fn fill_nan(
+        &self,
+        value: ScalarValue,
+        columns: Vec<String>,
+    ) -> Result<DataFrame> {
+        let cols = if columns.is_empty() {
+            self.logical_plan()
+                .schema()
+                .fields()
+                .iter()
+                .map(Arc::clone)
+                .collect()
+        } else {
+            self.find_columns(&columns)?
+        };
+
+        let projections = self
+            .logical_plan()
+            .schema()
+            .fields()
+            .iter()
+            .map(|field| {
+                if cols.contains(field) && field.data_type().is_floating() {
+                    // Try to cast fill value to column type. If the cast 
fails, fallback to the original column.
+                    match value.clone().cast_to(field.data_type()) {
+                        Ok(fill_value) => Expr::Alias(Alias {
+                            expr: Box::new(Expr::ScalarFunction(ScalarFunction 
{
+                                func: nanvl(),
+                                args: vec![col(field.name()), lit(fill_value)],
+                            })),
+                            relation: None,
+                            name: field.name().to_string(),
+                            metadata: None,
+                        }),
+                        Err(_) => col(field.name()),
+                    }
+                } else {
+                    col(field.name())
+                }
+            })
+            .collect::<Vec<_>>();
+
+        self.clone().select(projections)

Review Comment:
   `self.clone()` clones the entire `DataFrame` (including session state) only 
to call `select`. If the other `fill_*` methods in this file follow the same 
pattern, keeping it is fine for consistency; otherwise, if `select` can work 
from `&self` or a moved plan, consider avoiding the clone to reduce overhead on 
large plans.



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -6539,6 +6539,83 @@ async fn test_fill_null_all_columns() -> Result<()> {
     Ok(())
 }
 
+async fn create_nan_table() -> Result<DataFrame> {

Review Comment:
   The tests do not cover several important behaviors of `fill_nan`: (1) a 
non-float column explicitly named in `columns` (verifying it is left unchanged 
rather than erroring), (2) a column name that does not exist in the schema (to 
confirm `find_columns` error propagation), and (3) a `ScalarValue` whose type 
cannot be cast to the column's float type (to verify the documented fallback 
behavior). Adding these would meaningfully strengthen coverage of the new API 
surface.



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -6539,6 +6539,83 @@ async fn test_fill_null_all_columns() -> Result<()> {
     Ok(())
 }
 
+async fn create_nan_table() -> Result<DataFrame> {
+    // create a DataFrame with a NaN value in a float column "a" and a
+    // non-float column "b" that must stay untouched by fill_nan.
+    //    "+-----+---+",
+    //    "| a   | b |",
+    //    "+-----+---+",
+    //    "| 1.0 | 1 |",
+    //    "| NaN | 2 |",
+    //    "| 3.0 | 3 |",
+    //    "+-----+---+",
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Float64, true),
+        Field::new("b", DataType::Int32, true),
+    ]));
+    let a_values = Float64Array::from(vec![Some(1.0), Some(f64::NAN), 
Some(3.0)]);
+    let b_values = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(a_values), Arc::new(b_values)],
+    )?;
+
+    let ctx = SessionContext::new();
+    let table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
+    ctx.register_table("t_nan", Arc::new(table))?;
+    let df = ctx.table("t_nan").await?;
+    Ok(df)
+}
+
+#[tokio::test]
+async fn test_fill_nan() -> Result<()> {
+    let df = create_nan_table().await?;
+
+    // Fill NaNs in the float column "a" with 0.0.
+    let df_filled =
+        df.fill_nan(ScalarValue::Float64(Some(0.0)), vec!["a".to_string()])?;
+
+    let results = df_filled.collect().await?;
+    assert_snapshot!(
+        batches_to_sort_string(&results),
+        @r"
+    +-----+---+
+    | a   | b |
+    +-----+---+
+    | 0.0 | 2 |
+    | 1.0 | 1 |
+    | 3.0 | 3 |
+    +-----+---+
+    "
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_fill_nan_all_columns() -> Result<()> {
+    let df = create_nan_table().await?;
+
+    // Fill NaNs across all columns. Only the float column "a" is affected;
+    // the non-float column "b" is left unchanged since NaN only exists for
+    // floating-point types.
+    let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), vec![])?;
+
+    let results = df_filled.collect().await?;
+    assert_snapshot!(
+        batches_to_sort_string(&results),
+        @r"
+    +-----+---+
+    | a   | b |
+    +-----+---+
+    | 0.0 | 2 |
+    | 1.0 | 1 |
+    | 3.0 | 3 |
+    +-----+---+
+    "
+    );
+    Ok(())
+}
 #[tokio::test]

Review Comment:
   Missing blank line between the end of `test_fill_nan_all_columns` and the 
next `#[tokio::test]` attribute, which is inconsistent with the spacing used 
between other tests in this file.



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -6539,6 +6539,83 @@ async fn test_fill_null_all_columns() -> Result<()> {
     Ok(())
 }
 
+async fn create_nan_table() -> Result<DataFrame> {

Review Comment:
   `create_nan_table` is an async helper used only by tests, but it is neither 
gated behind `#[cfg(test)]` nor marked as a test itself. Because this file is a 
test module, that is likely fine; still, consider keeping it private (no 
`pub(crate)`) and adding a brief doc comment, or moving it closer to the tests 
that use it for clarity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to