This is an automated email from the ASF dual-hosted git repository.
dongjoon-hyun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new 9fc11ff [SPARK-57304] Support `na.(fill|drop|replace)` for `DataFrame`
9fc11ff is described below
commit 9fc11ff80c349430b1371cac82c670fd46f71967
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jun 7 13:25:32 2026 -0700
[SPARK-57304] Support `na.(fill|drop|replace)` for `DataFrame`
### What changes were proposed in this pull request?
This PR adds the `DataFrameNaFunctions` API (via `DataFrame.na`), mirroring
PySpark's `df.na.fill/drop/replace` and wiring the existing `NAFill`, `NADrop`,
and `NAReplace` Spark Connect messages into the Swift client.
- `fill`, `drop`, `replace` on `DataFrame.na`
- `getNAFill`, `getNADrop`, `getNAReplace` plan builders in `SparkConnectClient`
### Why are the changes needed?
To reach feature parity with PySpark/Spark SQL for handling missing data.
### Does this PR introduce _any_ user-facing change?
Yes, this adds new public APIs.
```swift
try await df.na.fill(0).show()
try await df.na.drop().show()
try await df.na.replace("s", ["a": "z"]).show()
```
### How was this patch tested?
Pass the CI build.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)
Closes #406 from dongjoon-hyun/SPARK-57304.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/DataFrameNaFunctions.swift | 199 ++++++++++++++++++++++++
Sources/SparkConnect/SparkConnectClient.swift | 38 +++++
Tests/SparkConnectTests/DataFrameTests.swift | 70 +++++++++
3 files changed, 307 insertions(+)
diff --git a/Sources/SparkConnect/DataFrameNaFunctions.swift b/Sources/SparkConnect/DataFrameNaFunctions.swift
new file mode 100644
index 0000000..4977074
--- /dev/null
+++ b/Sources/SparkConnect/DataFrameNaFunctions.swift
@@ -0,0 +1,199 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+/// Functionality for working with missing data in ``DataFrame``s.
+///
+/// Use ``DataFrame/na`` to access this. It mirrors PySpark's `DataFrameNaFunctions`
+/// (`df.na.fill`, `df.na.drop`, `df.na.replace`).
+///
+/// Every method only builds a new plan on top of the wrapped ``DataFrame``'s plan and
+/// wraps it in a new ``DataFrame``.
+public actor DataFrameNaFunctions: Sendable {
+  /// The ``DataFrame`` whose plan the returned ``DataFrame``s are derived from.
+  let df: DataFrame
+
+  /// Creates the helper for the given ``DataFrame``.
+  /// - Parameter df: The ``DataFrame`` to derive new plans from.
+  init(df: DataFrame) {
+    self.df = df
+  }
+
+  // MARK: - Fill
+
+  /// Returns a new ``DataFrame`` that replaces null values in boolean columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: Bool, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.boolean = value
+    return await transform { SparkConnectClient.getNAFill($0, cols, [literal]) }
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values in numeric columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value. It is sent as a `long` literal.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: Int, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.long = Int64(value)
+    return await transform { SparkConnectClient.getNAFill($0, cols, [literal]) }
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values in numeric columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value. It is sent as a `double` literal.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: Double, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.double = value
+    return await transform { SparkConnectClient.getNAFill($0, cols, [literal]) }
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values in string columns with `value`.
+  /// - Parameters:
+  ///   - value: The replacement value.
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ value: String, _ cols: [String] = []) async -> DataFrame {
+    var literal = ExpressionLiteral()
+    literal.string = value
+    return await transform { SparkConnectClient.getNAFill($0, cols, [literal]) }
+  }
+
+  /// Returns a new ``DataFrame`` that replaces null values, using the column name to replacement
+  /// value mapping.
+  ///
+  /// The replacement values must be of type `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`,
+  /// or `String`. Integers are sent as `long` and floating-point values as `double`.
+  /// Passing a value of any other type is a programmer error and stops execution.
+  /// - Parameter valueMap: A dictionary of column name and replacement value.
+  /// - Returns: A ``DataFrame``.
+  public func fill(_ valueMap: [String: Sendable]) async -> DataFrame {
+    // Snapshot the entries once so that column `i` is always paired with value `i`
+    // without a second, force-unwrapped dictionary lookup.
+    let entries = Array(valueMap)
+    let cols = entries.map { $0.key }
+    let values = entries.map { fillLiteral($0.value) }
+    return await transform { SparkConnectClient.getNAFill($0, cols, values) }
+  }
+
+  // MARK: - Drop
+
+  /// Returns a new ``DataFrame`` that drops rows containing null values.
+  /// - Parameters:
+  ///   - how: `any` (default) drops a row if it contains any null value, `all` drops a row only
+  ///     if every considered value is null. The comparison is case-insensitive, and any value
+  ///     other than `all` takes the same path as `any`.
+  ///   - cols: Column names to consider. When empty, all columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func drop(how: String = "any", _ cols: [String] = []) async -> DataFrame {
+    // `all` is expressed as "keep rows with at least one non-null value"; otherwise the
+    // threshold is left unset in the plan.
+    let minNonNulls: Int32? = how.lowercased() == "all" ? 1 : nil
+    return await transform { SparkConnectClient.getNADrop($0, cols, minNonNulls) }
+  }
+
+  /// Returns a new ``DataFrame`` that drops rows containing less than `minNonNulls` non-null
+  /// values.
+  /// - Parameters:
+  ///   - minNonNulls: The minimum number of non-null values a row must contain to be kept.
+  ///   - cols: Column names to consider. When empty, all columns are considered.
+  /// - Returns: A ``DataFrame``.
+  public func drop(minNonNulls: Int32, _ cols: [String] = []) async -> DataFrame {
+    return await transform { SparkConnectClient.getNADrop($0, cols, minNonNulls) }
+  }
+
+  // MARK: - Replace
+
+  /// Returns a new ``DataFrame`` that replaces values in the given column using the replacement
+  /// mapping.
+  ///
+  /// The values must be of type `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`, or `String`.
+  /// All numeric values are sent as `double`. Passing a value of any other type is a programmer
+  /// error and stops execution.
+  /// - Parameters:
+  ///   - col: A column name, or `*` to consider all type-compatible columns.
+  ///   - replacement: A mapping of old value to new value.
+  /// - Returns: A ``DataFrame``.
+  public func replace<T: Sendable & Hashable>(
+    _ col: String, _ replacement: [T: T]
+  ) async -> DataFrame {
+    // `*` is translated to an empty column list for the multi-column overload.
+    return await replace(col == "*" ? [] : [col], replacement)
+  }
+
+  /// Returns a new ``DataFrame`` that replaces values in the given columns using the replacement
+  /// mapping.
+  ///
+  /// The values must be of type `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`, or `String`.
+  /// All numeric values are sent as `double`. Passing a value of any other type is a programmer
+  /// error and stops execution.
+  /// - Parameters:
+  ///   - cols: Column names to consider. When empty, all type-compatible columns are considered.
+  ///   - replacement: A mapping of old value to new value.
+  /// - Returns: A ``DataFrame``.
+  public func replace<T: Sendable & Hashable>(
+    _ cols: [String], _ replacement: [T: T]
+  ) async -> DataFrame {
+    let replacements = replacement.map { (replaceLiteral($0.key), replaceLiteral($0.value)) }
+    return await transform { SparkConnectClient.getNAReplace($0, cols, replacements) }
+  }
+
+  // MARK: - Helpers
+
+  /// Builds a new ``DataFrame`` from this ``DataFrame``'s plan using the given plan builder.
+  /// - Parameter f: Receives the root relation of the current plan and returns the new plan.
+  /// - Returns: A ``DataFrame`` that shares the session of the wrapped one.
+  private func transform(_ f: (Relation) -> Plan) async -> DataFrame {
+    // A failed cast here is an internal invariant violation, so stop with an explicit
+    // message rather than an anonymous forced-cast trap.
+    guard let plan = await df.getPlan() as? Plan else {
+      preconditionFailure("DataFrame.getPlan() did not return a Plan")
+    }
+    return DataFrame(spark: await df.spark, plan: f(plan.root))
+  }
+
+  /// Converts a `fill` value to an ``ExpressionLiteral`` (`bool`, `long`, `double`, `string`).
+  /// - Parameter value: A `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`, or `String`.
+  /// - Returns: The literal holding `value`.
+  private func fillLiteral(_ value: Sendable) -> ExpressionLiteral {
+    var literal = ExpressionLiteral()
+    switch value {
+    case let value as Bool:
+      literal.boolean = value
+    case let value as Int:
+      literal.long = Int64(value)
+    case let value as Int32:
+      literal.long = Int64(value)
+    case let value as Int64:
+      literal.long = value
+    case let value as Float:
+      literal.double = Double(value)
+    case let value as Double:
+      literal.double = value
+    case let value as String:
+      literal.string = value
+    default:
+      // Reaching this branch means none of the supported types matched. Name the offending
+      // type instead of failing inside a forced cast to `String`.
+      preconditionFailure("Unsupported `fill` value of type \(type(of: value)): \(value)")
+    }
+    return literal
+  }
+
+  /// Converts a `replace` value to an ``ExpressionLiteral`` (`bool`, `double`, `string`).
+  /// Integers are represented as `double` because `replace` does not support `long`.
+  /// - Parameter value: A `Bool`, `Int`, `Int32`, `Int64`, `Float`, `Double`, or `String`.
+  /// - Returns: The literal holding `value`.
+  private func replaceLiteral(_ value: Sendable) -> ExpressionLiteral {
+    var literal = ExpressionLiteral()
+    switch value {
+    case let value as Bool:
+      literal.boolean = value
+    case let value as Int:
+      literal.double = Double(value)
+    case let value as Int32:
+      literal.double = Double(value)
+    case let value as Int64:
+      literal.double = Double(value)
+    case let value as Float:
+      literal.double = Double(value)
+    case let value as Double:
+      literal.double = value
+    case let value as String:
+      literal.string = value
+    default:
+      // Reaching this branch means none of the supported types matched. Name the offending
+      // type instead of failing inside a forced cast to `String`.
+      preconditionFailure("Unsupported `replace` value of type \(type(of: value)): \(value)")
+    }
+    return literal
+  }
+}
+
+extension DataFrame {
+  /// The entry point for missing-data operations on this ``DataFrame``.
+  ///
+  /// Each access creates a new ``DataFrameNaFunctions`` wrapping this ``DataFrame``.
+  public var na: DataFrameNaFunctions {
+    let functions = DataFrameNaFunctions(df: self)
+    return functions
+  }
+}
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index d7c1c05..6793302 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -659,6 +659,44 @@ public actor SparkConnectClient {
return createPlan { $0.tail = tail }
}
+  /// Creates a plan whose root relation is an `NAFill` over `child`.
+  /// - Parameters:
+  ///   - child: The input relation.
+  ///   - cols: The column names stored in the message.
+  ///   - values: The fill literals stored in the message.
+  /// - Returns: A plan wrapping the `NAFill` relation.
+  static func getNAFill(
+    _ child: Relation, _ cols: [String], _ values: [ExpressionLiteral]
+  ) -> Plan {
+    var message = Spark_Connect_NAFill()
+    message.values = values
+    message.cols = cols
+    message.input = child
+    return createPlan { relation in relation.fillNa = message }
+  }
+
+  /// Creates a plan whose root relation is an `NADrop` over `child`.
+  /// - Parameters:
+  ///   - child: The input relation.
+  ///   - cols: The column names stored in the message.
+  ///   - minNonNulls: The threshold stored in the message; left unset when `nil`.
+  /// - Returns: A plan wrapping the `NADrop` relation.
+  static func getNADrop(
+    _ child: Relation, _ cols: [String], _ minNonNulls: Int32?
+  ) -> Plan {
+    var message = Spark_Connect_NADrop()
+    message.input = child
+    message.cols = cols
+    // Only touch the field when a threshold was actually supplied.
+    if let threshold = minNonNulls {
+      message.minNonNulls = threshold
+    }
+    return createPlan { relation in relation.dropNa = message }
+  }
+
+  /// Creates a plan whose root relation is an `NAReplace` over `child`.
+  /// - Parameters:
+  ///   - child: The input relation.
+  ///   - cols: The column names stored in the message.
+  ///   - replacements: Pairs of (old value, new value), stored in the given order.
+  /// - Returns: A plan wrapping the `NAReplace` relation.
+  static func getNAReplace(
+    _ child: Relation, _ cols: [String],
+    _ replacements: [(ExpressionLiteral, ExpressionLiteral)]
+  ) -> Plan {
+    var entries: [Spark_Connect_NAReplace.Replacement] = []
+    for (oldValue, newValue) in replacements {
+      var entry = Spark_Connect_NAReplace.Replacement()
+      entry.oldValue = oldValue
+      entry.newValue = newValue
+      entries.append(entry)
+    }
+
+    var message = Spark_Connect_NAReplace()
+    message.input = child
+    message.cols = cols
+    message.replacements = entries
+    return createPlan { relation in relation.replace = message }
+  }
+
var result: [ExecutePlanResponse] = []
private func addResponse(_ response: ExecutePlanResponse) {
self.result.append(response)
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index 139c0ed..bdacb8a 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -309,6 +309,76 @@ struct DataFrameTests {
await spark.stop()
}
+  /// Checks `na.fill` with a scalar for all columns, a scalar for a column subset,
+  /// a string scalar, and a per-column value map.
+  @Test
+  func naFill() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql(
+      "SELECT * FROM VALUES (1, 10, 'a'), (NULL, NULL, 'b'), (3, 30, NULL) AS T(a, b, s)")
+    // Fill all type-compatible (numeric) columns.
+    #expect(
+      try await df.na.fill(0).collect()
+        == [Row(1, 10, "a"), Row(0, 0, "b"), Row(3, 30, nil)])
+    // Fill a subset of columns.
+    #expect(
+      try await df.na.fill(0, ["a"]).collect()
+        == [Row(1, 10, "a"), Row(0, nil, "b"), Row(3, 30, nil)])
+    // Fill string columns.
+    #expect(
+      try await df.na.fill("z").collect()
+        == [Row(1, 10, "a"), Row(nil, nil, "b"), Row(3, 30, "z")])
+    // Fill per-column values.
+    #expect(
+      try await df.na.fill(["a": 0, "s": "z"]).collect()
+        == [Row(1, 10, "a"), Row(0, nil, "b"), Row(3, 30, "z")])
+    await spark.stop()
+  }
+
+  /// Checks `na.drop` with the default `how`, `how: "all"`, an explicit `minNonNulls`,
+  /// and a column subset.
+  @Test
+  func naDrop() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql(
+      "SELECT * FROM VALUES (1, 10, 'a'), (NULL, NULL, 'b'), (3, 30, NULL) AS T(a, b, s)")
+    // Drop rows containing any null value (default).
+    #expect(try await df.na.drop().collect() == [Row(1, 10, "a")])
+    // Drop rows only when every value is null.
+    #expect(
+      try await df.na.drop(how: "all").collect()
+        == [Row(1, 10, "a"), Row(nil, nil, "b"), Row(3, 30, nil)])
+    // Keep rows with at least 2 non-null values.
+    #expect(
+      try await df.na.drop(minNonNulls: 2).collect()
+        == [Row(1, 10, "a"), Row(3, 30, nil)])
+    // Consider only a subset of columns.
+    #expect(
+      try await df.na.drop(how: "any", ["s"]).collect()
+        == [Row(1, 10, "a"), Row(nil, nil, "b")])
+    await spark.stop()
+  }
+
+  /// Checks `na.replace` for a single column, a column list, the `*` wildcard,
+  /// and double values.
+  @Test
+  func naReplace() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1, 'a'), (2, 'b'), (3, 'a') AS T(n, s)")
+    // Replace string values in a single column.
+    #expect(
+      try await df.na.replace("s", ["a": "z"]).collect()
+        == [Row(1, "z"), Row(2, "b"), Row(3, "z")])
+    // Replace several string values across the given columns.
+    #expect(
+      try await df.na.replace(["s"], ["a": "z", "b": "y"]).collect()
+        == [Row(1, "z"), Row(2, "y"), Row(3, "z")])
+    // `*` considers all type-compatible columns.
+    #expect(
+      try await df.na.replace("*", ["a": "z"]).collect()
+        == [Row(1, "z"), Row(2, "b"), Row(3, "z")])
+    // Replace numeric (double) values.
+    let df2 = try await spark.sql("SELECT * FROM VALUES (1.0D), (2.0D), (1.0D) AS T(d)")
+    #expect(
+      try await df2.na.replace("d", [1.0: 9.0]).collect()
+        == [Row(9.0), Row(2.0), Row(9.0)])
+    await spark.stop()
+  }
+
@Test
func filter() async throws {
let spark = try await SparkSession.builder.getOrCreate()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]