This is an automated email from the ASF dual-hosted git repository.

dongjoon-hyun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fc11ff  [SPARK-57304] Support `na.(fill|drop|replace)` for `DataFrame`
9fc11ff is described below

commit 9fc11ff80c349430b1371cac82c670fd46f71967
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jun 7 13:25:32 2026 -0700

    [SPARK-57304] Support `na.(fill|drop|replace)` for `DataFrame`
    
    ### What changes were proposed in this pull request?
    
    This PR adds the `DataFrameNaFunctions` API (via `DataFrame.na`), mirroring
    PySpark's `df.na.fill/drop/replace` and wiring the existing `NAFill`, `NADrop`,
    and `NAReplace` Spark Connect messages into the Swift client.
    
    - `fill`, `drop`, `replace` on `DataFrame.na`
    - `getNAFill`, `getNADrop`, `getNAReplace` plan builders in `SparkConnectClient`
    
    ### Why are the changes needed?
    
    To reach feature parity with PySpark/Spark SQL for handling missing data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this adds new public APIs.
    
    ```swift
    try await df.na.fill(0).show()
    try await df.na.drop().show()
    try await df.na.replace("s", ["a": "z"]).show()
    ```
    
    ### How was this patch tested?
    
    Pass the CI build.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.8)
    
    Closes #406 from dongjoon-hyun/SPARK-57304.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/DataFrameNaFunctions.swift | 199 ++++++++++++++++++++++++
 Sources/SparkConnect/SparkConnectClient.swift   |  38 +++++
 Tests/SparkConnectTests/DataFrameTests.swift    |  70 +++++++++
 3 files changed, 307 insertions(+)

