This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 91d6c98  [SPARK-51719] Support `table` for `SparkSession` and `DataFrameReader`
91d6c98 is described below

commit 91d6c98a39498a752f5ada579b8c18c25b6bc2b5
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Apr 6 10:33:51 2025 +0900

    [SPARK-51719] Support `table` for `SparkSession` and `DataFrameReader`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `table` API for `SparkSession` and `DataFrameReader`.
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new addition to the unreleased version.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42 from dongjoon-hyun/SPARK-51719.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/DataFrameReader.swift         | 27 ++++++++++++++++++++++
 Sources/SparkConnect/SparkSession.swift            | 14 +++++++++++
 Sources/SparkConnect/TypeAliases.swift             |  1 +
 Tests/SparkConnectTests/DataFrameReaderTests.swift | 10 ++++++++
 Tests/SparkConnectTests/SparkSessionTests.swift    | 10 ++++++++
 5 files changed, 62 insertions(+)

diff --git a/Sources/SparkConnect/DataFrameReader.swift b/Sources/SparkConnect/DataFrameReader.swift
index 47f022f..cfa41f9 100644
--- a/Sources/SparkConnect/DataFrameReader.swift
+++ b/Sources/SparkConnect/DataFrameReader.swift
@@ -40,6 +40,33 @@ public actor DataFrameReader: Sendable {
     self.sparkSession = sparkSession
   }
 
+  /// Returns the specified table/view as a ``DataFrame``. If it's a table, it must support batch
+  /// reading and the returned ``DataFrame`` is the batch scan query plan of this table. If it's a
+  /// view, the returned ``DataFrame`` is simply the query plan of the view, which can either be a
+  /// batch or streaming query plan.
+  ///
+  /// - Parameter tableName: a qualified or unqualified name that designates a table or view. If a database is
+  /// specified, it identifies the table/view from the database. Otherwise, it first attempts to
+  /// find a temporary view with the given name and then match the table/view from the current
+  /// database. Note that, the global temporary view database is also valid here.
+  /// - Returns: A ``DataFrame`` instance.
+  public func table(_ tableName: String) -> DataFrame {
+    var namedTable = NamedTable()
+    namedTable.unparsedIdentifier = tableName
+    namedTable.options = self.extraOptions.toStringDictionary()
+
+    var read = Read()
+    read.namedTable = namedTable
+
+    var relation = Relation()
+    relation.read = read
+
+    var plan = Plan()
+    plan.opType = .root(relation)
+
+    return DataFrame(spark: sparkSession, plan: plan)
+  }
+
   /// Specifies the input data source format.
   /// - Parameter source: A string.
   /// - Returns: A `DataFrameReader`.
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 39b6bbe..2fa583c 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -125,6 +125,20 @@ public actor SparkSession {
     }
   }
 
+  /// Returns the specified table/view as a ``DataFrame``. If it's a table, it must support batch
+  /// reading and the returned ``DataFrame`` is the batch scan query plan of this table. If it's a
+  /// view, the returned ``DataFrame`` is simply the query plan of the view, which can either be a
+  /// batch or streaming query plan.
+  ///
+  /// - Parameter tableName: a qualified or unqualified name that designates a table or view. If a database is
+  /// specified, it identifies the table/view from the database. Otherwise, it first attempts to
+  /// find a temporary view with the given name and then match the table/view from the current
+  /// database. Note that, the global temporary view database is also valid here.
+  /// - Returns: A ``DataFrame`` instance.
+  public func table(_ tableName: String) async throws -> DataFrame {
+    return await read.table(tableName)
+  }
+
   /// Executes some code block and prints to stdout the time taken to execute the block.
   /// - Parameter f: A function to execute.
   /// - Returns: The result of the executed code.
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 198df89..f186729 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -31,6 +31,7 @@ typealias Filter = Spark_Connect_Filter
 typealias KeyValue = Spark_Connect_KeyValue
 typealias Limit = Spark_Connect_Limit
 typealias MapType = Spark_Connect_DataType.Map
+typealias NamedTable = Spark_Connect_Read.NamedTable
 typealias OneOf_Analyze = AnalyzePlanRequest.OneOf_Analyze
 typealias Plan = Spark_Connect_Plan
 typealias Project = Spark_Connect_Project
diff --git a/Tests/SparkConnectTests/DataFrameReaderTests.swift b/Tests/SparkConnectTests/DataFrameReaderTests.swift
index c159b8f..5c0979d 100644
--- a/Tests/SparkConnectTests/DataFrameReaderTests.swift
+++ b/Tests/SparkConnectTests/DataFrameReaderTests.swift
@@ -64,4 +64,14 @@ struct DataFrameReaderTests {
     #expect(try await spark.read.parquet(path, path).count() == 4)
     await spark.stop()
   }
+
+  @Test
+  func table() async throws {
+    let tableName = UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count() == 0)
+    #expect(try await spark.read.table(tableName).count() == 2)
+    #expect(try await spark.sql("DROP TABLE \(tableName)").count() == 0)
+    await spark.stop()
+  }
 }
diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift
index f302349..cba57e4 100644
--- a/Tests/SparkConnectTests/SparkSessionTests.swift
+++ b/Tests/SparkConnectTests/SparkSessionTests.swift
@@ -75,6 +75,16 @@ struct SparkSessionTests {
     await spark.stop()
   }
 
+  @Test
+  func table() async throws {
+    let tableName = UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count() == 0)
+    #expect(try await spark.table(tableName).count() == 2)
+    #expect(try await spark.sql("DROP TABLE \(tableName)").count() == 0)
+    await spark.stop()
+  }
+
   @Test
   func time() async throws {
     let spark = try await SparkSession.builder.getOrCreate()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to