This is an automated email from the ASF dual-hosted git repository.

dongjoon-hyun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new db0806c  [SPARK-57302] Support `withColumn/withColumns` for `DataFrame`
db0806c is described below

commit db0806c293a7ea279dc82b6a6843f348b276ad78
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jun 7 11:35:37 2026 -0700

    [SPARK-57302] Support `withColumn/withColumns` for `DataFrame`
    
    ### What changes were proposed in this pull request?
    
    This PR adds `withColumn` and `withColumns` transformations to `DataFrame`, exposing the Spark Connect `WithColumns` relation that was already defined in the generated protobuf but not surfaced in the Swift API.
    
    New public APIs in `DataFrame+Transformations.swift`:
    
    ```swift
    public func withColumn(_ colName: String, _ expr: String) -> DataFrame
    public func withColumns(_ colsMap: [String: String]) -> DataFrame
    ```
    
    - `withColumn` adds a new column or replaces an existing column with the same name; it delegates to `withColumns` with a single-entry map.
    - The column expression is provided as a SQL expression string and parsed on the server, consistent with the existing `filter(_:)` / `selectExpr(_:)` APIs (this client has no `Column` type).
    - A static plan builder `SparkConnectClient.getWithColumns(_:_:)` constructs the `WithColumns` relation by mapping each `(name, expr)` pair to an `Expression.Alias`, mirroring the sibling `getWithColumnRenamed(_:_:)`.
    - Adds the `WithColumns` type alias in `TypeAliases.swift`, matching the existing `WithColumnsRenamed` alias.
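    
    The pair-to-alias mapping described in the list above can be sketched without a server. This is a minimal, self-contained model, not the client's code: `Alias`, `WithColumnsRelation`, and `makeWithColumns` are hypothetical stand-ins for the generated protobuf types and the plan builder, and expressions are kept as plain strings. Sorting by key is an assumption added here so the alias order is deterministic, since Swift does not guarantee `Dictionary` iteration order.
    
    ```swift
    // Hypothetical stand-ins for the generated protobuf types; not the real SparkConnect API.
    struct Alias: Equatable {
      var expr: String    // the real field holds an Expression, not a String
      var name: [String]  // a list, because the protobuf field is repeated
    }

    struct WithColumnsRelation: Equatable {
      var input: String   // placeholder for the child relation
      var aliases: [Alias] = []
    }

    // Build one single-name alias per (name, expr) pair.
    func makeWithColumns(_ child: String, _ colsMap: [String: String]) -> WithColumnsRelation {
      var relation = WithColumnsRelation(input: child)
      // Assumption: sort by column name so the alias order does not depend on hashing.
      relation.aliases = colsMap.sorted { $0.key < $1.key }.map {
        Alias(expr: $0.value, name: [$0.key])
      }
      return relation
    }

    let relation = makeWithColumns("range(1)", ["c": "id * 2", "b": "id + 1"])
    print(relation.aliases.map { $0.name[0] })  // prints ["b", "c"]
    ```
    
    Without the sort, the aliases would follow the dictionary's iteration order, which can differ between runs.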
    
    ### Why are the changes needed?
    
    `DataFrame.withColumn` / `withColumns` are among the most commonly used PySpark and Spark SQL APIs for deriving or replacing columns from expressions. The Swift client previously supported only column *renaming* (`withColumnRenamed`) and had no way to add or replace a column from a computed expression. This change improves API parity with PySpark/Spark SQL.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. This adds two new public `DataFrame` APIs.
    
    ```swift
    let df = try await spark.range(1)                    // columns: [id]
    try await df.withColumn("b", "id + 1").show()        // columns: [id, b]
    try await df.withColumns(["b": "id + 1",
                              "c": "id * 2"]).show()     // columns: id, then b and c (their order follows dictionary iteration)
    ```
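    
    The resulting column layout can be modeled locally as follows. This is only a sketch of the add-or-replace rule exercised by the tests in this commit (a replaced column keeps its position, a new column is appended); `resultingColumns` is a hypothetical helper that tracks column names only and does not talk to Spark.
    
    ```swift
    // Hypothetical helper: models which column names a withColumns call yields.
    func resultingColumns(_ existing: [String], adding names: [String]) -> [String] {
      var result = existing
      for name in names where !result.contains(name) {
        result.append(name)  // a new name is appended at the end
      }
      return result  // an existing name keeps its original position
    }

    print(resultingColumns(["id"], adding: ["b"]))           // prints ["id", "b"]
    print(resultingColumns(["id"], adding: ["id"]))          // prints ["id"]
    print(resultingColumns(["a", "b"], adding: ["b", "a"]))  // prints ["a", "b"]
    ```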
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.8)
    
    Closes #405 from dongjoon-hyun/SPARK-57302.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/DataFrame+Transformations.swift | 19 +++++++++++++++++++
 Sources/SparkConnect/SparkConnectClient.swift        | 12 ++++++++++++
 Sources/SparkConnect/TypeAliases.swift               |  1 +
 Tests/SparkConnectTests/DataFrameTests.swift         | 15 +++++++++++++++
 4 files changed, 47 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame+Transformations.swift b/Sources/SparkConnect/DataFrame+Transformations.swift
index 0ba0a2a..2f21d29 100644
--- a/Sources/SparkConnect/DataFrame+Transformations.swift
+++ b/Sources/SparkConnect/DataFrame+Transformations.swift
@@ -142,6 +142,25 @@ extension DataFrame {
       spark: self.spark, plan: SparkConnectClient.getWithColumnRenamed(self.plan.root, colsMap))
   }
 
+  /// Returns a new ``DataFrame`` by adding a column or replacing the existing column that has the
+  /// same name.
+  /// - Parameters:
+  ///   - colName: A new column name.
+  ///   - expr: A SQL expression string for the new column.
+  /// - Returns: A ``DataFrame`` with the new or replaced column.
+  public func withColumn(_ colName: String, _ expr: String) -> DataFrame {
+    return withColumns([colName: expr])
+  }
+
+  /// Returns a new ``DataFrame`` by adding columns or replacing the existing columns that have the
+  /// same names.
+  /// - Parameter colsMap: A dictionary of column name and SQL expression string.
+  /// - Returns: A ``DataFrame`` with the new or replaced columns.
+  public func withColumns(_ colsMap: [String: String]) -> DataFrame {
+    return DataFrame(
+      spark: self.spark, plan: SparkConnectClient.getWithColumns(self.plan.root, colsMap))
+  }
+
   // MARK: - Filtering and Sorting
 
   /// Filters rows using the given condition.
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index ae2daa1..d7c1c05 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -557,6 +557,18 @@ public actor SparkConnectClient {
     return createPlan { $0.withColumnsRenamed = withColumnsRenamed }
   }
 
+  static func getWithColumns(_ child: Relation, _ colsMap: [String: String]) -> Plan {
+    var withColumns = WithColumns()
+    withColumns.input = child
+    withColumns.aliases = colsMap.map { (name, expr) in
+      var alias = Spark_Connect_Expression.Alias()
+      alias.expr = expr.toExpression
+      alias.name = [name]
+      return alias
+    }
+    return createPlan { $0.withColumns = withColumns }
+  }
+
   static func getFilter(_ child: Relation, _ conditionExpr: String) -> Plan {
     var filter = Filter()
     filter.input = child
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 3f15f56..d70d161 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -67,6 +67,7 @@ typealias StructType = Spark_Connect_DataType.Struct
 typealias Tail = Spark_Connect_Tail
 typealias UserContext = Spark_Connect_UserContext
 typealias UnresolvedAttribute = Spark_Connect_Expression.UnresolvedAttribute
+typealias WithColumns = Spark_Connect_WithColumns
 typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
 typealias WriteOperation = Spark_Connect_WriteOperation
 typealias WriteOperationV2 = Spark_Connect_WriteOperationV2
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index c745bb9..139c0ed 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -283,6 +283,21 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func withColumn() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    // Add a new column.
+    #expect(try await spark.range(1).withColumn("b", "id + 1").columns == ["id", "b"])
+    #expect(try await spark.range(1).withColumn("b", "id + 1").collect() == [Row(0, 1)])
+    // Replace an existing column in place.
+    #expect(try await spark.range(1).withColumn("id", "id + 1").columns == ["id"])
+    #expect(try await spark.range(1).withColumn("id", "id + 1").collect() == [Row(1)])
+    // Map overload: replace multiple existing columns (positions are preserved).
+    let df = try await spark.sql("SELECT 1 a, 2 b")
+    #expect(try await df.withColumns(["a": "a + 10", "b": "b + 20"]).collect() == [Row(11, 22)])
+    await spark.stop()
+  }
+
   @Test
   func drop() async throws {
     let spark = try await SparkSession.builder.getOrCreate()