diff --git a/Sources/SparkConnect/DataFrameNaFunctions.swift b/Sources/SparkConnect/DataFrameNaFunctions.swift
new file mode 100644
index 0000000..4977074
--- /dev/null
+++ b/Sources/SparkConnect/DataFrameNaFunctions.swift
@@ -0,0 +1,199 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+/// Functionality for working with missing data in ``DataFrame``s.
+///
+/// Use ``DataFrame/na`` to access this. It mirrors PySpark's `DataFrameNaFunctions`
+/// (`df.na.fill`, `df.na.drop`, `df.na.replace`).
+///
+/// Every operation derives a new plan from the wrapped ``DataFrame``'s plan and returns it
+/// wrapped in a new ``DataFrame``; the wrapped ``DataFrame`` is only read.
+public actor DataFrameNaFunctions: Sendable {
+  /// The ``DataFrame`` whose plan is used as the input of every operation.
+  let df: DataFrame
+
+  init(df: DataFrame) {
+    self.df = df
+  }
+
+  // MARK: - Fill
+
+  /// Returns a new ``DataFrame`` that replaces null values in boolean columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: Bool, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.boolean = value
+    return await fillColumns(cols, [literal])
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values in numeric columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value. It is sent as a `long` literal.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: Int, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.long = Int64(value)
+    return await fillColumns(cols, [literal])
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values in numeric columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value. It is sent as a `double` literal.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: Double, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.double = value
+    return await fillColumns(cols, [literal])
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values in string columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: String, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.string = value
+    return await fillColumns(cols, [literal])
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values, using the column name to replacement
+  /// value mapping.
+  ///
+  /// The replacement values must be of type `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`,
+  /// or `String`. Passing any other type is a programmer error and stops the program.
+  /// - Parameter valueMap: A dictionary of column name and replacement value. When it is empty,
+  ///   there is nothing to fill and this ``DataFrame`` is returned unchanged.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ valueMap: [String: Sendable]) async -> DataFrame {
+    // NOTE(review): the `NAFill` message is documented to need at least one value, so an empty
+    // map is short-circuited instead of building a plan without values - confirm with the proto.
+    guard !valueMap.isEmpty else {
+      return df
+    }
+    // Walk the key/value pairs directly (no force-unwrapped lookups) and in sorted key order,
+    // so that the same map always produces the same plan.
+    let entries = valueMap.sorted { $0.key < $1.key }
+    let cols = entries.map { $0.key }
+    let values = entries.map { fillLiteral($0.value) }
+    return await fillColumns(cols, values)
+  }
+
+  // MARK: - Drop
+
+  /// Returns a new ``DataFrame`` that drops rows containing null values.
+  /// - Parameters:
+  ///   - how: `any` (default) drops a row if it contains any null value, `all` drops a row only
+  ///   if every considered value is null. The value is compared case-insensitively, and every
+  ///   value other than `all` is treated as `any`.
+  ///   - cols: Column names to consider. When empty, all columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func drop(how: String = "any", _ cols: [String] = []) async -> DataFrame {
+    // `how` is expressed through `minNonNulls`: `all` requires a single non-null value to keep
+    // a row, while `any` leaves the threshold unset.
+    let minNonNulls: Int32? = how.lowercased() == "all" ? 1 : nil
+    return await transform { SparkConnectClient.getNADrop($0, cols, minNonNulls) }
+  }
+
+  /// Returns a new ``DataFrame`` that drops rows containing fewer than `minNonNulls` non-null
+  /// values.
+  /// - Parameters:
+  ///   - minNonNulls: The minimum number of non-null values a row must contain to be kept.
+  ///   - cols: Column names to consider. When empty, all columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func drop(minNonNulls: Int32, _ cols: [String] = []) async -> DataFrame {
+    return await transform { SparkConnectClient.getNADrop($0, cols, minNonNulls) }
+  }
+
+  // MARK: - Replace
+
+  /// Returns a new ``DataFrame`` that replaces values in the given column using the replacement
+  /// mapping.
+  ///
+  /// The values must be of type `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`, or `String`.
+  /// All numeric values are sent as `double`. Passing any other type is a programmer error and
+  /// stops the program.
+  /// - Parameters:
+  ///   - col: A column name, or `*` to consider all type-compatible columns.
+  ///   - replacement: A mapping of old value to new value.
+  /// - Returns: A ``DataFrame``.
+  public func replace<T: Sendable & Hashable>(
+    _ col: String, _ replacement: [T: T]
+  ) async -> DataFrame {
+    // An empty column list is how "all type-compatible columns" is requested.
+    return await replace(col == "*" ? [] : [col], replacement)
+  }
+
+  /// Returns a new ``DataFrame`` that replaces values in the given columns using the replacement
+  /// mapping.
+  ///
+  /// The values must be of type `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`, or `String`.
+  /// All numeric values are sent as `double`. Passing any other type is a programmer error and
+  /// stops the program.
+  /// - Parameters:
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  ///   - replacement: A mapping of old value to new value.
+  /// - Returns: A ``DataFrame``.
+  public func replace<T: Sendable & Hashable>(
+    _ cols: [String], _ replacement: [T: T]
+  ) async -> DataFrame {
+    let replacements = replacement.map { (replaceLiteral($0.key), replaceLiteral($0.value)) }
+    return await transform { SparkConnectClient.getNAReplace($0, cols, replacements) }
+  }
+
+  // MARK: - Helpers
+
+  /// Builds a new ``DataFrame`` from this ``DataFrame``'s plan using the given plan builder.
+  /// - Parameter f: Receives the root relation of the current plan and returns the new plan.
+  private func transform(_ f: (Relation) -> Plan) async -> DataFrame {
+    // `getPlan()` is cast back to `Plan` here; anything else is an internal invariant violation,
+    // so fail with an explicit message instead of a bare forced cast.
+    guard let plan = await df.getPlan() as? Plan else {
+      fatalError("DataFrame.getPlan() did not return a Plan")
+    }
+    return DataFrame(spark: await df.spark, plan: f(plan.root))
+  }
+
+  /// Builds a new ``DataFrame`` whose plan fills the null values of `cols` with `values`.
+  private func fillColumns(
+    _ cols: [String], _ values: [ExpressionLiteral]
+  ) async -> DataFrame {
+    return await transform { SparkConnectClient.getNAFill($0, cols, values) }
+  }
+
+  /// Converts a `fill` value to an ``ExpressionLiteral`` (`bool`, `long`, `double`, `string`).
+  ///
+  /// Integers become `long` and floating-point values become `double`.
+  private func fillLiteral(_ value: Sendable) -> ExpressionLiteral {
+    var literal = ExpressionLiteral()
+    switch value {
+    case let value as Bool:
+      literal.boolean = value
+    case let value as Int:
+      literal.long = Int64(value)
+    case let value as Int32:
+      literal.long = Int64(value)
+    case let value as Int64:
+      literal.long = value
+    case let value as Float:
+      literal.double = Double(value)
+    case let value as Double:
+      literal.double = value
+    case let value as String:
+      literal.string = value
+    default:
+      // `String` is already handled above, so reaching this branch always means an unsupported
+      // type. Stop with a message that names the type.
+      fatalError("Unsupported `fill` value type: \(type(of: value))")
+    }
+    return literal
+  }
+
+  /// Converts a `replace` value to an ``ExpressionLiteral`` (`bool`, `double`, `string`).
+  ///
+  /// Integers are represented as `double` because `replace` does not support `long`; integers
+  /// that a `Double` cannot represent exactly therefore lose precision.
+  private func replaceLiteral(_ value: Sendable) -> ExpressionLiteral {
+    var literal = ExpressionLiteral()
+    switch value {
+    case let value as Bool:
+      literal.boolean = value
+    case let value as Int:
+      literal.double = Double(value)
+    case let value as Int32:
+      literal.double = Double(value)
+    case let value as Int64:
+      literal.double = Double(value)
+    case let value as Float:
+      literal.double = Double(value)
+    case let value as Double:
+      literal.double = value
+    case let value as String:
+      literal.string = value
+    default:
+      // `String` is already handled above, so reaching this branch always means an unsupported
+      // type. Stop with a message that names the type.
+      fatalError("Unsupported `replace` value type: \(type(of: value))")
+    }
+    return literal
+  }
+}
+
+extension DataFrame {
+  /// The entry point for the missing-data operations (`fill`, `drop`, `replace`) of this
+  /// ``DataFrame``. A new ``DataFrameNaFunctions`` wrapping `self` is created on every access.
+  public var na: DataFrameNaFunctions {
+    return .init(df: self)
+  }
+}
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index d7c1c05..6793302 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -659,6 +659,44 @@ public actor SparkConnectClient {
     return createPlan { $0.tail = tail }
   }
 
