This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dedd94  [SPARK-51620] Support `columns` for `DataFrame`
8dedd94 is described below

commit 8dedd941b6d2c8ae3b6b5ec43bee4d2ea16efcee
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Mar 26 20:24:29 2025 -0700

    [SPARK-51620] Support `columns` for `DataFrame`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support the `columns` API for `DataFrame`.
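
    For reference, a minimal usage sketch (the session setup and `sql` call follow the tests added in this PR; a reachable Spark Connect server is assumed):

        // Hypothetical example: assumes a running Spark Connect server.
        let spark = try await SparkSession.builder.getOrCreate()
        let df = try await spark.sql("SELECT 1 as col1, 2 as col2")
        let names = try await df.columns()  // ["col1", "col2"]
        await spark.stop()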
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #27 from dongjoon-hyun/SPARK-51620.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/DataFrame.swift         | 28 +++++++++++++++++++++++++---
 Tests/SparkConnectTests/DataFrameTests.swift | 10 ++++++++++
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 8732c9e..237c08c 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -71,11 +71,28 @@ public actor DataFrame: Sendable {
     throw SparkConnectError.UnsupportedOperationException
   }
 
+  /// Return all column names as an array of strings.
+  /// - Returns: A string array.
+  public func columns() async throws -> [String] {
+    var columns: [String] = []
+    try await analyzePlanIfNeeded()
+    for field in self.schema!.struct.fields {
+      columns.append(field.name)
+    }
+    return columns
+  }
+
  /// Return a `JSON` string of data type because we cannot expose the internal type ``DataType``.
   /// - Returns: a `JSON` string.
   public func schema() async throws -> String {
-    var dataType: String? = nil
+    try await analyzePlanIfNeeded()
+    return try self.schema!.jsonString()
+  }
 
+  private func analyzePlanIfNeeded() async throws {
+    if self.schema != nil {
+      return
+    }
     try await withGRPCClient(
       transport: .http2NIOPosix(
         target: .dns(host: spark.client.host, port: spark.client.port),
@@ -85,9 +102,8 @@ public actor DataFrame: Sendable {
       let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
       let response = try await service.analyzePlan(
         spark.client.getAnalyzePlanRequest(spark.sessionID, plan))
-      dataType = try response.schema.schema.jsonString()
+      self.setSchema(response.schema.schema)
     }
-    return dataType!
   }
 
   /// Return the total number of rows.
@@ -266,6 +282,8 @@ public actor DataFrame: Sendable {
     return try await select().limit(1).count() == 0
   }
 
+  /// Persist this `DataFrame` with the default storage level (`MEMORY_AND_DISK`).
+  /// - Returns: A `DataFrame`.
   public func cache() async throws -> DataFrame {
     return try await persist()
   }
@@ -291,6 +309,10 @@ public actor DataFrame: Sendable {
     return self
   }
 
+  /// Mark the `DataFrame` as non-persistent, and remove all blocks for it from memory and disk.
+  /// This will not un-persist any cached data that is built upon this `DataFrame`.
+  /// - Parameter blocking: Whether to block until all blocks are deleted.
+  /// - Returns: A `DataFrame`.
   public func unpersist(blocking: Bool = false) async throws -> DataFrame {
     try await withGRPCClient(
       transport: .http2NIOPosix(
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index c7170d3..c49fa15 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -32,6 +32,16 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func columns() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.sql("SELECT 1 as col1").columns() == ["col1"])
+    #expect(try await spark.sql("SELECT 1 as col1, 2 as col2").columns() == 
["col1", "col2"])
+    #expect(try await spark.sql("SELECT CAST(null as STRING) col1").columns() 
== ["col1"])
+    #expect(try await spark.sql("DROP TABLE IF EXISTS nonexistent").columns() 
== [])
+    await spark.stop()
+  }
+
   @Test
   func schema() async throws {
     let spark = try await SparkSession.builder.getOrCreate()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