+  /// Builds a plan from an `NAFill` message.
+  /// - Parameters:
+  ///   - child: The relation used as the input of the message.
+  ///   - cols: The column names stored in the message.
+  ///   - values: The replacement literals stored in the message.
+  /// - Returns: The plan produced by `createPlan` with `fillNa` set to the message.
+  static func getNAFill(
+    _ child: Relation, _ cols: [String], _ values: [ExpressionLiteral]
+  ) -> Plan {
+    var naFill = Spark_Connect_NAFill()
+    naFill.values = values
+    naFill.cols = cols
+    naFill.input = child
+    return createPlan { relation in relation.fillNa = naFill }
+  }
+
+  /// Builds a plan from an `NADrop` message.
+  /// - Parameters:
+  ///   - child: The relation used as the input of the message.
+  ///   - cols: The column names stored in the message.
+  ///   - minNonNulls: The threshold stored in the message; the field is left unset for `nil`.
+  /// - Returns: The plan produced by `createPlan` with `dropNa` set to the message.
+  static func getNADrop(
+    _ child: Relation, _ cols: [String], _ minNonNulls: Int32?
+  ) -> Plan {
+    var naDrop = Spark_Connect_NADrop()
+    naDrop.cols = cols
+    naDrop.input = child
+    // Only touch the optional field when a threshold was actually requested.
+    if let threshold = minNonNulls {
+      naDrop.minNonNulls = threshold
+    }
+    return createPlan { relation in relation.dropNa = naDrop }
+  }
+
+  /// Builds a plan from an `NAReplace` message.
+  /// - Parameters:
+  ///   - child: The relation used as the input of the message.
+  ///   - cols: The column names stored in the message.
+  ///   - replacements: Pairs of (old value, new value), stored in the given order.
+  /// - Returns: The plan produced by `createPlan` with `replace` set to the message.
+  static func getNAReplace(
+    _ child: Relation, _ cols: [String],
+    _ replacements: [(ExpressionLiteral, ExpressionLiteral)]
+  ) -> Plan {
+    // Turn every (old, new) pair into its protobuf counterpart.
+    var entries: [Spark_Connect_NAReplace.Replacement] = []
+    for (oldValue, newValue) in replacements {
+      var entry = Spark_Connect_NAReplace.Replacement()
+      entry.oldValue = oldValue
+      entry.newValue = newValue
+      entries.append(entry)
+    }
+
+    var naReplace = Spark_Connect_NAReplace()
+    naReplace.input = child
+    naReplace.cols = cols
+    naReplace.replacements = entries
+    return createPlan { relation in relation.replace = naReplace }
+  }
+
   var result: [ExecutePlanResponse] = []
   private func addResponse(_ response: ExecutePlanResponse) {
     self.result.append(response)
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index 139c0ed..bdacb8a 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -309,6 +309,76 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  /// Checks `na.fill` with a numeric value, a column subset, a string value, and a per-column map.
+  @Test
+  func naFill() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql(
+      "SELECT * FROM VALUES (1, 10, 'a'), (NULL, NULL, 'b'), (3, 30, NULL) AS T(a, b, s)")
+
+    // A numeric value fills all type-compatible (numeric) columns.
+    let allNumeric = try await df.na.fill(0).collect()
+    #expect(allNumeric == [Row(1, 10, "a"), Row(0, 0, "b"), Row(3, 30, nil)])
+
+    // Only the listed columns are filled.
+    let onlyA = try await df.na.fill(0, ["a"]).collect()
+    #expect(onlyA == [Row(1, 10, "a"), Row(0, nil, "b"), Row(3, 30, nil)])
+
+    // A string value fills string columns.
+    let allStrings = try await df.na.fill("z").collect()
+    #expect(allStrings == [Row(1, 10, "a"), Row(nil, nil, "b"), Row(3, 30, "z")])
+
+    // A map fills each named column with its own value.
+    let perColumn = try await df.na.fill(["a": 0, "s": "z"]).collect()
+    #expect(perColumn == [Row(1, 10, "a"), Row(0, nil, "b"), Row(3, 30, "z")])
+
+    await spark.stop()
+  }
+
+  /// Checks `na.drop` with the default, `how: "all"`, `minNonNulls`, and a column subset.
+  @Test
+  func naDrop() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql(
+      "SELECT * FROM VALUES (1, 10, 'a'), (NULL, NULL, 'b'), (3, 30, NULL) AS T(a, b, s)")
+
+    // By default a row is dropped when it contains any null value.
+    let withoutAnyNull = try await df.na.drop().collect()
+    #expect(withoutAnyNull == [Row(1, 10, "a")])
+
+    // `all` drops a row only when every value is null.
+    let withoutAllNull = try await df.na.drop(how: "all").collect()
+    #expect(withoutAllNull == [Row(1, 10, "a"), Row(nil, nil, "b"), Row(3, 30, nil)])
+
+    // Rows with at least 2 non-null values are kept.
+    let atLeastTwo = try await df.na.drop(minNonNulls: 2).collect()
+    #expect(atLeastTwo == [Row(1, 10, "a"), Row(3, 30, nil)])
+
+    // Only the listed columns are considered.
+    let bySubset = try await df.na.drop(how: "any", ["s"]).collect()
+    #expect(bySubset == [Row(1, 10, "a"), Row(nil, nil, "b")])
+
+    await spark.stop()
+  }
+
+  /// Checks `na.replace` with a single column, a column list, `*`, and numeric values.
+  @Test
+  func naReplace() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1, 'a'), (2, 'b'), (3, 'a') AS T(n, s)")
+
+    // String values are replaced in a single column.
+    let singleColumn = try await df.na.replace("s", ["a": "z"]).collect()
+    #expect(singleColumn == [Row(1, "z"), Row(2, "b"), Row(3, "z")])
+
+    // Several string values are replaced across the given columns.
+    let severalValues = try await df.na.replace(["s"], ["a": "z", "b": "y"]).collect()
+    #expect(severalValues == [Row(1, "z"), Row(2, "y"), Row(3, "z")])
+
+    // `*` considers all type-compatible columns.
+    let allColumns = try await df.na.replace("*", ["a": "z"]).collect()
+    #expect(allColumns == [Row(1, "z"), Row(2, "b"), Row(3, "z")])
+
+    // Numeric (double) values are replaced.
+    let df2 = try await spark.sql("SELECT * FROM VALUES (1.0D), (2.0D), (1.0D) AS T(d)")
+    let doubles = try await df2.na.replace("d", [1.0: 9.0]).collect()
+    #expect(doubles == [Row(9.0), Row(2.0), Row(9.0)])
+
+    await spark.stop()
+  }
+
   @Test
   func filter() async throws {
     let spark = try await SparkSession.builder.getOrCreate()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to